
Mastering Scalable Multi-Agent Communication with ACP and Google's Gemini API

Explore building a scalable multi-agent communication system using ACP and Google's Gemini API with detailed Python code examples and practical scenarios.

Implementing Agent Communication Protocol (ACP) in Python

This tutorial guides you through building a scalable and flexible multi-agent communication system compliant with the Agent Communication Protocol (ACP) using Python and Google’s Gemini API for natural language processing.

Setting Up the Environment

Begin by installing the google-generativeai library (for example with pip install google-generativeai), then configure it with your Gemini API key so the code can call the Gemini language model.

import google.generativeai as genai
GEMINI_API_KEY = "Use Your Gemini API Key"
genai.configure(api_key=GEMINI_API_KEY)

Essential Python modules such as json, time, uuid, enum, and dataclasses are used to handle message serialization, timing, unique identification, and structured data.
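The listings that follow use these modules without showing their import lines. A minimal sketch of the imports they appear to need is given here. The list is inferred from the names used in the code (Enum, dataclass, Dict, Any, List, Optional) and is an assumption, not something the original shows.

```python
# Imports inferred from the names used in the later listings; the original
# article does not show them, so treat this list as an assumption.
import json   # serialise messages to and from JSON
import time   # message timestamps
import uuid   # unique message and conversation IDs
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
```

Placing these in the same cell as the Gemini configuration keeps every later listing runnable as written.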

Defining Core Message Types and Performatives

The ACPMessageType enumeration standardizes message categories like requests, responses, queries, and error handling, ensuring consistent communication.

class ACPMessageType(Enum):
    REQUEST = "request"
    RESPONSE = "response"
    INFORM = "inform"
    QUERY = "query"
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    ERROR = "error"
    ACK = "acknowledge"

The ACPPerformative enumeration specifies speech acts such as telling, asking, requesting actions, agreeing, or refusing, enabling semantic clarity in agent interactions.

class ACPPerformative(Enum):
    TELL = "tell"
    ASK = "ask"
    REPLY = "reply"
    REQUEST_ACTION = "request-action"
    AGREE = "agree"
    REFUSE = "refuse"
    PROPOSE = "propose"
    ACCEPT = "accept"
    REJECT = "reject"
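Because messages carry the performative as a plain string, it can help to check an incoming value against the enumeration before acting on it. The sketch below shows one way to do that with Python's lookup-by-value on Enum. The Performative class is a trimmed, illustrative copy of ACPPerformative so the snippet runs on its own, and parse_performative is a hypothetical helper, not part of the tutorial's code.

```python
from enum import Enum
from typing import Optional


class Performative(Enum):
    # Trimmed, illustrative copy of ACPPerformative so this snippet runs alone.
    TELL = "tell"
    ASK = "ask"
    REQUEST_ACTION = "request-action"


def parse_performative(value: str) -> Optional[Performative]:
    """Return the matching enum member, or None for an unknown string."""
    try:
        return Performative(value)  # Enum lookup by value
    except ValueError:
        return None


print(parse_performative("ask"))    # Performative.ASK
print(parse_performative("shout"))  # None
```

Returning None rather than raising lets the caller decide whether an unknown performative should be ignored or answered with an error message.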

Structuring ACP Messages

The ACPMessage data class encapsulates all necessary fields for a standardized message exchange including IDs, sender/receiver info, performative, content, protocol version, conversation tracking, and timestamps.

@dataclass
class ACPMessage:
    message_id: str
    sender: str
    receiver: str
    performative: str
    content: Dict[str, Any]
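    protocol: str = "ACP-1.0"
    conversation_id: Optional[str] = None  # filled in by __post_init__ when omitted
    reply_to: Optional[str] = None
    language: str = "english"
    encoding: str = "json"
    timestamp: Optional[float] = None  # filled in by __post_init__ when omitted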
 
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.conversation_id is None:
            self.conversation_id = str(uuid.uuid4())
 
    def to_acp_format(self) -> str:
        acp_msg = {
            "message-id": self.message_id,
            "sender": self.sender,
            "receiver": self.receiver,
            "performative": self.performative,
            "content": self.content,
            "protocol": self.protocol,
            "conversation-id": self.conversation_id,
            "reply-to": self.reply_to,
            "language": self.language,
            "encoding": self.encoding,
            "timestamp": self.timestamp
        }
        return json.dumps(acp_msg, indent=2)
 
    @classmethod
    def from_acp_format(cls, acp_string: str) -> 'ACPMessage':
        data = json.loads(acp_string)
        return cls(
            message_id=data["message-id"],
            sender=data["sender"],
            receiver=data["receiver"],
            performative=data["performative"],
            content=data["content"],
            protocol=data.get("protocol", "ACP-1.0"),
            conversation_id=data.get("conversation-id"),
            reply_to=data.get("reply-to"),
            language=data.get("language", "english"),
            encoding=data.get("encoding", "json"),
            timestamp=data.get("timestamp", time.time())
        )
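Note that from_acp_format reads five keys with data[...] rather than data.get(...), so a message missing any of them raises KeyError. A small pre-check can report what is missing before parsing. The sketch below assumes only the key names used in the listing above; missing_acp_fields and REQUIRED_KEYS are hypothetical names introduced for illustration.

```python
import json
from typing import List

# Keys that from_acp_format reads with data[...] rather than data.get(...).
REQUIRED_KEYS = ("message-id", "sender", "receiver", "performative", "content")


def missing_acp_fields(acp_string: str) -> List[str]:
    """Return the required keys absent from a serialised ACP message."""
    data = json.loads(acp_string)
    return [key for key in REQUIRED_KEYS if key not in data]


incomplete = json.dumps({"message-id": "m-1", "sender": "agent-001", "content": {}})
print(missing_acp_fields(incomplete))  # ['receiver', 'performative']
```

An empty list means the string is safe to pass to from_acp_format, at least as far as required keys are concerned.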

Defining Agents and Message Handling

The ACPAgent class represents autonomous agents capable of creating, sending, receiving, and processing ACP messages. It manages message queues, subscriptions, and conversations, and integrates with Gemini's language model to process queries and requests.

class ACPAgent:
    def __init__(self, agent_id: str, name: str, capabilities: List[str]):
        self.agent_id = agent_id
        self.name = name
        self.capabilities = capabilities
        self.model = genai.GenerativeModel("gemini-1.5-flash")
        self.message_queue: List[ACPMessage] = []
        self.subscriptions: Dict[str, List[str]] = {}
        self.conversations: Dict[str, List[ACPMessage]] = {}
 
    def create_message(self, receiver: str, performative: str, content: Dict[str, Any],
                       conversation_id: Optional[str] = None, reply_to: Optional[str] = None) -> ACPMessage:
        return ACPMessage(
            message_id=str(uuid.uuid4()),
            sender=self.agent_id,
            receiver=receiver,
            performative=performative,
            content=content,
            conversation_id=conversation_id,
            reply_to=reply_to
        )
 
    def send_inform(self, receiver: str, fact: str, data: Any = None) -> ACPMessage:
        content = {"fact": fact, "data": data}
        return self.create_message(receiver, ACPPerformative.TELL.value, content)
 
    def send_query(self, receiver: str, question: str, query_type: str = "yes-no") -> ACPMessage:
        content = {"question": question, "query-type": query_type}
        return self.create_message(receiver, ACPPerformative.ASK.value, content)
 
    def send_request(self, receiver: str, action: str, parameters: Optional[Dict[str, Any]] = None) -> ACPMessage:
        content = {"action": action, "parameters": parameters or {}}
        return self.create_message(receiver, ACPPerformative.REQUEST_ACTION.value, content)
 
    def send_reply(self, original_msg: ACPMessage, response_data: Any) -> ACPMessage:
        content = {"response": response_data, "original-question": original_msg.content}
        return self.create_message(
            original_msg.sender,
            ACPPerformative.REPLY.value,
            content,
            conversation_id=original_msg.conversation_id,
            reply_to=original_msg.message_id
        )
 
    def process_message(self, message: ACPMessage) -> Optional[ACPMessage]:
        self.message_queue.append(message)
        conv_id = message.conversation_id
        if conv_id not in self.conversations:
            self.conversations[conv_id] = []
        self.conversations[conv_id].append(message)
 
        if message.performative == ACPPerformative.ASK.value:
            return self._handle_query(message)
        elif message.performative == ACPPerformative.REQUEST_ACTION.value:
            return self._handle_request(message)
        elif message.performative == ACPPerformative.TELL.value:
            return self._handle_inform(message)
        return None
 
    def _handle_query(self, message: ACPMessage) -> ACPMessage:
        question = message.content.get("question", "")
        prompt = f"As agent {self.name} with capabilities {self.capabilities}, answer: {question}"
        try:
            response = self.model.generate_content(prompt)
            answer = response.text.strip()
        except Exception:  # API failures, blocked responses, or a response without text
            answer = "Unable to process query at this time"
        return self.send_reply(message, {"answer": answer, "confidence": 0.8})
 
    def _handle_request(self, message: ACPMessage) -> ACPMessage:
        action = message.content.get("action", "")
        parameters = message.content.get("parameters", {})
        if any(capability in action.lower() for capability in self.capabilities):
            result = f"Executing {action} with parameters {parameters}"
            status = "agreed"
        else:
            result = f"Cannot perform {action} - not in my capabilities"
            status = "refused"
        return self.send_reply(message, {"status": status, "result": result})
 
    def _handle_inform(self, message: ACPMessage) -> Optional[ACPMessage]:
        fact = message.content.get("fact", "")
        print(f"[{self.name}] Received information: {fact}")
        ack_content = {"status": "received", "fact": fact}
        return self.create_message(message.sender, "acknowledge", ack_content, conversation_id=message.conversation_id)
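The capability check in _handle_request is a substring test: a request is accepted only if one of the agent's capability strings appears inside the lowercased action name. The standalone sketch below mirrors that check so its behaviour can be tried in isolation. can_handle is a hypothetical helper name, and the capability list is the one given to MathBot in the demonstration.

```python
from typing import List


def can_handle(action: str, capabilities: List[str]) -> bool:
    """Mirror of the check in _handle_request: a capability must be a substring of the action."""
    return any(capability in action.lower() for capability in capabilities)


math_caps = ["calculation", "mathematics", "computation"]
print(can_handle("calculation", math_caps))            # True
print(can_handle("Run a computation job", math_caps))  # True
print(can_handle("calculate", math_caps))              # False
```

The last case shows the main pitfall: "calculate" is close to "calculation" but contains none of the capability strings, so such a request is refused. Action names therefore need to be chosen with the receiver's capability list in mind.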

Message Broker for Routing and Delivery

The ACPMessageBroker manages agent registration, message routing, logging, and broadcasting messages to multiple recipients.

class ACPMessageBroker:
    def __init__(self):
        self.agents: Dict[str, ACPAgent] = {}
        self.message_log: List[ACPMessage] = []
        self.routing_table: Dict[str, str] = {}
 
    def register_agent(self, agent: ACPAgent):
        self.agents[agent.agent_id] = agent
        self.routing_table[agent.agent_id] = "local"
        print(f"✓ Registered agent: {agent.name} ({agent.agent_id})")
 
    def route_message(self, message: ACPMessage) -> bool:
        if message.receiver not in self.agents:
            print(f"✗ Receiver {message.receiver} not found")
            return False
        print(f"\n ACP MESSAGE ROUTING:")
        print(f"From: {message.sender} → To: {message.receiver}")
        print(f"Performative: {message.performative}")
        print(f"Content: {json.dumps(message.content, indent=2)}")
        receiver_agent = self.agents[message.receiver]
        response = receiver_agent.process_message(message)
        self.message_log.append(message)
        if response:
            print(f"\n GENERATED RESPONSE:")
            print(f"From: {response.sender} → To: {response.receiver}")
            print(f"Content: {json.dumps(response.content, indent=2)}")
            if response.receiver in self.agents:
                self.agents[response.receiver].process_message(response)
                self.message_log.append(response)
        return True
 
    def broadcast_message(self, message: ACPMessage, recipients: List[str]):
        for recipient in recipients:
            msg_copy = ACPMessage(
                message_id=str(uuid.uuid4()),
                sender=message.sender,
                receiver=recipient,
                performative=message.performative,
                content=message.content.copy(),
                conversation_id=message.conversation_id
            )
            self.route_message(msg_copy)
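One detail of broadcast_message worth knowing: message.content.copy() is a shallow copy, so nested dictionaries such as "parameters" are still shared between the copies sent to each recipient. The sketch below illustrates the difference using plain dictionaries. Using copy.deepcopy is offered as a possible alternative, not as something the original code does.

```python
import copy

original = {"action": "calculation", "parameters": {"expression": "sqrt(144) + 10"}}

shallow = original.copy()       # what broadcast_message does
deep = copy.deepcopy(original)  # alternative: a fully independent copy

# Mutating the nested dict through the shallow copy also changes the original.
shallow["parameters"]["expression"] = "changed"
print(original["parameters"]["expression"])  # changed

# Mutating the deep copy leaves the original untouched.
deep["parameters"]["expression"] = "changed again"
print(original["parameters"]["expression"])  # changed
```

If receiving agents only read message content, the shallow copy is sufficient. The deep copy matters once any agent modifies nested content in place.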

Demonstration of ACP in Action

A demonstration function initializes agents with different capabilities, registers them with the broker, and simulates scenarios including queries, action requests, and information sharing.

def demonstrate_acp():
    print(" AGENT COMMUNICATION PROTOCOL (ACP) DEMONSTRATION")
    print("=" * 60)
    broker = ACPMessageBroker()
    researcher = ACPAgent("agent-001", "Dr. Research", ["analysis", "research", "data-processing"])
    assistant = ACPAgent("agent-002", "AI Assistant", ["information", "scheduling", "communication"])
    calculator = ACPAgent("agent-003", "MathBot", ["calculation", "mathematics", "computation"])
    broker.register_agent(researcher)
    broker.register_agent(assistant)
    broker.register_agent(calculator)
    print(f"\n REGISTERED AGENTS:")
    for agent_id, agent in broker.agents.items():
        print(f"  • {agent.name} ({agent_id}): {', '.join(agent.capabilities)}")
    print(f"\n SCENARIO 1: Information Query (ASK performative)")
    query_msg = assistant.send_query("agent-001", "What are the key factors in AI research?")
    broker.route_message(query_msg)
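    print(f"\n SCENARIO 2: Action Request (REQUEST-ACTION performative)")
    # The action must contain one of MathBot's capability strings (substring match
    # in _handle_request); "calculate" contains none of them and would be refused.
    calc_request = researcher.send_request("agent-003", "calculation", {"expression": "sqrt(144) + 10"})
    broker.route_message(calc_request)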
    print(f"\n SCENARIO 3: Information Sharing (TELL performative)")
    info_msg = researcher.send_inform("agent-002", "New research paper published on quantum computing")
    broker.route_message(info_msg)
    print(f"\n PROTOCOL STATISTICS:")
    print(f"  • Total messages processed: {len(broker.message_log)}")
    print(f"  • Active conversations: {len(set(msg.conversation_id for msg in broker.message_log))}")
    print(f"  • Message types used: {len(set(msg.performative for msg in broker.message_log))}")
    print(f"\n SAMPLE ACP MESSAGE FORMAT:")
    sample_msg = assistant.send_query("agent-001", "Sample question for format demonstration")
    print(sample_msg.to_acp_format())
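The statistics printed by the demonstration count distinct conversations and performatives. A per-performative breakdown can be produced in much the same way. The sketch below works on plain dictionaries so it runs on its own. summarize_log and the sample entries are hypothetical and only carry the fields the summary needs.

```python
from collections import Counter
from typing import Dict, List


def summarize_log(log: List[Dict[str, str]]) -> Dict[str, int]:
    """Count messages per performative, in order of first appearance."""
    return dict(Counter(entry["performative"] for entry in log))


# Hypothetical log entries with only the fields the summary needs.
sample_log = [
    {"performative": "ask", "conversation-id": "c-1"},
    {"performative": "reply", "conversation-id": "c-1"},
    {"performative": "request-action", "conversation-id": "c-2"},
    {"performative": "reply", "conversation-id": "c-2"},
]
print(summarize_log(sample_log))  # {'ask': 1, 'reply': 2, 'request-action': 1}
```

With the broker from the tutorial, the same idea would apply to msg.performative for each msg in broker.message_log.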

Quick Start Guide for Google Colab

To run the demonstration in Google Colab, obtain a Gemini API key, assign it to GEMINI_API_KEY in the setup code, and call demonstrate_acp(). The framework can then be extended with custom agents and messages:

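# Create a broker and a custom agent. The broker created inside demonstrate_acp()
# is local to that function, so a new one is needed here.
broker = ACPMessageBroker()
my_agent = ACPAgent("my-001", "CustomBot", ["custom-capability"])
broker.register_agent(my_agent)
# The receiver must be registered too, otherwise route_message returns False.
broker.register_agent(ACPAgent("agent-001", "Dr. Research", ["analysis", "research", "data-processing"]))

# Send custom message
msg = my_agent.send_query("agent-001", "Your question here")
broker.route_message(msg)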

With these pieces in place (typed messages, agents that answer through Gemini, and a broker that routes and logs traffic), developers have a working base for multi-agent systems that communicate, collaborate, and execute tasks using ACP and Google’s Gemini API.
