Интеллект роя: Создание MCP-агентов с контекстной осведомлённостью и интеграцией Gemini
Интеллект роя: продвинутые MCP-агенты
Краткий обзор
В этом руководстве показано, как создать продвинутую систему MCP (Model Context Protocol) агентов, совместимую с Jupyter и Google Colab. Основные акценты — координация нескольких агентов, контекстная осведомлённость, управление памятью и динамическое использование инструментов. Каждый агент выполняет специализированную роль — координатор, исследователь, аналитик или исполнитель — и вместе они образуют рой, способный решать сложные задачи.
Основные импорты и проверка Gemini
Сначала импортируем стандартные библиотеки Python и проверим наличие клиента Google Gemini. Ниже пример кода, который настраивает логирование и флаг доступности Gemini.
import json
import logging
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
import time
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
try:
import google.generativeai as genai
GEMINI_AVAILABLE = True
except ImportError:
print(" google-generativeai not installed. Run: pip install google-generativeai")
GEMINI_AVAILABLE = False
Если Gemini не установлен, система работает в режиме демонстрации с имитированными ответами.
Строительные блоки агентов
Определяем роли агентов, структуру сообщений и контекст агента. Эти классы формализуют обязанности и память диалогов для каждого агента.
class AgentRole(Enum):
COORDINATOR = "coordinator"
RESEARCHER = "researcher"
ANALYZER = "analyzer"
EXECUTOR = "executor"
@dataclass
class Message:
role: str
content: str
timestamp: datetime
metadata: Dict[str, Any] = None
@dataclass
class AgentContext:
agent_id: str
role: AgentRole
capabilities: List[str]
memory: List[Message]
tools: List[str]
AgentRole задаёт ответственность, Message хранит элементы разговора, а AgentContext фиксирует идентичность, возможности и доступные инструменты агента.
Реализация MCPAgent: агент, учитывающий роль
Класс MCPAgent отвечает за инициализацию, установку возможностей и инструментов, хранение памяти и генерацию ответов. При наличии ключа API он подключает Gemini, иначе использует демонстрационные ответы.
class MCPAgent:
"""Advanced MCP Agent with evolved capabilities - Jupyter Compatible"""
def __init__(self, agent_id: str, role: AgentRole, api_key: str = None):
self.agent_id = agent_id
self.role = role
self.api_key = api_key
self.memory = []
self.context = AgentContext(
agent_id=agent_id,
role=role,
capabilities=self._init_capabilities(),
memory=[],
tools=self._init_tools()
)
self.model = None
if GEMINI_AVAILABLE and api_key:
try:
genai.configure(api_key=api_key)
self.model = genai.GenerativeModel('gemini-pro')
print(f" Agent {agent_id} initialized with Gemini API")
except Exception as e:
print(f" Gemini configuration failed: {e}")
print(" Running in demo mode with simulated responses")
else:
print(f" Agent {agent_id} running in demo mode")
def _init_capabilities(self) -> List[str]:
"""Initialize role-specific capabilities"""
capabilities_map = {
AgentRole.COORDINATOR: ["task_decomposition", "agent_orchestration", "priority_management"],
AgentRole.RESEARCHER: ["data_gathering", "web_search", "information_synthesis"],
AgentRole.ANALYZER: ["pattern_recognition", "data_analysis", "insight_generation"],
AgentRole.EXECUTOR: ["action_execution", "result_validation", "output_formatting"]
}
return capabilities_map.get(self.role, [])
def _init_tools(self) -> List[str]:
"""Initialize available tools based on role"""
tools_map = {
AgentRole.COORDINATOR: ["task_splitter", "agent_selector", "progress_tracker"],
AgentRole.RESEARCHER: ["search_engine", "data_extractor", "source_validator"],
AgentRole.ANALYZER: ["statistical_analyzer", "pattern_detector", "visualization_tool"],
AgentRole.EXECUTOR: ["code_executor", "file_handler", "api_caller"]
}
return tools_map.get(self.role, [])
def process_message(self, message: str, context: Optional[Dict] = None) -> Dict[str, Any]:
"""Process incoming message with context awareness - Synchronous version"""
msg = Message(
role="user",
content=message,
timestamp=datetime.now(),
metadata=context
)
self.memory.append(msg)
prompt = self._generate_contextual_prompt(message, context)
try:
if self.model:
response = self._generate_response_gemini(prompt)
else:
response = self._generate_demo_response(message)
response_msg = Message(
role="assistant",
content=response,
timestamp=datetime.now(),
metadata={"agent_id": self.agent_id, "role": self.role.value}
)
self.memory.append(response_msg)
return {
"agent_id": self.agent_id,
"role": self.role.value,
"response": response,
"capabilities_used": self._analyze_capabilities_used(message),
"next_actions": self._suggest_next_actions(response),
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error processing message: {e}")
return {"error": str(e)}
def _generate_response_gemini(self, prompt: str) -> str:
"""Generate response using Gemini API - Synchronous"""
try:
response = self.model.generate_content(prompt)
return response.text
except Exception as e:
logger.error(f"Gemini API error: {e}")
return self._generate_demo_response(prompt)
def _generate_demo_response(self, message: str) -> str:
"""Generate simulated response for demo purposes"""
role_responses = {
AgentRole.COORDINATOR: f"As coordinator, I'll break down the task: '{message[:50]}...' into manageable components and assign them to specialized agents.",
AgentRole.RESEARCHER: f"I'll research information about: '{message[:50]}...' using my data gathering and synthesis capabilities.",
AgentRole.ANALYZER: f"Analyzing the patterns and insights from: '{message[:50]}...' to provide data-driven recommendations.",
AgentRole.EXECUTOR: f"I'll execute the necessary actions for: '{message[:50]}...' and validate the results."
}
base_response = role_responses.get(self.role, f"Processing: {message[:50]}...")
time.sleep(0.5)
additional_context = {
AgentRole.COORDINATOR: " I've identified 3 key subtasks and will coordinate their execution across the agent team.",
AgentRole.RESEARCHER: " My research indicates several relevant sources and current trends in this area.",
AgentRole.ANALYZER: " The data shows interesting correlations and actionable insights for decision making.",
AgentRole.EXECUTOR: " I've completed the requested actions and verified the outputs meet quality standards."
}
return base_response + additional_context.get(self.role, "")
def _generate_contextual_prompt(self, message: str, context: Optional[Dict]) -> str:
"""Generate context-aware prompt based on agent role"""
base_prompt = f"""
You are an advanced AI agent with the role: {self.role.value}
Your capabilities: {', '.join(self.context.capabilities)}
Available tools: {', '.join(self.context.tools)}
Recent conversation context:
{self._get_recent_context()}
Current request: {message}
"""
role_instructions = {
AgentRole.COORDINATOR: """
Focus on breaking down complex tasks, coordinating with other agents,
and maintaining overall project coherence. Consider dependencies and priorities.
Provide clear task decomposition and agent assignments.
""",
AgentRole.RESEARCHER: """
Prioritize accurate information gathering, source verification,
and comprehensive data collection. Synthesize findings clearly.
Focus on current trends and reliable sources.
""",
AgentRole.ANALYZER: """
Focus on pattern recognition, data interpretation, and insight generation.
Provide evidence-based conclusions and actionable recommendations.
Highlight key correlations and implications.
""",
AgentRole.EXECUTOR: """
Concentrate on practical implementation, result validation,
and clear output delivery. Ensure actions are completed effectively.
Focus on quality and completeness of execution.
"""
}
return base_prompt + role_instructions.get(self.role, "")
def _get_recent_context(self, limit: int = 3) -> str:
"""Get recent conversation context"""
if not self.memory:
return "No previous context"
recent = self.memory[-limit:]
context_str = ""
for msg in recent:
context_str += f"{msg.role}: {msg.content[:100]}...\n"
return context_str
def _analyze_capabilities_used(self, message: str) -> List[str]:
"""Analyze which capabilities were likely used"""
used_capabilities = []
message_lower = message.lower()
capability_keywords = {
"task_decomposition": ["break down", "divide", "split", "decompose"],
"data_gathering": ["research", "find", "collect", "gather"],
"pattern_recognition": ["analyze", "pattern", "trend", "correlation"],
"action_execution": ["execute", "run", "implement", "perform"],
"agent_orchestration": ["coordinate", "manage", "organize", "assign"],
"information_synthesis": ["synthesize", "combine", "merge", "integrate"]
}
for capability, keywords in capability_keywords.items():
if capability in self.context.capabilities:
if any(keyword in message_lower for keyword in keywords):
used_capabilities.append(capability)
return used_capabilities
def _suggest_next_actions(self, response: str) -> List[str]:
"""Suggest logical next actions based on response"""
suggestions = []
response_lower = response.lower()
if "need more information" in response_lower or "research" in response_lower:
suggestions.append("delegate_to_researcher")
if "analyze" in response_lower or "pattern" in response_lower:
suggestions.append("delegate_to_analyzer")
if "implement" in response_lower or "execute" in response_lower:
suggestions.append("delegate_to_executor")
if "coordinate" in response_lower or "manage" in response_lower:
suggestions.append("initiate_multi_agent_collaboration")
if "subtask" in response_lower or "break down" in response_lower:
suggestions.append("task_decomposition_required")
return suggestions if suggestions else ["continue_conversation"]
Координация роя и управление
MCPAgentSwarm создаёт и регистрирует агентов по мере необходимости, разбивает задачу, делегирует части специализированным агентам, собирает результаты и синтезирует итоговый вывод.
class MCPAgentSwarm:
"""Multi-agent coordination system - Jupyter Compatible"""
def __init__(self, api_key: str = None):
self.api_key = api_key
self.agents = {}
self.task_history = []
self.results = {}
def create_agent(self, agent_id: str, role: AgentRole) -> MCPAgent:
"""Create and register a new agent"""
agent = MCPAgent(agent_id, role, self.api_key)
self.agents[agent_id] = agent
print(f" Created agent: {agent_id} with role: {role.value}")
return agent
def coordinate_task(self, task: str) -> Dict[str, Any]:
"""Coordinate complex task across multiple agents - Synchronous"""
print(f"\n Coordinating task: {task}")
print("=" * 60)
if "coordinator" not in self.agents:
self.create_agent("coordinator", AgentRole.COORDINATOR)
coordinator = self.agents["coordinator"]
print("\n Step 1: Task Decomposition")
decomposition = coordinator.process_message(
f"Decompose this complex task into subtasks and identify which specialized agents are needed: {task}"
)
print(f"Coordinator: {decomposition['response']}")
self._ensure_required_agents()
print("\n Step 2: Agent Collaboration")
results = {}
for agent_id, agent in self.agents.items():
if agent_id != "coordinator":
print(f"\n {agent_id.upper()} working...")
result = agent.process_message(
f"Handle your specialized part of this task: {task}\n"
f"Coordinator's guidance: {decomposition['response'][:200]}..."
)
results[agent_id] = result
print(f" {agent_id}: {result['response'][:150]}...")
print("\n Step 3: Final Synthesis")
final_result = coordinator.process_message(
f"Synthesize these agent results into a comprehensive final output for the task '{task}':\n"
f"Results summary: {[f'{k}: {v['response'][:100]}...' for k, v in results.items()]}"
)
print(f"Final Result: {final_result['response']}")
task_record = {
"task": task,
"timestamp": datetime.now().isoformat(),
"decomposition": decomposition,
"agent_results": results,
"final_synthesis": final_result,
"agents_involved": list(self.agents.keys())
}
self.task_history.append(task_record)
return task_record
def _ensure_required_agents(self):
"""Ensure all required agent types exist"""
required_roles = [AgentRole.RESEARCHER, AgentRole.ANALYZER, AgentRole.EXECUTOR]
for role in required_roles:
agent_id = role.value
if agent_id not in self.agents:
self.create_agent(agent_id, role)
def get_swarm_status(self) -> Dict[str, Any]:
"""Get current status of the agent swarm"""
return {
"total_agents": len(self.agents),
"agent_roles": {aid: agent.role.value for aid, agent in self.agents.items()},
"tasks_completed": len(self.task_history),
"last_task": self.task_history[-1]["task"] if self.task_history else "None"
}
Демо, совместимое с ноутбуком
Функция demo_notebook_compatible показывает взаимодействие одного агента, затем координацию нескольких агентов и выводит статус роя. По умолчанию используется DEMO MODE, если не задан API_KEY.
def demo_notebook_compatible():
"""Demonstrate advanced MCP agent capabilities - Notebook Compatible"""
print(" Starting Advanced MCP Agent Tutorial")
print(" Jupyter/Colab Compatible Version")
print("=" * 60)
API_KEY = None # Set to your actual key
if not API_KEY:
print(" Running in DEMO MODE (simulated responses)")
print(" Set API_KEY variable for real Gemini AI responses")
print("-" * 60)
swarm = MCPAgentSwarm(API_KEY)
print("\n Demo 1: Single Agent Interaction")
researcher = swarm.create_agent("research_agent", AgentRole.RESEARCHER)
result = researcher.process_message(
"Research the latest trends in AI agent architectures and multi-agent systems"
)
print(f"\n Researcher Response:")
print(f" {result['response']}")
print(f" Capabilities Used: {result['capabilities_used']}")
print(f" Suggested Next Actions: {result['next_actions']}")
print("\n\n Demo 2: Multi-Agent Coordination")
complex_task = """
Analyze the impact of AI agents on software development productivity.
Include research on current tools, performance metrics, future predictions,
and provide actionable recommendations for development teams.
"""
coordination_result = swarm.coordinate_task(complex_task)
print("\n\n Demo 3: Swarm Status")
status = swarm.get_swarm_status()
print(f" Total Agents: {status['total_agents']}")
print(f" Agent Roles: {status['agent_roles']}")
print(f" Tasks Completed: {status['tasks_completed']}")
print("\n Tutorial Completed Successfully!")
return swarm
def run_demo():
"""Simple function to run the demo"""
return demo_notebook_compatible()
if __name__ == "__main__":
print(" Running MCP Agent Demo...")
swarm = run_demo()
else:
print(" MCP Agent Tutorial loaded!")
print(" Run: swarm = run_demo() to start the demonstration")
Используйте этот код в ноутбуке, чтобы увидеть разбиение задач координатором, сбор контекста исследователями, извлечение инсайтов аналитиками и выполнение действий исполнителями. При наличии и настройке Gemini агенты могут генерировать более насыщенные ответы.
Практические замечания
- Разделяйте ответственность агентов по ролям, чтобы повысить надёжность и предсказуемость.
- Хранение памяти и выборка недавнего контекста поддерживают непрерывность работы агентов.
- Демонстрационная синхронная версия удобна для ноутбуков; для продакшена адаптируйте архитектуру под асинхронность и хранение состояния.
- Интеграция с Gemini необязательна — оставьте устойчивые fallback-режимы для оффлайн-демонстраций.
Реализация даёт практичную основу для оркестрации нескольких агентов и демонстрирует, как декомпозиция, кооперация и синтез результатов работают вместе в среде ноутбука.