Руководство по кодированию по созданию масштабируемых многоагентных систем связи с использованием протокола связи агента (ACP)

В этом уроке мы внедряем протокол связи агента (ACP) путем создания гибкой системы обмена сообщениями ACP в Python, используя Google Gemini API для обработки естественного языка. Начиная с установки и конфигурации библиотеки Google-Generativeai, учебник вводит основные абстракции, типы сообщений, работоспособность и класс данных ACPMessage, который стандартизирует межагентную связь. Определяя классы Acpagent и Acpmessagebroker, руководство демонстрирует, как создавать, отправлять, маршрут и обрабатывать структурированные сообщения среди нескольких автономных агентов. С помощью примеров четкого кода пользователи учатся реализовать запросы, запрашивать действия и информацию о вещании, сохраняя при этом потоки беседы, подтверждения и обработку ошибок.

import google.generativeai as genai
import json
import time
import uuid
from enum import Enum
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict


GEMINI_API_KEY = "Use Your Gemini API Key"
genai.configure(api_key=GEMINI_API_KEY)

Мы импортируем Essential Python модули, начиная от обработки JSON и времени до уникальной генерации и аннотаций идентификатора, для поддержки структурированной реализации ACP. Затем он получает заполнитель API API пользователя и настраивает клиента Google-Generativeai для последующих вызовов к модели языка Gemini.

class ACPMessageType(Enum):
    """Standard ACP message types"""
    REQUEST = "request"
    RESPONSE = "response"
    INFORM = "inform"
    QUERY = "query"
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    ERROR = "error"
    ACK = "acknowledge"

Перечисление ACPMessAgeType определяет основные категории сообщений, используемые в протоколе связи агента, включая запросы, ответы, информационные трансляции, запросы и управление управлением подпиской, сигнализация ошибок и подтверждения. Централизируя эти типы сообщений, протокол обеспечивает последовательную обработку и маршрутизацию межагентных коммуникаций по всей системе.

class ACPPerformative(Enum):
    """ACP speech acts (performatives)"""
    TELL = "tell"
    ASK = "ask"
    REPLY = "reply"
    REQUEST_ACTION = "request-action"
    AGREE = "agree"
    REFUSE = "refuse"
    PROPOSE = "propose"
    ACCEPT = "accept"
    REJECT = "reject"

Акпперформативное перечисление фиксирует разнообразие агентов речевых действий, которые могут использовать при взаимодействии в рамках ACP-структуры, отображая намерения высокого уровня, такие как выполнение запросов, задание вопросов, предоставление команд или переговоры о соглашениях на стандартизированные метки. Эта четкая таксономия позволяет агентам интерпретировать и реагировать на сообщения в контекстном порядке, обеспечивая надежное и семантически богатое общение.

@dataclass
class ACPMessage:
    """Agent Communication Protocol Message Structure"""
    message_id: str
    sender: str
    receiver: str
    performative: str  
    content: Dict(str, Any)
    protocol: str = "ACP-1.0"
    conversation_id: str = None
    reply_to: str = None
    language: str = "english"
    encoding: str = "json"
    timestamp: float = None
   
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.conversation_id is None:
            self.conversation_id = str(uuid.uuid4())
   
    def to_acp_format(self) -> str:
        """Convert to standard ACP message format"""
        acp_msg = {
            "message-id": self.message_id,
            "sender": self.sender,
            "receiver": self.receiver,
            "performative": self.performative,
            "content": self.content,
            "protocol": self.protocol,
            "conversation-id": self.conversation_id,
            "reply-to": self.reply_to,
            "language": self.language,
            "encoding": self.encoding,
            "timestamp": self.timestamp
        }
        return json.dumps(acp_msg, indent=2)
   
    @classmethod
    def from_acp_format(cls, acp_string: str) -> 'ACPMessage':
        """Parse ACP message from string format"""
        data = json.loads(acp_string)
        return cls(
            message_id=data("message-id"),
            sender=data("sender"),
            receiver=data("receiver"),
            performative=data("performative"),
            content=data("content"),
            protocol=data.get("protocol", "ACP-1.0"),
            conversation_id=data.get("conversation-id"),
            reply_to=data.get("reply-to"),
            language=data.get("language", "english"),
            encoding=data.get("encoding", "json"),
            timestamp=data.get("timestamp", time.time())
        )

