Осваиваем масштабируемую многoагентную коммуникацию с ACP и Google Gemini API
Подробное руководство по созданию масштабируемой многoагентной системы коммуникации с использованием ACP и Google Gemini API с примерами на Python и практическими сценариями.
Реализация протокола агентского общения (ACP) на Python
В этом руководстве показано, как создать масштабируемую и гибкую систему коммуникации многoагентных систем, совместимую с протоколом агентского общения (ACP), используя Python и API Google Gemini для обработки естественного языка.
Настройка окружения
Начните с установки и конфигурации библиотеки google-generativeai с вашим ключом API Gemini для взаимодействия с языковой моделью Gemini.
import google.generativeai as genai
GEMINI_API_KEY = "Use Your Gemini API Key"
genai.configure(api_key=GEMINI_API_KEY)Для поддержки структурированной реализации ACP используются стандартные модули Python, такие как json, time, uuid, enum и dataclasses.
Основные типы сообщений и перформативы
Перечисление ACPMessageType стандартизирует категории сообщений, такие как запросы, ответы, запросы информации и обработка ошибок, что обеспечивает последовательность коммуникации.
class ACPMessageType(Enum):
REQUEST = "request"
RESPONSE = "response"
INFORM = "inform"
QUERY = "query"
SUBSCRIBE = "subscribe"
UNSUBSCRIBE = "unsubscribe"
ERROR = "error"
ACK = "acknowledge"Перечисление ACPPerformative описывает речевые действия агентов — информирование, вопросы, запросы действий, согласия и отказы, что обеспечивает семантическую ясность взаимодействия.
class ACPPerformative(Enum):
TELL = "tell"
ASK = "ask"
REPLY = "reply"
REQUEST_ACTION = "request-action"
AGREE = "agree"
REFUSE = "refuse"
PROPOSE = "propose"
ACCEPT = "accept"
REJECT = "reject"Структура сообщений ACP
Класс данных ACPMessage содержит все необходимые поля для стандартизированного обмена сообщениями: идентификаторы, отправитель и получатель, перформатив, содержимое, версию протокола, отслеживание беседы и метки времени.
@dataclass
class ACPMessage:
message_id: str
sender: str
receiver: str
performative: str
content: Dict[str, Any]
protocol: str = "ACP-1.0"
conversation_id: str = None
reply_to: str = None
language: str = "english"
encoding: str = "json"
timestamp: float = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = time.time()
if self.conversation_id is None:
self.conversation_id = str(uuid.uuid4())
def to_acp_format(self) -> str:
acp_msg = {
"message-id": self.message_id,
"sender": self.sender,
"receiver": self.receiver,
"performative": self.performative,
"content": self.content,
"protocol": self.protocol,
"conversation-id": self.conversation_id,
"reply-to": self.reply_to,
"language": self.language,
"encoding": self.encoding,
"timestamp": self.timestamp
}
return json.dumps(acp_msg, indent=2)
@classmethod
def from_acp_format(cls, acp_string: str) -> 'ACPMessage':
data = json.loads(acp_string)
return cls(
message_id=data["message-id"],
sender=data["sender"],
receiver=data["receiver"],
performative=data["performative"],
content=data["content"],
protocol=data.get("protocol", "ACP-1.0"),
conversation_id=data.get("conversation-id"),
reply_to=data.get("reply-to"),
language=data.get("language", "english"),
encoding=data.get("encoding", "json"),
timestamp=data.get("timestamp", time.time())
)Определение агентов и обработка сообщений
Класс ACPAgent описывает автономных агентов, способных создавать, отправлять, принимать и обрабатывать сообщения ACP. Агент управляет очередью сообщений, подписками и историей бесед, используя модель Gemini для обработки запросов и действий.
class ACPAgent:
def __init__(self, agent_id: str, name: str, capabilities: List[str]):
self.agent_id = agent_id
self.name = name
self.capabilities = capabilities
self.model = genai.GenerativeModel("gemini-1.5-flash")
self.message_queue: List[ACPMessage] = []
self.subscriptions: Dict[str, List[str]] = {}
self.conversations: Dict[str, List[ACPMessage]] = {}
def create_message(self, receiver: str, performative: str, content: Dict[str, Any], conversation_id: str = None, reply_to: str = None) -> ACPMessage:
return ACPMessage(
message_id=str(uuid.uuid4()),
sender=self.agent_id,
receiver=receiver,
performative=performative,
content=content,
conversation_id=conversation_id,
reply_to=reply_to
)
def send_inform(self, receiver: str, fact: str, data: Any = None) -> ACPMessage:
content = {"fact": fact, "data": data}
return self.create_message(receiver, ACPPerformative.TELL.value, content)
def send_query(self, receiver: str, question: str, query_type: str = "yes-no") -> ACPMessage:
content = {"question": question, "query-type": query_type}
return self.create_message(receiver, ACPPerformative.ASK.value, content)
def send_request(self, receiver: str, action: str, parameters: Dict = None) -> ACPMessage:
content = {"action": action, "parameters": parameters or {}}
return self.create_message(receiver, ACPPerformative.REQUEST_ACTION.value, content)
def send_reply(self, original_msg: ACPMessage, response_data: Any) -> ACPMessage:
content = {"response": response_data, "original-question": original_msg.content}
return self.create_message(
original_msg.sender,
ACPPerformative.REPLY.value,
content,
conversation_id=original_msg.conversation_id,
reply_to=original_msg.message_id
)
def process_message(self, message: ACPMessage) -> Optional[ACPMessage]:
self.message_queue.append(message)
conv_id = message.conversation_id
if conv_id not in self.conversations:
self.conversations[conv_id] = []
self.conversations[conv_id].append(message)
if message.performative == ACPPerformative.ASK.value:
return self._handle_query(message)
elif message.performative == ACPPerformative.REQUEST_ACTION.value:
return self._handle_request(message)
elif message.performative == ACPPerformative.TELL.value:
return self._handle_inform(message)
return None
def _handle_query(self, message: ACPMessage) -> ACPMessage:
question = message.content.get("question", "")
prompt = f"Как агент {self.name} с возможностями {self.capabilities}, ответьте на: {question}"
try:
response = self.model.generate_content(prompt)
answer = response.text.strip()
except:
answer = "Не удалось обработать запрос в данный момент"
return self.send_reply(message, {"answer": answer, "confidence": 0.8})
def _handle_request(self, message: ACPMessage) -> ACPMessage:
action = message.content.get("action", "")
parameters = message.content.get("parameters", {})
if any(capability in action.lower() for capability in self.capabilities):
result = f"Выполнение {action} с параметрами {parameters}"
status = "agreed"
else:
result = f"Не могу выполнить {action} - нет в моих возможностях"
status = "refused"
return self.send_reply(message, {"status": status, "result": result})
def _handle_inform(self, message: ACPMessage) -> Optional[ACPMessage]:
fact = message.content.get("fact", "")
print(f"[{self.name}] Получена информация: {fact}")
ack_content = {"status": "получено", "fact": fact}
return self.create_message(message.sender, "acknowledge", ack_content, conversation_id=message.conversation_id)Брокер сообщений для маршрутизации и доставки
Класс ACPMessageBroker управляет регистрацией агентов, маршрутизацией сообщений, ведением журнала и рассылкой сообщений нескольким получателям.
class ACPMessageBroker:
def __init__(self):
self.agents: Dict[str, ACPAgent] = {}
self.message_log: List[ACPMessage] = []
self.routing_table: Dict[str, str] = {}
def register_agent(self, agent: ACPAgent):
self.agents[agent.agent_id] = agent
self.routing_table[agent.agent_id] = "local"
print(f"✓ Зарегистрирован агент: {agent.name} ({agent.agent_id})")
def route_message(self, message: ACPMessage) -> bool:
if message.receiver not in self.agents:
print(f"✗ Получатель {message.receiver} не найден")
return False
print(f"\n МАРШРУТИЗАЦИЯ СОБЩЕНИЯ ACP:")
print(f"От: {message.sender} → Кому: {message.receiver}")
print(f"Перформатив: {message.performative}")
print(f"Содержимое: {json.dumps(message.content, indent=2)}")
receiver_agent = self.agents[message.receiver]
response = receiver_agent.process_message(message)
self.message_log.append(message)
if response:
print(f"\n СГЕНЕРИРОВАН ОТВЕТ:")
print(f"От: {response.sender} → Кому: {response.receiver}")
print(f"Содержимое: {json.dumps(response.content, indent=2)}")
if response.receiver in self.agents:
self.agents[response.receiver].process_message(response)
self.message_log.append(response)
return True
def broadcast_message(self, message: ACPMessage, recipients: List[str]):
for recipient in recipients:
msg_copy = ACPMessage(
message_id=str(uuid.uuid4()),
sender=message.sender,
receiver=recipient,
performative=message.performative,
content=message.content.copy(),
conversation_id=message.conversation_id
)
self.route_message(msg_copy)Демонстрация работы ACP
Функция демонстрации инициализирует агентов с разными возможностями, регистрирует их и моделирует сценарии: запрос информации, запрос действия и обмен информацией.
def demonstrate_acp():
print(" ДЕМОНСТРАЦИЯ ПРОТОКОЛА АГЕНТСКОГО ОБЩЕНИЯ (ACP)")
print("=" * 60)
broker = ACPMessageBroker()
researcher = ACPAgent("agent-001", "Dr. Research", ["analysis", "research", "data-processing"])
assistant = ACPAgent("agent-002", "AI Assistant", ["information", "scheduling", "communication"])
calculator = ACPAgent("agent-003", "MathBot", ["calculation", "mathematics", "computation"])
broker.register_agent(researcher)
broker.register_agent(assistant)
broker.register_agent(calculator)
print(f"\n ЗАРЕГИСТРИРОВАННЫЕ АГЕНТЫ:")
for agent_id, agent in broker.agents.items():
print(f" • {agent.name} ({agent_id}): {', '.join(agent.capabilities)}")
print(f"\n СЦЕНАРИЙ 1: Запрос информации (перформатив ASK)")
query_msg = assistant.send_query("agent-001", "Каковы ключевые факторы в исследовании ИИ?")
broker.route_message(query_msg)
print(f"\n СЦЕНАРИЙ 2: Запрос действия (перформатив REQUEST-ACTION)")
calc_request = researcher.send_request("agent-003", "calculate", {"expression": "sqrt(144) + 10"})
broker.route_message(calc_request)
print(f"\n СЦЕНАРИЙ 3: Обмен информацией (перформатив TELL)")
info_msg = researcher.send_inform("agent-002", "Опубликована новая статья по квантовым вычислениям")
broker.route_message(info_msg)
print(f"\n СТАТИСТИКА ПРОТОКОЛА:")
print(f" • Всего обработанных сообщений: {len(broker.message_log)}")
print(f" • Активных бесед: {len(set(msg.conversation_id for msg in broker.message_log))}")
print(f" • Использованных типов сообщений: {len(set(msg.performative for msg in broker.message_log))}")
print(f"\n ПРИМЕР ФОРМАТА СОБЩЕНИЯ ACP:")
sample_msg = assistant.send_query("agent-001", "Пример вопроса для демонстрации формата")
print(sample_msg.to_acp_format())Быстрый старт в Google Colab
Инструкции по получению ключа Gemini API, его настройке и запуску демонстрации, а также краткое описание возможностей протокола и пример кода для расширения функционала.
# Создание кастомного агента
my_agent = ACPAgent("my-001", "CustomBot", ["custom-capability"])
broker.register_agent(my_agent)
# Отправка кастомного сообщения
msg = my_agent.send_query("agent-001", "Ваш вопрос здесь")
broker.route_message(msg)Это подробное руководство поможет разработчикам создавать масштабируемые многoагентные системы с продвинутой коммуникацией, сотрудничеством и выполнением задач с помощью ACP и Google Gemini API.
Switch Language
Read this article in English