<НА ГЛАВНУЮ

Осваиваем масштабируемую многoагентную коммуникацию с ACP и Google Gemini API

Подробное руководство по созданию масштабируемой многoагентной системы коммуникации с использованием ACP и Google Gemini API с примерами на Python и практическими сценариями.

Реализация протокола агентского общения (ACP) на Python

В этом руководстве показано, как создать масштабируемую и гибкую систему коммуникации многoагентных систем, совместимую с протоколом агентского общения (ACP), используя Python и API Google Gemini для обработки естественного языка.

Настройка окружения

Начните с установки и конфигурации библиотеки google-generativeai с вашим ключом API Gemini для взаимодействия с языковой моделью Gemini.

import google.generativeai as genai
GEMINI_API_KEY = "Use Your Gemini API Key"
genai.configure(api_key=GEMINI_API_KEY)

Для поддержки структурированной реализации ACP используются стандартные модули Python, такие как json, time, uuid, enum и dataclasses.

Основные типы сообщений и перформативы

Перечисление ACPMessageType стандартизирует категории сообщений, такие как запросы, ответы, запросы информации и обработка ошибок, что обеспечивает последовательность коммуникации.

class ACPMessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    INFORM = "inform"
    QUERY = "query"
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    ERROR = "error"
    ACK = "acknowledge"

Перечисление ACPPerformative описывает речевые действия агентов — информирование, вопросы, запросы действий, согласия и отказы, что обеспечивает семантическую ясность взаимодействия.

class ACPPerformative(Enum):
    TELL = "tell"
    ASK = "ask"
    REPLY = "reply"
    REQUEST_ACTION = "request-action"
    AGREE = "agree"
    REFUSE = "refuse"
    PROPOSE = "propose"
    ACCEPT = "accept"
    REJECT = "reject"

Структура сообщений ACP

Класс данных ACPMessage содержит все необходимые поля для стандартизированного обмена сообщениями: идентификаторы, отправитель и получатель, перформатив, содержимое, версию протокола, отслеживание беседы и метки времени.

@dataclass
class ACPMessage:
    message_id: str
    sender: str
    receiver: str
    performative: str
    content: Dict[str, Any]
    protocol: str = "ACP-1.0"
    conversation_id: str = None
    reply_to: str = None
    language: str = "english"
    encoding: str = "json"
    timestamp: float = None
 
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.conversation_id is None:
            self.conversation_id = str(uuid.uuid4())
 
    def to_acp_format(self) -> str:
        acp_msg = {
            "message-id": self.message_id,
            "sender": self.sender,
            "receiver": self.receiver,
            "performative": self.performative,
            "content": self.content,
            "protocol": self.protocol,
            "conversation-id": self.conversation_id,
            "reply-to": self.reply_to,
            "language": self.language,
            "encoding": self.encoding,
            "timestamp": self.timestamp
        }
        return json.dumps(acp_msg, indent=2)
 
    @classmethod
    def from_acp_format(cls, acp_string: str) -> 'ACPMessage':
        data = json.loads(acp_string)
        return cls(
            message_id=data["message-id"],
            sender=data["sender"],
            receiver=data["receiver"],
            performative=data["performative"],
            content=data["content"],
            protocol=data.get("protocol", "ACP-1.0"),
            conversation_id=data.get("conversation-id"),
            reply_to=data.get("reply-to"),
            language=data.get("language", "english"),
            encoding=data.get("encoding", "json"),
            timestamp=data.get("timestamp", time.time())
        )

Определение агентов и обработка сообщений

Класс ACPAgent описывает автономных агентов, способных создавать, отправлять, принимать и обрабатывать сообщения ACP. Агент управляет очередью сообщений, подписками и историей бесед, используя модель Gemini для обработки запросов и действий.