Класс данных ACPMessage инкапсулирует все поля, необходимые для структурированного обмена ACP, включая идентификаторы, участники, перформативную, полезную нагрузку и метаданные, такие как версия протокола, язык и временные метки. Его метод __post_init__ Автопуляция пропущенной временной метки и значения разговора_ид, обеспечивая уникальное отслеживание каждого сообщения. Методы полезности TO_ACP_FORMAT и FROM_ACP_FORMAT СЕРИАЛИЗАЦИИ СЕРИАЛИЗАЦИИ В СТАНДАЗИВАНИЕ ИСПОРТАЦИИ для бесшовной передачи и анализа.

class ACPAgent:
    """Agent implementing Agent Communication Protocol"""
   
    def __init__(self, agent_id: str, name: str, capabilities: List(str)):
        self.agent_id = agent_id
        self.name = name
        self.capabilities = capabilities
        self.model = genai.GenerativeModel("gemini-1.5-flash")
        self.message_queue: List(ACPMessage) = ()
        self.subscriptions: Dict(str, List(str)) = {}  
        self.conversations: Dict(str, List(ACPMessage)) = {}
   
    def create_message(self, receiver: str, performative: str,
                      content: Dict(str, Any), conversation_id: str = None,
                      reply_to: str = None) -> ACPMessage:
        """Create a new ACP-compliant message"""
        return ACPMessage(
            message_id=str(uuid.uuid4()),
            sender=self.agent_id,
            receiver=receiver,
            performative=performative,
            content=content,
            conversation_id=conversation_id,
            reply_to=reply_to
        )
   
    def send_inform(self, receiver: str, fact: str, data: Any = None) -> ACPMessage:
        """Send an INFORM message (telling someone a fact)"""
        content = {"fact": fact, "data": data}
        return self.create_message(receiver, ACPPerformative.TELL.value, content)
   
    def send_query(self, receiver: str, question: str, query_type: str = "yes-no") -> ACPMessage:
        """Send a QUERY message (asking for information)"""
        content = {"question": question, "query-type": query_type}
        return self.create_message(receiver, ACPPerformative.ASK.value, content)
   
    def send_request(self, receiver: str, action: str, parameters: Dict = None) -> ACPMessage:
        """Send a REQUEST message (asking someone to perform an action)"""
        content = {"action": action, "parameters": parameters or {}}
        return self.create_message(receiver, ACPPerformative.REQUEST_ACTION.value, content)
   
    def send_reply(self, original_msg: ACPMessage, response_data: Any) -> ACPMessage:
        """Send a REPLY message in response to another message"""
        content = {"response": response_data, "original-question": original_msg.content}
        return self.create_message(
            original_msg.sender,
            ACPPerformative.REPLY.value,
            content,
            conversation_id=original_msg.conversation_id,
            reply_to=original_msg.message_id
        )
   
    def process_message(self, message: ACPMessage) -> Optional(ACPMessage):
        """Process incoming ACP message and generate appropriate response"""
        self.message_queue.append(message)
       
        conv_id = message.conversation_id
        if conv_id not in self.conversations:
            self.conversations(conv_id) = ()
        self.conversations(conv_id).append(message)
       
        if message.performative == ACPPerformative.ASK.value:
            return self._handle_query(message)
        elif message.performative == ACPPerformative.REQUEST_ACTION.value:
            return self._handle_request(message)
        elif message.performative == ACPPerformative.TELL.value:
            return self._handle_inform(message)
       
        return None
   
    def _handle_query(self, message: ACPMessage) -> ACPMessage:
        """Handle incoming query messages"""
        question = message.content.get("question", "")
       
        prompt = f"As agent {self.name} with capabilities {self.capabilities}, answer: {question}"
        try:
            response = self.model.generate_content(prompt)
            answer = response.text.strip()
        except:
            answer = "Unable to process query at this time"
       
        return self.send_reply(message, {"answer": answer, "confidence": 0.8})
   
    def _handle_request(self, message: ACPMessage) -> ACPMessage:
        """Handle incoming action requests"""
        action = message.content.get("action", "")
        parameters = message.content.get("parameters", {})
       
        if any(capability in action.lower() for capability in self.capabilities):
            result = f"Executing {action} with parameters {parameters}"
            status = "agreed"
        else:
            result = f"Cannot perform {action} - not in my capabilities"
            status = "refused"
       
        return self.send_reply(message, {"status": status, "result": result})
   
    def _handle_inform(self, message: ACPMessage) -> Optional(ACPMessage):
        """Handle incoming information messages"""
        fact = message.content.get("fact", "")
        print(f"({self.name}) Received information: {fact}")
       
        ack_content = {"status": "received", "fact": fact}
        return self.create_message(message.sender, "acknowledge", ack_content,
                                 conversation_id=message.conversation_id)

