Нейро-символический гибридный агент: планирование встречает восприятие для устойчивой автономии
'Практическое руководство по созданию гибридного агента, который сочетает классическое планирование и нейронное восприятие, с полным Python-кодом для планирования, восприятия и визуализации.'
Символический планировщик как основа логики
Слой высокого уровня реализован с помощью классического символического планировщика, который описывает состояния, действия и переходы. Этот компонент генерирует интерпретируемые планы, направленные на достижение цели, используя A-star по дискретным состояниям. Планировщик учитывает движения (up, down, left, right), а также символические операции (pickup, drop), и соблюдает ограничения сетки, препятствий и коллекционирования объектов.
import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Set, Optional
from collections import deque
import warnings
warnings.filterwarnings('ignore')
@dataclass
class State:
robot_pos: Tuple[int, int]
holding: Optional[str] = None
visited: Set[Tuple[int, int]] = field(default_factory=set)
objects_collected: Set[str] = field(default_factory=set)
def __hash__(self):
return hash((self.robot_pos, self.holding))
class SymbolicPlanner:
def __init__(self, grid_size: int = 8):
self.grid_size = grid_size
self.actions = ['up', 'down', 'left', 'right', 'pickup', 'drop']
def get_successors(self, state: State, obstacles: Set[Tuple[int, int]], objects: Dict[str, Tuple[int, int]]) -> List[Tuple[str, State]]:
successors = []
x, y = state.robot_pos
moves = {'up': (x, y-1), 'down': (x, y+1), 'left': (x-1, y), 'right': (x+1, y)}
for action, new_pos in moves.items():
nx, ny = new_pos
if (0 <= nx < self.grid_size and 0 <= ny < self.grid_size and new_pos not in obstacles):
new_state = State(new_pos, state.holding, state.visited | {new_pos}, state.objects_collected.copy())
successors.append((action, new_state))
if state.holding is None:
for obj_name, obj_pos in objects.items():
if state.robot_pos == obj_pos and obj_name not in state.objects_collected:
new_state = State(state.robot_pos, obj_name, state.visited.copy(), state.objects_collected.copy())
successors.append(('pickup', new_state))
if state.holding is not None:
new_state = State(state.robot_pos, None, state.visited.copy(), state.objects_collected | {state.holding})
successors.append(('drop', new_state))
return successors
def heuristic(self, state: State, goal: Tuple[int, int]) -> float:
return abs(state.robot_pos[0] - goal[0]) + abs(state.robot_pos[1] - goal[1])
def a_star_plan(self, start_state: State, goal: Tuple[int, int], obstacles: Set[Tuple[int, int]], objects: Dict[str, Tuple[int, int]]) -> List[str]:
counter = 0
frontier = [(self.heuristic(start_state, goal), counter, 0, start_state, [])]
visited = set()
while frontier:
frontier.sort()
_, _, cost, state, plan = frontier.pop(0)
counter += 1
if state.robot_pos == goal and len(state.objects_collected) >= len(objects):
return plan
state_key = (state.robot_pos, state.holding)
if state_key in visited:
continue
visited.add(state_key)
for action, next_state in self.get_successors(state, obstacles, objects):
new_cost = cost + 1
new_plan = plan + [action]
priority = new_cost + self.heuristic(next_state, goal)
frontier.append((priority, counter, new_cost, next_state, new_plan))
counter += 1
return []Нейронное восприятие и политика
Нейронный компонент берёт зашумлённую карту окружения и пропускает её через компактную сеть, чтобы получить вероятностную карту препятствий. Политика — это лёгкая сеть, которая по компактным признакам состояния предсказывает вероятности движений. При выборе действия политика может усиливать вероятность действия, рекомендованного символическим планировщиком, тем самым смешивая правила и обучение.
class NeuralPerception:
def __init__(self, grid_size: int = 8):
self.grid_size = grid_size
self.W1 = np.random.randn(grid_size * grid_size, 64) * 0.1
self.b1 = np.zeros(64)
self.W2 = np.random.randn(64, 32) * 0.1
self.b2 = np.zeros(32)
self.W3 = np.random.randn(32, grid_size * grid_size) * 0.1
self.b3 = np.zeros(grid_size * grid_size)
def relu(self, x):
return np.maximum(0, x)
def sigmoid(self, x):
return 1 / (1 + np.exp(-np.clip(x, -500, 500)))
def perceive(self, noisy_grid: np.ndarray) -> np.ndarray:
x = noisy_grid.flatten()
h1 = self.relu(x @ self.W1 + self.b1)
h2 = self.relu(h1 @ self.W2 + self.b2)
out = self.sigmoid(h2 @ self.W3 + self.b3)
return out.reshape(self.grid_size, self.grid_size)
class NeuralPolicy:
def __init__(self, state_dim: int = 4, action_dim: int = 4):
self.W = np.random.randn(state_dim, action_dim) * 0.1
self.b = np.zeros(action_dim)
self.action_map = ['up', 'down', 'left', 'right']
def softmax(self, x):
exp_x = np.exp(x - np.max(x))
return exp_x / exp_x.sum()
def get_action_probs(self, state_features: np.ndarray) -> np.ndarray:
logits = state_features @ self.W + self.b
return self.softmax(logits)
def select_action(self, state_features: np.ndarray, symbolic_action: str) -> str:
probs = self.get_action_probs(state_features)
if symbolic_action in self.action_map:
sym_idx = self.action_map.index(symbolic_action)
probs[sym_idx] += 0.7
probs = probs / probs.sum()
return np.random.choice(self.action_map, p=probs)Интеграция и выполнение миссии
Интеграционный слой объединяет планировщик и нейронные модули: план генерируется символически, наблюдение зашумляется и очищается нейросетью, затем по каждому шагу плана политика может уточнять действие. Цикл выполнения обновляет положение, избегает препятствий и собирает траекторию и список реальных действий.
class NeuroSymbolicAgent:
def __init__(self, grid_size: int = 8):
self.grid_size = grid_size
self.planner = SymbolicPlanner(grid_size)
self.perception = NeuralPerception(grid_size)
self.policy = NeuralPolicy()
self.obstacles = {(3, 3), (3, 4), (4, 3), (5, 5), (6, 2)}
self.objects = {'key': (2, 6), 'gem': (6, 6)}
self.goal = (7, 7)
def create_noisy_observation(self, true_grid: np.ndarray) -> np.ndarray:
noise = np.random.randn(*true_grid.shape) * 0.2
return np.clip(true_grid + noise, 0, 1)
def extract_state_features(self, pos: Tuple[int, int], goal: Tuple[int, int]) -> np.ndarray:
return np.array([pos[0]/self.grid_size, pos[1]/self.grid_size, goal[0]/self.grid_size, goal[1]/self.grid_size])
def execute_mission(self, verbose: bool = True) -> Tuple[List, List]:
start_state = State(robot_pos=(0, 0), visited={(0, 0)})
symbolic_plan = self.planner.a_star_plan(start_state, self.goal, self.obstacles, self.objects)
if verbose:
print(f" Symbolic Plan Generated: {len(symbolic_plan)} steps")
print(f" Plan: {symbolic_plan[:10]}{'...' if len(symbolic_plan) > 10 else ''}\n")
true_grid = np.zeros((self.grid_size, self.grid_size))
for obs in self.obstacles:
true_grid[obs[1], obs[0]] = 1.0
noisy_obs = self.create_noisy_observation(true_grid)
perceived_grid = self.perception.perceive(noisy_obs)
if verbose:
print(f" Neural Perception: Denoised obstacle map")
print(f" Perception accuracy: {np.mean((perceived_grid > 0.5) == true_grid):.2%}\n")
trajectory = [(0, 0)]
current_pos = (0, 0)
actions_taken = []
for i, sym_action in enumerate(symbolic_plan[:30]):
features = self.extract_state_features(current_pos, self.goal)
refined_action = self.policy.select_action(features, sym_action) if sym_action in ['up','down','left','right'] else sym_action
actions_taken.append(refined_action)
if refined_action == 'up': current_pos = (current_pos[0], max(0, current_pos[1]-1))
elif refined_action == 'down': current_pos = (current_pos[0], min(self.grid_size-1, current_pos[1]+1))
elif refined_action == 'left': current_pos = (max(0, current_pos[0]-1), current_pos[1])
elif refined_action == 'right': current_pos = (min(self.grid_size-1, current_pos[0]+1), current_pos[1])
if current_pos not in self.obstacles:
trajectory.append(current_pos)
return trajectory, actions_takenВизуализация и практические выводы
Хелпер по визуализации отображает препятствия, объекты, цель и траекторию агента, чтобы можно было проанализировать поведение. Основные идеи: символический слой даёт прозрачность и верифицируемость, нейронный слой обеспечивает устойчивость к шумным данным и адаптацию. Вместе они формируют агент, который одновременно интерпретируем и устойчив.
def visualize_execution(agent: NeuroSymbolicAgent, trajectory: List, title: str = "Neuro-Symbolic Agent Execution"):
fig, axes = plt.subplots(1, 2, figsize=(14, 6))
ax = axes[0]
grid = np.zeros((agent.grid_size, agent.grid_size, 3))
for obs in agent.obstacles:
grid[obs[1], obs[0]] = [0.3, 0.3, 0.3]
for obj_pos in agent.objects.values():
grid[obj_pos[1], obj_pos[0]] = [1.0, 0.8, 0.0]
grid[agent.goal[1], agent.goal[0]] = [0.0, 1.0, 0.0]
for i, pos in enumerate(trajectory):
intensity = 0.3 + 0.7 * (i / len(trajectory))
grid[pos[1], pos[0]] = [intensity, 0.0, 1.0]
if trajectory:
grid[trajectory[0][1], trajectory[0][0]] = [1.0, 0.0, 0.0]
ax.imshow(grid)
ax.set_title("Agent Trajectory in Environment", fontsize=14, fontweight='bold')
ax.set_xlabel("X Position")
ax.set_ylabel("Y Position")
ax.grid(True, alpha=0.3)
ax = axes[1]
ax.axis('off')
ax.text(0.5, 0.95, "Neuro-Symbolic Architecture", ha='center', fontsize=16, fontweight='bold', transform=ax.transAxes)
layers = [("SYMBOLIC LAYER", 0.75, "Planning • State Logic • Rules"), (" INTEGRATION", 0.60, "Feature Extraction • Action Blending"), ("NEURAL LAYER", 0.45, "Perception • Policy Learning"), (" EXECUTION", 0.30, "Action Refinement • Feedback"), ("ENVIRONMENT", 0.15, "State Transitions • Observations")]
colors = ['#FF6B6B', '#4ECDC4', '#45B7D1', '#96CEB4', '#FFEAA7']
for i, (name, y, desc) in enumerate(layers):
ax.add_patch(plt.Rectangle((0.1, y-0.05), 0.8, 0.08, facecolor=colors[i], alpha=0.7, transform=ax.transAxes))
ax.text(0.5, y, f"{name}\n{desc}", ha='center', va='center', fontsize=10, fontweight='bold', transform=ax.transAxes)
plt.tight_layout()
plt.savefig('neurosymbolic_agent.png', dpi=150, bbox_inches='tight')
plt.show()
print(f"\n Execution complete! Trajectory length: {len(trajectory)} steps")
if __name__ == "__main__":
print("=" * 70)
print("NEURO-SYMBOLIC HYBRID AGENT TUTORIAL")
print("Combining Classical AI Planning with Modern Neural Networks")
print("=" * 70)
print()
agent = NeuroSymbolicAgent(grid_size=8)
trajectory, actions = agent.execute_mission(verbose=True)
visualize_execution(agent, trajectory)
print("\n" + "=" * 70)
print("KEY INSIGHTS:")
print("=" * 70)
print("✦ Symbolic Layer: Provides interpretable, verifiable plans")
print("✦ Neural Layer: Handles noisy perception & adapts to uncertainty")
print("✦ Integration: Combines strengths of both paradigms")
print("✦ Benefits: Explainability + Flexibility + Robustness")
print("=" * 70)Практические заметки подсказывают, где развивать эту демонстрацию: заменить случайные веса на обученные модели, расширить вектор признаков для политики, и усложнить планирование для более сложных задач.
Switch Language
Read this article in English