class ACPAgent:
    def __init__(self, agent_id: str, name: str, capabilities: List[str]):
        self.agent_id = agent_id
        self.name = name
        self.capabilities = capabilities
        self.model = genai.GenerativeModel("gemini-1.5-flash")
        self.message_queue: List[ACPMessage] = []
        self.subscriptions: Dict[str, List[str]] = {}
        self.conversations: Dict[str, List[ACPMessage]] = {}
 
    def create_message(self, receiver: str, performative: str, content: Dict[str, Any], conversation_id: str = None, reply_to: str = None) -> ACPMessage:
        return ACPMessage(
            message_id=str(uuid.uuid4()),
            sender=self.agent_id,
            receiver=receiver,
            performative=performative,
            content=content,
            conversation_id=conversation_id,
            reply_to=reply_to
        )
 
    def send_inform(self, receiver: str, fact: str, data: Any = None) -> ACPMessage:
        content = {"fact": fact, "data": data}
        return self.create_message(receiver, ACPPerformative.TELL.value, content)
 
    def send_query(self, receiver: str, question: str, query_type: str = "yes-no") -> ACPMessage:
        content = {"question": question, "query-type": query_type}
        return self.create_message(receiver, ACPPerformative.ASK.value, content)
 
    def send_request(self, receiver: str, action: str, parameters: Dict = None) -> ACPMessage:
        content = {"action": action, "parameters": parameters or {}}
        return self.create_message(receiver, ACPPerformative.REQUEST_ACTION.value, content)
 
    def send_reply(self, original_msg: ACPMessage, response_data: Any) -> ACPMessage:
        content = {"response": response_data, "original-question": original_msg.content}
        return self.create_message(
            original_msg.sender,
            ACPPerformative.REPLY.value,
            content,
            conversation_id=original_msg.conversation_id,
            reply_to=original_msg.message_id
        )
 
    def process_message(self, message: ACPMessage) -> Optional[ACPMessage]:
        self.message_queue.append(message)
        conv_id = message.conversation_id
        if conv_id not in self.conversations:
            self.conversations[conv_id] = []
        self.conversations[conv_id].append(message)
 
        if message.performative == ACPPerformative.ASK.value:
            return self._handle_query(message)
        elif message.performative == ACPPerformative.REQUEST_ACTION.value:
            return self._handle_request(message)
        elif message.performative == ACPPerformative.TELL.value:
            return self._handle_inform(message)
        return None
 
    def _handle_query(self, message: ACPMessage) -> ACPMessage:
        question = message.content.get("question", "")
        prompt = f"Как агент {self.name} с возможностями {self.capabilities}, ответьте на: {question}"
        try:
            response = self.model.generate_content(prompt)
            answer = response.text.strip()
        except:
            answer = "Не удалось обработать запрос в данный момент"
        return self.send_reply(message, {"answer": answer, "confidence": 0.8})
 
    def _handle_request(self, message: ACPMessage) -> ACPMessage:
        action = message.content.get("action", "")
        parameters = message.content.get("parameters", {})
        if any(capability in action.lower() for capability in self.capabilities):
            result = f"Выполнение {action} с параметрами {parameters}"
            status = "agreed"
        else:
            result = f"Не могу выполнить {action} - нет в моих возможностях"
            status = "refused"
        return self.send_reply(message, {"status": status, "result": result})
 
    def _handle_inform(self, message: ACPMessage) -> Optional[ACPMessage]:
        fact = message.content.get("fact", "")
        print(f"[{self.name}] Получена информация: {fact}")
        ack_content = {"status": "получено", "fact": fact}
        return self.create_message(message.sender, "acknowledge", ack_content, conversation_id=message.conversation_id)

Брокер сообщений для маршрутизации и доставки

Класс ACPMessageBroker управляет регистрацией агентов, маршрутизацией сообщений, ведением журнала и рассылкой сообщений нескольким получателям.