Акпагентный класс инкапсулирует автономную сущность, способную отправлять, получать и обрабатывать сообщения, соответствующие ACP, с использованием языковой модели Gemini. Он управляет своей собственной очередью сообщений, истории разговоров и подпискими и предоставляет вспомогательные методы (send_inform, send_query, send_request, send_reply) для строительства правильно отформатированных экземпляров acpmessage. Входящие сообщения направляются через Process_message, который делегирует специализированные обработчики для запросов, запросов действий и информационных сообщений.

class ACPMessageBroker:
    """Message broker implementing ACP routing and delivery"""
   
    def __init__(self):
        self.agents: Dict(str, ACPAgent) = {}
        self.message_log: List(ACPMessage) = ()
        self.routing_table: Dict(str, str) = {}  
   
    def register_agent(self, agent: ACPAgent):
        """Register an agent with the message broker"""
        self.agents(agent.agent_id) = agent
        self.routing_table(agent.agent_id) = "local"
        print(f"✓ Registered agent: {agent.name} ({agent.agent_id})")
   
    def route_message(self, message: ACPMessage) -> bool:
        """Route ACP message to appropriate recipient"""
        if message.receiver not in self.agents:
            print(f"✗ Receiver {message.receiver} not found")
            return False
       
        print(f"\n📨 ACP MESSAGE ROUTING:")
        print(f"From: {message.sender} → To: {message.receiver}")
        print(f"Performative: {message.performative}")
        print(f"Content: {json.dumps(message.content, indent=2)}")
       
        receiver_agent = self.agents(message.receiver)
        response = receiver_agent.process_message(message)
       
        self.message_log.append(message)
       
        if response:
            print(f"\n📤 GENERATED RESPONSE:")
            print(f"From: {response.sender} → To: {response.receiver}")
            print(f"Content: {json.dumps(response.content, indent=2)}")
           
            if response.receiver in self.agents:
                self.agents(response.receiver).process_message(response)
                self.message_log.append(response)
       
        return True
   
    def broadcast_message(self, message: ACPMessage, recipients: List(str)):
        """Broadcast message to multiple recipients"""
        for recipient in recipients:
            msg_copy = ACPMessage(
                message_id=str(uuid.uuid4()),
                sender=message.sender,
                receiver=recipient,
                performative=message.performative,
                content=message.content.copy(),
                conversation_id=message.conversation_id
            )
            self.route_message(msg_copy)

Acpmessagebroker служит центральным маршрутизатором для сообщений ACP, поддерживая реестр агентов и журнал сообщений. Он предоставляет методы для регистрации агентов, доставки отдельных сообщений через route_message, которые обрабатывают поиск, регистрацию и цепочку ответов, и отправлять одно и то же сообщение нескольким получателям с помощью Broadcast_message.

def demonstrate_acp():
    """Comprehensive demonstration of Agent Communication Protocol"""
   
    print("🤖 AGENT COMMUNICATION PROTOCOL (ACP) DEMONSTRATION")
    print("=" * 60)
   
    broker = ACPMessageBroker()
   
    researcher = ACPAgent("agent-001", "Dr. Research", ("analysis", "research", "data-processing"))
    assistant = ACPAgent("agent-002", "AI Assistant", ("information", "scheduling", "communication"))
    calculator = ACPAgent("agent-003", "MathBot", ("calculation", "mathematics", "computation"))
   
    broker.register_agent(researcher)
    broker.register_agent(assistant)
    broker.register_agent(calculator)
   
    print(f"\n📋 REGISTERED AGENTS:")
    for agent_id, agent in broker.agents.items():
        print(f"  • {agent.name} ({agent_id}): {', '.join(agent.capabilities)}")
   
    print(f"\n🔬 SCENARIO 1: Information Query (ASK performative)")
    query_msg = assistant.send_query("agent-001", "What are the key factors in AI research?")
    broker.route_message(query_msg)
   
    print(f"\n🔢 SCENARIO 2: Action Request (REQUEST-ACTION performative)")
    calc_request = researcher.send_request("agent-003", "calculate", {"expression": "sqrt(144) + 10"})
    broker.route_message(calc_request)
   
    print(f"\n📢 SCENARIO 3: Information Sharing (TELL performative)")
    info_msg = researcher.send_inform("agent-002", "New research paper published on quantum computing")
    broker.route_message(info_msg)
   
    print(f"\n📊 PROTOCOL STATISTICS:")
    print(f"  • Total messages processed: {len(broker.message_log)}")
    print(f"  • Active conversations: {len(set(msg.conversation_id for msg in broker.message_log))}")
    print(f"  • Message types used: {len(set(msg.performative for msg in broker.message_log))}")
   
    print(f"\n📋 SAMPLE ACP MESSAGE FORMAT:")
    sample_msg = assistant.send_query("agent-001", "Sample question for format demonstration")
    print(sample_msg.to_acp_format())

