Мини-Multi-Agent RL в GridWorld: Q-обучение, локальная обратная связь и координатор-супервизор
'Практический туториал демонстрирует, как Action, Tool и Supervisor агенты сотрудничют в GridWorld: Q-обучение, локальные подсказки и контроль для безопасной навигации.'
Окружение: настройка GridWorld
Начинаем с определения компактного окружения GridWorld, где агент перемещается из левого верхнего угла в правый нижний, избегая препятствий. Окружение отслеживает посещённые клетки, количество шагов и доступные действия, формируя представление состояния для простых RL-экспериментов. Ниже код создаёт сетку, размещает препятствия и предоставляет состояние и допустимые перемещения.
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output
import time
from collections import defaultdict
class GridWorld:
def __init__(self, size=8):
self.size = size
self.agent_pos = [0, 0]
self.goal_pos = [size-1, size-1]
self.obstacles = self._generate_obstacles()
self.visited = set()
self.step_count = 0
self.max_steps = size * size * 2
def _generate_obstacles(self):
obstacles = set()
n_obstacles = self.size
while len(obstacles) < n_obstacles:
pos = (np.random.randint(1, self.size-1),
np.random.randint(1, self.size-1))
if pos != (0, 0) and pos != (self.size-1, self.size-1):
obstacles.add(pos)
return obstacles
def reset(self):
self.agent_pos = [0, 0]
self.visited = {tuple(self.agent_pos)}
self.step_count = 0
return self._get_state()
def _get_state(self):
return {
'position': tuple(self.agent_pos),
'goal': self.goal_pos,
'distance_to_goal': abs(self.agent_pos[0] - self.goal_pos[0]) +
abs(self.agent_pos[1] - self.goal_pos[1]),
'visited_count': len(self.visited),
'steps': self.step_count,
'can_move': self._get_valid_actions()
}
def _get_valid_actions(self):
valid = []
moves = {'up': [-1, 0], 'down': [1, 0], 'left': [0, -1], 'right': [0, 1]}
for action, delta in moves.items():
new_pos = [self.agent_pos[0] + delta[0], self.agent_pos[1] + delta[1]]
if (0 <= new_pos[0] < self.size and 0 <= new_pos[1] < self.size and
tuple(new_pos) not in self.obstacles):
valid.append(action)
return validШаги, награды и визуализация
Окружение реализует step, который обрабатывает движение, столкновения, вычисляет награду и условия завершения эпизода. render использует matplotlib для визуализации посещённых клеток, препятствий, цели и позиции агента, что позволяет наблюдать процесс обучения.
class GridWorld(GridWorld):
def step(self, action):
self.step_count += 1
moves = {'up': [-1, 0], 'down': [1, 0], 'left': [0, -1], 'right': [0, 1]}
if action not in moves:
return self._get_state(), -1, False, "Invalid action"
delta = moves[action]
new_pos = [self.agent_pos[0] + delta[0], self.agent_pos[1] + delta[1]]
if not (0 <= new_pos[0] < self.size and 0 <= new_pos[1] < self.size):
return self._get_state(), -1, False, "Hit wall"
if tuple(new_pos) in self.obstacles:
return self._get_state(), -1, False, "Hit obstacle"
self.agent_pos = new_pos
pos_tuple = tuple(self.agent_pos)
reward = -0.1
if pos_tuple not in self.visited:
reward += 0.5
self.visited.add(pos_tuple)
done = False
info = "Moved"
if self.agent_pos == self.goal_pos:
reward += 10
done = True
info = "Goal reached!"
elif self.step_count >= self.max_steps:
done = True
info = "Max steps reached"
return self._get_state(), reward, done, info
def render(self, agent_thoughts=None):
grid = np.zeros((self.size, self.size, 3))
for pos in self.visited:
grid[pos[0], pos[1]] = [0.7, 0.9, 1.0]
for obs in self.obstacles:
grid[obs[0], obs[1]] = [0.2, 0.2, 0.2]
grid[self.goal_pos[0], self.goal_pos[1]] = [0, 1, 0]
grid[self.agent_pos[0], self.agent_pos[1]] = [1, 0, 0]
plt.figure(figsize=(10, 8))
plt.imshow(grid, interpolation='nearest')
plt.title(f"Step: {self.step_count} | Visited: {len(self.visited)}/{self.size*self.size}")
for i in range(self.size + 1):
plt.axhline(i - 0.5, color='gray', linewidth=0.5)
plt.axvline(i - 0.5, color='gray', linewidth=0.5)
if agent_thoughts:
plt.text(0.5, -1.5, agent_thoughts, ha='center', fontsize=9,
bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.8),
wrap=True, transform=plt.gca().transData)
plt.axis('off')
plt.tight_layout()
plt.show()Агенты: ActionAgent и ToolAgent
Два компонента обеспечивают поведение: ActionAgent обучается простым Q-подобным обновлением и балансирует исследование/использование, а ToolAgent анализирует состояние, историю и тренды вознаграждений, выдавая подсказки — короткие рекомендации для корректировки стратегии.
class ActionAgent:
def __init__(self):
self.q_values = defaultdict(lambda: defaultdict(float))
self.epsilon = 0.3
self.learning_rate = 0.1
self.discount = 0.95
def choose_action(self, state):
valid_actions = state['can_move']
if not valid_actions:
return None
pos = state['position']
if np.random.random() < self.epsilon:
action = np.random.choice(valid_actions)
reasoning = f"Exploring randomly: chose '{action}'"
else:
action_values = {a: self.q_values[pos][a] for a in valid_actions}
action = max(action_values, key=action_values.get)
reasoning = f"Exploiting: chose '{action}' (Q={self.q_values[pos][action]:.2f})"
return action, reasoning
def learn(self, state, action, reward, next_state):
pos = state['position']
next_pos = next_state['position']
current_q = self.q_values[pos][action]
next_max_q = max([self.q_values[next_pos][a] for a in next_state['can_move']], default=0)
new_q = current_q + self.learning_rate * (
reward + self.discount * next_max_q - current_q)
self.q_values[pos][action] = new_q
class ToolAgent:
def analyze(self, state, action_taken, reward, history):
suggestions = []
distance = state['distance_to_goal']
if distance <= 3:
suggestions.append(" Very close to goal! Prioritize direct path.")
exploration_rate = state['visited_count'] / (state['steps'] + 1)
if exploration_rate < 0.5 and distance > 5:
suggestions.append(" Low exploration rate. Consider exploring more.")
if len(history) >= 5:
recent_rewards = [h[2] for h in history[-5:]]
avg_reward = np.mean(recent_rewards)
if avg_reward < -0.5:
suggestions.append(" Negative reward trend. Try different strategy.")
elif avg_reward > 0.3:
suggestions.append(" Good progress! Current strategy working.")
if len(state['can_move']) <= 2:
suggestions.append(" Limited movement options. Be careful.")
return suggestionsСупервизор: слой принятия окончательного решения
Супервизор оценивает предложенное действие и подсказки ToolAgent и при необходимости переопределяет выбор агента так, чтобы движение было безопаснее или ближе к цели. Это создаёт многоуровневый поток решений: обучение + анализ + надзор.
class SupervisorAgent:
def decide(self, state, proposed_action, tool_suggestions):
if not proposed_action:
return None, "No valid actions available"
decision = proposed_action
reasoning = f"Approved action '{proposed_action}'"
for suggestion in tool_suggestions:
if "goal" in suggestion.lower() and "close" in suggestion.lower():
goal_direction = self._get_goal_direction(state)
if goal_direction in state['can_move']:
decision = goal_direction
reasoning = f"Override: Moving '{goal_direction}' toward goal"
break
return decision, reasoning
def _get_goal_direction(self, state):
pos = state['position']
goal = state['goal']
if goal[0] > pos[0]:
return 'down'
elif goal[0] < pos[0]:
return 'up'
elif goal[1] > pos[1]:
return 'right'
else:
return 'left'Цикл обучения и визуализация
Цикл обучения выполняет эпизоды, где ActionAgent предлагает действие, ToolAgent генерирует подсказки, Supervisor может переопределить, окружающая среда делает шаг, и ActionAgent обновляет Q-значения. Награды и шаги собираются и отображаются для анализа прогресса.
def train_multi_agent(episodes=5, visualize=True):
env = GridWorld(size=8)
action_agent = ActionAgent()
tool_agent = ToolAgent()
supervisor = SupervisorAgent()
episode_rewards = []
episode_steps = []
for episode in range(episodes):
state = env.reset()
total_reward = 0
done = False
history = []
print(f"\n{'='*60}")
print(f"EPISODE {episode + 1}/{episodes}")
print(f"{'='*60}")
while not done:
action_result = action_agent.choose_action(state)
if action_result is None:
break
proposed_action, action_reasoning = action_result
suggestions = tool_agent.analyze(state, proposed_action, total_reward, history)
final_action, supervisor_reasoning = supervisor.decide(state, proposed_action, suggestions)
if final_action is None:
break
next_state, reward, done, info = env.step(final_action)
total_reward += reward
action_agent.learn(state, final_action, reward, next_state)
history.append((state, final_action, reward, next_state))
if visualize:
clear_output(wait=True)
thoughts = (f"Action Agent: {action_reasoning}\n"
f"Supervisor: {supervisor_reasoning}\n"
f"Tool Agent: {', '.join(suggestions[:2]) if suggestions else 'No suggestions'}\n"
f"Reward: {reward:.2f} | Total: {total_reward:.2f}")
env.render(thoughts)
time.sleep(0.3)
state = next_state
episode_rewards.append(total_reward)
episode_steps.append(env.step_count)
print(f"\nEpisode {episode+1} Complete!")
print(f"Total Reward: {total_reward:.2f}")
print(f"Steps Taken: {env.step_count}")
print(f"Cells Visited: {len(env.visited)}/{env.size**2}")
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(episode_rewards, marker='o')
plt.title('Episode Rewards')
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.grid(True, alpha=0.3)
plt.subplot(1, 2, 2)
plt.plot(episode_steps, marker='s', color='orange')
plt.title('Episode Steps')
plt.xlabel('Episode')
plt.ylabel('Steps to Complete')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
return action_agent, tool_agent, supervisor
if __name__ == "__main__":
print(" Multi-Agent RL System: Grid World Navigation")
print("=" * 60)
print("Components:")
print(" • Action Agent: Proposes actions using Q-learning")
print(" • Tool Agent: Analyzes performance and suggests improvements")
print(" • Supervisor Agent: Makes final decisions")
print("=" * 60)
trained_agents = train_multi_agent(episodes=5, visualize=True)Эта реализация показывает, как простые эвристики, локальный анализ и супервизор могут объединяться с Q-обучающимся агентом, создавая согласованное и улучшающееся поведение в небольшой сетке. Визуализация и графики помогают отслеживать прогресс по эпизодам и экспериментировать с параметрами.
Switch Language
Read this article in English