class ACPMessageBroker:
    def __init__(self):
        self.agents: Dict[str, ACPAgent] = {}
        self.message_log: List[ACPMessage] = []
        self.routing_table: Dict[str, str] = {}
 
    def register_agent(self, agent: ACPAgent):
        self.agents[agent.agent_id] = agent
        self.routing_table[agent.agent_id] = "local"
        print(f"✓ Зарегистрирован агент: {agent.name} ({agent.agent_id})")
 
    def route_message(self, message: ACPMessage) -> bool:
        if message.receiver not in self.agents:
            print(f"✗ Получатель {message.receiver} не найден")
            return False
        print(f"\n МАРШРУТИЗАЦИЯ СОБЩЕНИЯ ACP:")
        print(f"От: {message.sender} → Кому: {message.receiver}")
        print(f"Перформатив: {message.performative}")
        print(f"Содержимое: {json.dumps(message.content, indent=2)}")
        receiver_agent = self.agents[message.receiver]
        response = receiver_agent.process_message(message)
        self.message_log.append(message)
        if response:
            print(f"\n СГЕНЕРИРОВАН ОТВЕТ:")
            print(f"От: {response.sender} → Кому: {response.receiver}")
            print(f"Содержимое: {json.dumps(response.content, indent=2)}")
            if response.receiver in self.agents:
                self.agents[response.receiver].process_message(response)
                self.message_log.append(response)
        return True
 
    def broadcast_message(self, message: ACPMessage, recipients: List[str]):
        for recipient in recipients:
            msg_copy = ACPMessage(
                message_id=str(uuid.uuid4()),
                sender=message.sender,
                receiver=recipient,
                performative=message.performative,
                content=message.content.copy(),
                conversation_id=message.conversation_id
            )
            self.route_message(msg_copy)

Демонстрация работы ACP

Функция демонстрации инициализирует агентов с разными возможностями, регистрирует их и моделирует сценарии: запрос информации, запрос действия и обмен информацией.

def demonstrate_acp():
    print(" ДЕМОНСТРАЦИЯ ПРОТОКОЛА АГЕНТСКОГО ОБЩЕНИЯ (ACP)")
    print("=" * 60)
    broker = ACPMessageBroker()
    researcher = ACPAgent("agent-001", "Dr. Research", ["analysis", "research", "data-processing"])
    assistant = ACPAgent("agent-002", "AI Assistant", ["information", "scheduling", "communication"])
    calculator = ACPAgent("agent-003", "MathBot", ["calculation", "mathematics", "computation"])
    broker.register_agent(researcher)
    broker.register_agent(assistant)
    broker.register_agent(calculator)
    print(f"\n ЗАРЕГИСТРИРОВАННЫЕ АГЕНТЫ:")
    for agent_id, agent in broker.agents.items():
        print(f"  • {agent.name} ({agent_id}): {', '.join(agent.capabilities)}")
    print(f"\n СЦЕНАРИЙ 1: Запрос информации (перформатив ASK)")
    query_msg = assistant.send_query("agent-001", "Каковы ключевые факторы в исследовании ИИ?")
    broker.route_message(query_msg)
    print(f"\n СЦЕНАРИЙ 2: Запрос действия (перформатив REQUEST-ACTION)")
    calc_request = researcher.send_request("agent-003", "calculate", {"expression": "sqrt(144) + 10"})
    broker.route_message(calc_request)
    print(f"\n СЦЕНАРИЙ 3: Обмен информацией (перформатив TELL)")
    info_msg = researcher.send_inform("agent-002", "Опубликована новая статья по квантовым вычислениям")
    broker.route_message(info_msg)
    print(f"\n СТАТИСТИКА ПРОТОКОЛА:")
    print(f"  • Всего обработанных сообщений: {len(broker.message_log)}")
    print(f"  • Активных бесед: {len(set(msg.conversation_id for msg in broker.message_log))}")
    print(f"  • Использованных типов сообщений: {len(set(msg.performative for msg in broker.message_log))}")
    print(f"\n ПРИМЕР ФОРМАТА СОБЩЕНИЯ ACP:")
    sample_msg = assistant.send_query("agent-001", "Пример вопроса для демонстрации формата")
    print(sample_msg.to_acp_format())

Быстрый старт в Google Colab

Инструкции по получению ключа Gemini API, его настройке и запуску демонстрации, а также краткое описание возможностей протокола и пример кода для расширения функционала.

# Создание кастомного агента
my_agent = ACPAgent("my-001", "CustomBot", ["custom-capability"])
broker.register_agent(my_agent)
 
# Отправка кастомного сообщения
msg = my_agent.send_query("agent-001", "Ваш вопрос здесь")
broker.route_message(msg)

Это подробное руководство поможет разработчикам создавать масштабируемые многoагентные системы с продвинутой коммуникацией, сотрудничеством и выполнением задач с помощью ACP и Google Gemini API.

🇬🇧

Switch Language

Read this article in English

Switch to English