Функция DEMONATE_ACP организует практическое прохождение всей структуры ACP: она инициализирует брокера и три различных агента (исследователь, помощник искусственного интеллекта и Mathbot), регистрирует их и иллюстрирует три сценария ключевых взаимодействий, запросы на информацию, запрос вычисления и обмен обновлением. После маршрутизации каждого сообщения и обработки ответов он печатает сводную статистику по потоку сообщений. Он демонстрирует форматированное сообщение ACP, предоставляя пользователям четкий, сквозной пример того, как агенты общаются в соответствии с протоколом.

def setup_guide():
    print("""
    🚀 GOOGLE COLAB SETUP GUIDE:
   
    1. Get Gemini API Key: https://makersuite.google.com/app/apikey
    2. Replace: GEMINI_API_KEY = "YOUR_ACTUAL_API_KEY"
    3. Run: demonstrate_acp()
   
    🔧 ACP PROTOCOL FEATURES:
   
    • Standardized message format with required fields
    • Speech act performatives (TELL, ASK, REQUEST-ACTION, etc.)
    • Conversation tracking and message threading
    • Error handling and acknowledgments
    • Message routing and delivery confirmation
   
    📝 EXTEND THE PROTOCOL:
    ```python
    # Create custom agent
    my_agent = ACPAgent("my-001", "CustomBot", ("custom-capability"))
    broker.register_agent(my_agent)
   
    # Send custom message
    msg = my_agent.send_query("agent-001", "Your question here")
    broker.route_message(msg)
    ```
    """)


if __name__ == "__main__":
    setup_guide()
    demonstrate_acp() 

Наконец, функция setup_guide предоставляет справочник быстрого начала для запуска демонстрации ACP в Google Colab, в котором изложено, как получить и настроить ключ API Gemini и вызвать подпрограмму Demonate_acp. Он также суммирует функции протокола ключей, такие как стандартизированные форматы сообщений, перформанс и маршрутизацию сообщений. Он предоставляет краткий фрагмент кода, иллюстрирующий, как зарегистрировать пользовательские агенты и отправлять адаптированные сообщения.

В заключение, это руководство реализует многоагентные системы на основе ACP, способные к исследованиям, вычислению и совместной работе. Предоставленные примеры сценариев иллюстрируют общие варианты использования, информационные запросы, вычислительные запросы и обмен фактами, в то время как брокер обеспечивает надежную доставку сообщений и ведение журнала. Читателям рекомендуется расширить структуру, добавляя новые возможности агента, интегрируя действия, специфичные для домена, или включив более сложные механизмы подписки и уведомлений.


Загрузите ноутбук на GitHubПолем Весь кредит на это исследование направлено на исследователей этого проекта. Кроме того, не стесняйтесь следить за нами Twitter И не забудьте присоединиться к нашему 95K+ ML Subreddit и подписаться на Наша информационный бюллетеньПолем


ASIF Razzaq является генеральным директором Marktechpost Media Inc. как дальновидного предпринимателя и инженера, ASIF стремится использовать потенциал искусственного интеллекта для социального блага. Его последнее усилие-запуск медиа-платформы искусственного интеллекта, Marktechpost, которая выделяется благодаря глубокому освещению машинного обучения и новостей о глубоком обучении, которое является технически обоснованным и легко понятным для широкой аудитории. Платформа может похвастаться более чем 2 миллионами ежемесячных просмотров, иллюстрируя свою популярность среди зрителей.

Source link

Scroll to Top