В этом уроке мы внедряем протокол связи агента (ACP) путем создания гибкой системы обмена сообщениями ACP в Python, используя Google Gemini API для обработки естественного языка. Начиная с установки и конфигурации библиотеки Google-Generativeai, учебник вводит основные абстракции, типы сообщений, работоспособность и класс данных ACPMessage, который стандартизирует межагентную связь. Определяя классы Acpagent и Acpmessagebroker, руководство демонстрирует, как создавать, отправлять, маршрут и обрабатывать структурированные сообщения среди нескольких автономных агентов. С помощью примеров четкого кода пользователи учатся реализовать запросы, запрашивать действия и информацию о вещании, сохраняя при этом потоки беседы, подтверждения и обработку ошибок.
import google.generativeai as genai
import json
import time
import uuid
from enum import Enum
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
GEMINI_API_KEY = "Use Your Gemini API Key"
genai.configure(api_key=GEMINI_API_KEY)
Мы импортируем Essential Python модули, начиная от обработки JSON и времени до уникальной генерации и аннотаций идентификатора, для поддержки структурированной реализации ACP. Затем он получает заполнитель API API пользователя и настраивает клиента Google-Generativeai для последующих вызовов к модели языка Gemini.
class ACPMessageType(Enum):
"""Standard ACP message types"""
REQUEST = "request"
RESPONSE = "response"
INFORM = "inform"
QUERY = "query"
SUBSCRIBE = "subscribe"
UNSUBSCRIBE = "unsubscribe"
ERROR = "error"
ACK = "acknowledge"
Перечисление ACPMessAgeType определяет основные категории сообщений, используемые в протоколе связи агента, включая запросы, ответы, информационные трансляции, запросы и управление управлением подпиской, сигнализация ошибок и подтверждения. Централизируя эти типы сообщений, протокол обеспечивает последовательную обработку и маршрутизацию межагентных коммуникаций по всей системе.
class ACPPerformative(Enum):
"""ACP speech acts (performatives)"""
TELL = "tell"
ASK = "ask"
REPLY = "reply"
REQUEST_ACTION = "request-action"
AGREE = "agree"
REFUSE = "refuse"
PROPOSE = "propose"
ACCEPT = "accept"
REJECT = "reject"
Акпперформативное перечисление фиксирует разнообразие агентов речевых действий, которые могут использовать при взаимодействии в рамках ACP-структуры, отображая намерения высокого уровня, такие как выполнение запросов, задание вопросов, предоставление команд или переговоры о соглашениях на стандартизированные метки. Эта четкая таксономия позволяет агентам интерпретировать и реагировать на сообщения в контекстном порядке, обеспечивая надежное и семантически богатое общение.
@dataclass
class ACPMessage:
"""Agent Communication Protocol Message Structure"""
message_id: str
sender: str
receiver: str
performative: str
content: Dict(str, Any)
protocol: str = "ACP-1.0"
conversation_id: str = None
reply_to: str = None
language: str = "english"
encoding: str = "json"
timestamp: float = None
def __post_init__(self):
if self.timestamp is None:
self.timestamp = time.time()
if self.conversation_id is None:
self.conversation_id = str(uuid.uuid4())
def to_acp_format(self) -> str:
"""Convert to standard ACP message format"""
acp_msg = {
"message-id": self.message_id,
"sender": self.sender,
"receiver": self.receiver,
"performative": self.performative,
"content": self.content,
"protocol": self.protocol,
"conversation-id": self.conversation_id,
"reply-to": self.reply_to,
"language": self.language,
"encoding": self.encoding,
"timestamp": self.timestamp
}
return json.dumps(acp_msg, indent=2)
@classmethod
def from_acp_format(cls, acp_string: str) -> 'ACPMessage':
"""Parse ACP message from string format"""
data = json.loads(acp_string)
return cls(
message_id=data("message-id"),
sender=data("sender"),
receiver=data("receiver"),
performative=data("performative"),
content=data("content"),
protocol=data.get("protocol", "ACP-1.0"),
conversation_id=data.get("conversation-id"),
reply_to=data.get("reply-to"),
language=data.get("language", "english"),
encoding=data.get("encoding", "json"),
timestamp=data.get("timestamp", time.time())
)
Класс данных ACPMessage инкапсулирует все поля, необходимые для структурированного обмена ACP, включая идентификаторы, участники, перформативную, полезную нагрузку и метаданные, такие как версия протокола, язык и временные метки. Его метод __post_init__ Автопуляция пропущенной временной метки и значения разговора_ид, обеспечивая уникальное отслеживание каждого сообщения. Методы полезности TO_ACP_FORMAT и FROM_ACP_FORMAT СЕРИАЛИЗАЦИИ СЕРИАЛИЗАЦИИ В СТАНДАЗИВАНИЕ ИСПОРТАЦИИ для бесшовной передачи и анализа.
class ACPAgent:
"""Agent implementing Agent Communication Protocol"""
def __init__(self, agent_id: str, name: str, capabilities: List(str)):
self.agent_id = agent_id
self.name = name
self.capabilities = capabilities
self.model = genai.GenerativeModel("gemini-1.5-flash")
self.message_queue: List(ACPMessage) = ()
self.subscriptions: Dict(str, List(str)) = {}
self.conversations: Dict(str, List(ACPMessage)) = {}
def create_message(self, receiver: str, performative: str,
content: Dict(str, Any), conversation_id: str = None,
reply_to: str = None) -> ACPMessage:
"""Create a new ACP-compliant message"""
return ACPMessage(
message_id=str(uuid.uuid4()),
sender=self.agent_id,
receiver=receiver,
performative=performative,
content=content,
conversation_id=conversation_id,
reply_to=reply_to
)
def send_inform(self, receiver: str, fact: str, data: Any = None) -> ACPMessage:
"""Send an INFORM message (telling someone a fact)"""
content = {"fact": fact, "data": data}
return self.create_message(receiver, ACPPerformative.TELL.value, content)
def send_query(self, receiver: str, question: str, query_type: str = "yes-no") -> ACPMessage:
"""Send a QUERY message (asking for information)"""
content = {"question": question, "query-type": query_type}
return self.create_message(receiver, ACPPerformative.ASK.value, content)
def send_request(self, receiver: str, action: str, parameters: Dict = None) -> ACPMessage:
"""Send a REQUEST message (asking someone to perform an action)"""
content = {"action": action, "parameters": parameters or {}}
return self.create_message(receiver, ACPPerformative.REQUEST_ACTION.value, content)
def send_reply(self, original_msg: ACPMessage, response_data: Any) -> ACPMessage:
"""Send a REPLY message in response to another message"""
content = {"response": response_data, "original-question": original_msg.content}
return self.create_message(
original_msg.sender,
ACPPerformative.REPLY.value,
content,
conversation_id=original_msg.conversation_id,
reply_to=original_msg.message_id
)
def process_message(self, message: ACPMessage) -> Optional(ACPMessage):
"""Process incoming ACP message and generate appropriate response"""
self.message_queue.append(message)
conv_id = message.conversation_id
if conv_id not in self.conversations:
self.conversations(conv_id) = ()
self.conversations(conv_id).append(message)
if message.performative == ACPPerformative.ASK.value:
return self._handle_query(message)
elif message.performative == ACPPerformative.REQUEST_ACTION.value:
return self._handle_request(message)
elif message.performative == ACPPerformative.TELL.value:
return self._handle_inform(message)
return None
def _handle_query(self, message: ACPMessage) -> ACPMessage:
"""Handle incoming query messages"""
question = message.content.get("question", "")
prompt = f"As agent {self.name} with capabilities {self.capabilities}, answer: {question}"
try:
response = self.model.generate_content(prompt)
answer = response.text.strip()
except:
answer = "Unable to process query at this time"
return self.send_reply(message, {"answer": answer, "confidence": 0.8})
def _handle_request(self, message: ACPMessage) -> ACPMessage:
"""Handle incoming action requests"""
action = message.content.get("action", "")
parameters = message.content.get("parameters", {})
if any(capability in action.lower() for capability in self.capabilities):
result = f"Executing {action} with parameters {parameters}"
status = "agreed"
else:
result = f"Cannot perform {action} - not in my capabilities"
status = "refused"
return self.send_reply(message, {"status": status, "result": result})
def _handle_inform(self, message: ACPMessage) -> Optional(ACPMessage):
"""Handle incoming information messages"""
fact = message.content.get("fact", "")
print(f"({self.name}) Received information: {fact}")
ack_content = {"status": "received", "fact": fact}
return self.create_message(message.sender, "acknowledge", ack_content,
conversation_id=message.conversation_id)
Акпагентный класс инкапсулирует автономную сущность, способную отправлять, получать и обрабатывать сообщения, соответствующие ACP, с использованием языковой модели Gemini. Он управляет своей собственной очередью сообщений, истории разговоров и подпискими и предоставляет вспомогательные методы (send_inform, send_query, send_request, send_reply) для строительства правильно отформатированных экземпляров acpmessage. Входящие сообщения направляются через Process_message, который делегирует специализированные обработчики для запросов, запросов действий и информационных сообщений.
class ACPMessageBroker:
"""Message broker implementing ACP routing and delivery"""
def __init__(self):
self.agents: Dict(str, ACPAgent) = {}
self.message_log: List(ACPMessage) = ()
self.routing_table: Dict(str, str) = {}
def register_agent(self, agent: ACPAgent):
"""Register an agent with the message broker"""
self.agents(agent.agent_id) = agent
self.routing_table(agent.agent_id) = "local"
print(f"✓ Registered agent: {agent.name} ({agent.agent_id})")
def route_message(self, message: ACPMessage) -> bool:
"""Route ACP message to appropriate recipient"""
if message.receiver not in self.agents:
print(f"✗ Receiver {message.receiver} not found")
return False
print(f"\n📨 ACP MESSAGE ROUTING:")
print(f"From: {message.sender} → To: {message.receiver}")
print(f"Performative: {message.performative}")
print(f"Content: {json.dumps(message.content, indent=2)}")
receiver_agent = self.agents(message.receiver)
response = receiver_agent.process_message(message)
self.message_log.append(message)
if response:
print(f"\n📤 GENERATED RESPONSE:")
print(f"From: {response.sender} → To: {response.receiver}")
print(f"Content: {json.dumps(response.content, indent=2)}")
if response.receiver in self.agents:
self.agents(response.receiver).process_message(response)
self.message_log.append(response)
return True
def broadcast_message(self, message: ACPMessage, recipients: List(str)):
"""Broadcast message to multiple recipients"""
for recipient in recipients:
msg_copy = ACPMessage(
message_id=str(uuid.uuid4()),
sender=message.sender,
receiver=recipient,
performative=message.performative,
content=message.content.copy(),
conversation_id=message.conversation_id
)
self.route_message(msg_copy)
Acpmessagebroker служит центральным маршрутизатором для сообщений ACP, поддерживая реестр агентов и журнал сообщений. Он предоставляет методы для регистрации агентов, доставки отдельных сообщений через route_message, которые обрабатывают поиск, регистрацию и цепочку ответов, и отправлять одно и то же сообщение нескольким получателям с помощью Broadcast_message.
def demonstrate_acp():
"""Comprehensive demonstration of Agent Communication Protocol"""
print("🤖 AGENT COMMUNICATION PROTOCOL (ACP) DEMONSTRATION")
print("=" * 60)
broker = ACPMessageBroker()
researcher = ACPAgent("agent-001", "Dr. Research", ("analysis", "research", "data-processing"))
assistant = ACPAgent("agent-002", "AI Assistant", ("information", "scheduling", "communication"))
calculator = ACPAgent("agent-003", "MathBot", ("calculation", "mathematics", "computation"))
broker.register_agent(researcher)
broker.register_agent(assistant)
broker.register_agent(calculator)
print(f"\n📋 REGISTERED AGENTS:")
for agent_id, agent in broker.agents.items():
print(f" • {agent.name} ({agent_id}): {', '.join(agent.capabilities)}")
print(f"\n🔬 SCENARIO 1: Information Query (ASK performative)")
query_msg = assistant.send_query("agent-001", "What are the key factors in AI research?")
broker.route_message(query_msg)
print(f"\n🔢 SCENARIO 2: Action Request (REQUEST-ACTION performative)")
calc_request = researcher.send_request("agent-003", "calculate", {"expression": "sqrt(144) + 10"})
broker.route_message(calc_request)
print(f"\n📢 SCENARIO 3: Information Sharing (TELL performative)")
info_msg = researcher.send_inform("agent-002", "New research paper published on quantum computing")
broker.route_message(info_msg)
print(f"\n📊 PROTOCOL STATISTICS:")
print(f" • Total messages processed: {len(broker.message_log)}")
print(f" • Active conversations: {len(set(msg.conversation_id for msg in broker.message_log))}")
print(f" • Message types used: {len(set(msg.performative for msg in broker.message_log))}")
print(f"\n📋 SAMPLE ACP MESSAGE FORMAT:")
sample_msg = assistant.send_query("agent-001", "Sample question for format demonstration")
print(sample_msg.to_acp_format())
Функция DEMONATE_ACP организует практическое прохождение всей структуры ACP: она инициализирует брокера и три различных агента (исследователь, помощник искусственного интеллекта и Mathbot), регистрирует их и иллюстрирует три сценария ключевых взаимодействий, запросы на информацию, запрос вычисления и обмен обновлением. После маршрутизации каждого сообщения и обработки ответов он печатает сводную статистику по потоку сообщений. Он демонстрирует форматированное сообщение ACP, предоставляя пользователям четкий, сквозной пример того, как агенты общаются в соответствии с протоколом.
def setup_guide():
print("""
🚀 GOOGLE COLAB SETUP GUIDE:
1. Get Gemini API Key: https://makersuite.google.com/app/apikey
2. Replace: GEMINI_API_KEY = "YOUR_ACTUAL_API_KEY"
3. Run: demonstrate_acp()
🔧 ACP PROTOCOL FEATURES:
• Standardized message format with required fields
• Speech act performatives (TELL, ASK, REQUEST-ACTION, etc.)
• Conversation tracking and message threading
• Error handling and acknowledgments
• Message routing and delivery confirmation
📝 EXTEND THE PROTOCOL:
```python
# Create custom agent
my_agent = ACPAgent("my-001", "CustomBot", ("custom-capability"))
broker.register_agent(my_agent)
# Send custom message
msg = my_agent.send_query("agent-001", "Your question here")
broker.route_message(msg)
```
""")
if __name__ == "__main__":
setup_guide()
demonstrate_acp()
Наконец, функция setup_guide предоставляет справочник быстрого начала для запуска демонстрации ACP в Google Colab, в котором изложено, как получить и настроить ключ API Gemini и вызвать подпрограмму Demonate_acp. Он также суммирует функции протокола ключей, такие как стандартизированные форматы сообщений, перформанс и маршрутизацию сообщений. Он предоставляет краткий фрагмент кода, иллюстрирующий, как зарегистрировать пользовательские агенты и отправлять адаптированные сообщения.
В заключение, это руководство реализует многоагентные системы на основе ACP, способные к исследованиям, вычислению и совместной работе. Предоставленные примеры сценариев иллюстрируют общие варианты использования, информационные запросы, вычислительные запросы и обмен фактами, в то время как брокер обеспечивает надежную доставку сообщений и ведение журнала. Читателям рекомендуется расширить структуру, добавляя новые возможности агента, интегрируя действия, специфичные для домена, или включив более сложные механизмы подписки и уведомлений.
Загрузите ноутбук на GitHubПолем Весь кредит на это исследование направлено на исследователей этого проекта. Кроме того, не стесняйтесь следить за нами Twitter И не забудьте присоединиться к нашему 95K+ ML Subreddit и подписаться на Наша информационный бюллетеньПолем
ASIF Razzaq является генеральным директором Marktechpost Media Inc. как дальновидного предпринимателя и инженера, ASIF стремится использовать потенциал искусственного интеллекта для социального блага. Его последнее усилие-запуск медиа-платформы искусственного интеллекта, Marktechpost, которая выделяется благодаря глубокому освещению машинного обучения и новостей о глубоком обучении, которое является технически обоснованным и легко понятным для широкой аудитории. Платформа может похвастаться более чем 2 миллионами ежемесячных просмотров, иллюстрируя свою популярность среди зрителей.
