A2A 通信协议与协作
A2A(Agent-to-Agent)协议是多 Agent 系统协作的基础,实现 Agent 间的高效通信和任务协调。
一、核心概念
1.1 什么是 A2A
A2A = Agent-to-Agent Protocol
目标:
- 标准化 Agent 间通信
- 支持任务分发和协作
- 实现结果聚合
1.2 通信模式
1. 请求 - 响应(Request-Response)
Agent A → Agent B → 结果
2. 发布 - 订阅(Pub-Sub)
Agent A → Topic → Agent B, C, D
3. 广播(Broadcast)
Agent A → 所有 Agent
4. 点对点(P2P)
Agent A ↔ Agent B
二、协议规范
2.1 消息格式
{
"message_id": "msg_123",
"sender": "agent_a",
"receiver": "agent_b",
"type": "task_request",
"timestamp": "2025-01-10T10:00:00Z",
"content": {
"task": "search",
"params": {
"query": "AI news"
}
},
"metadata": {
"priority": "high",
"timeout": 30
}
}
2.2 消息类型
class MessageType:
TASK_REQUEST = "task_request" # 任务请求
TASK_RESPONSE = "task_response" # 任务响应
STATUS_UPDATE = "status_update" # 状态更新
HEARTBEAT = "heartbeat" # 心跳
BROADCAST = "broadcast" # 广播
2.3 通信接口
class A2AProtocol:
async def send(self, message: Message) -> bool:
"""发送消息"""
raise NotImplementedError
async def receive(self) -> Message:
"""接收消息"""
raise NotImplementedError
async def broadcast(self, message: Message) -> int:
"""广播消息,返回接收的 Agent 数量"""
raise NotImplementedError
三、任务分发
3.1 任务分解
class TaskDecomposer:
def decompose(self, task: str) -> list[SubTask]:
"""将复杂任务分解为子任务"""
prompt = f"""
请将以下任务分解为可并行执行的子任务:
任务:{task}
子任务列表:
"""
response = llm.generate(prompt)
return parse_subtasks(response)
3.2 任务分配
class TaskDispatcher:
def __init__(self, agents: list[Agent]):
self.agents = agents
self.load_balancer = LoadBalancer()
async def dispatch(self, subtasks: list[SubTask]) -> dict:
"""分配子任务给合适的 Agent"""
assignments = {}
for task in subtasks:
# 选择最合适的 Agent
agent = self.select_agent(task)
# 发送任务
await agent.execute(task)
assignments[agent.id] = task
return assignments
def select_agent(self, task: SubTask) -> Agent:
# 基于能力匹配
capable_agents = [
a for a in self.agents
if a.can_handle(task)
]
# 基于负载均衡
return self.load_balancer.select(capable_agents)
3.3 结果聚合
class ResultAggregator:
def __init__(self, strategy: str = "merge"):
self.strategy = strategy
async def aggregate(
self,
results: list[Result]
) -> Result:
if self.strategy == "merge":
return self.merge_results(results)
elif self.strategy == "vote":
return self.vote_results(results)
elif self.strategy == "chain":
return self.chain_results(results)
def merge_results(self, results: list) -> Result:
# 合并所有结果
merged = {}
for r in results:
merged.update(r.data)
return Result(data=merged)
def vote_results(self, results: list) -> Result:
# 投票选择
from collections import Counter
votes = Counter([r.answer for r in results])
return Result(answer=votes.most_common(1)[0][0])
四、协作模式
4.1 链式协作
class ChainCollaboration:
def __init__(self, agents: list[Agent]):
self.agents = agents
async def execute(self, initial_task: str) -> Result:
result = None
for agent in self.agents:
task = self.prepare_task(initial_task, result)
result = await agent.execute(task)
return result
4.2 并行协作
class ParallelCollaboration:
async def execute(
self,
tasks: list[Task],
agents: list[Agent]
) -> list[Result]:
# 并行执行
coroutines = [
agent.execute(task)
for agent, task in zip(agents, tasks)
]
results = await asyncio.gather(*coroutines)
return results
4.3 协商协作
class NegotiationCollaboration:
async def negotiate(
self,
task: Task,
agents: list[Agent]
) -> Agent:
# 招标
bids = []
for agent in agents:
bid = await agent.bid(task)
bids.append((agent, bid))
# 选择最优投标
best_agent, best_bid = max(
bids,
key=lambda x: x[1].score
)
return best_agent
五、实现案例
5.1 消息总线
class MessageBus:
def __init__(self):
self.topics = defaultdict(list)
self.subscribers = defaultdict(list)
def publish(self, topic: str, message: Message):
"""发布消息到主题"""
self.topics[topic].append(message)
# 通知订阅者
for subscriber in self.subscribers[topic]:
subscriber.on_message(message)
def subscribe(self, topic: str, agent: Agent):
"""订阅主题"""
self.subscribers[topic].append(agent)
5.2 Agent 注册中心
class AgentRegistry:
def __init__(self):
self.agents = {}
def register(self, agent: Agent):
self.agents[agent.id] = {
"agent": agent,
"capabilities": agent.capabilities,
"status": "active",
"load": 0
}
def find_agents(self, capability: str) -> list[Agent]:
return [
info["agent"]
for info in self.agents.values()
if capability in info["capabilities"]
and info["status"] == "active"
]
5.3 超时处理
async def execute_with_timeout(
agent: Agent,
task: Task,
timeout: int = 30
) -> Result:
try:
return await asyncio.wait_for(
agent.execute(task),
timeout=timeout
)
except asyncio.TimeoutError:
return Result(
success=False,
error=f"Timeout after {timeout}s"
)
六、最佳实践
6.1 通信优化
# 批量发送
class BatchSender:
def __init__(self, batch_size: int = 10):
self.batch_size = batch_size
self.buffer = []
async def send(self, message: Message):
self.buffer.append(message)
if len(self.buffer) >= self.batch_size:
await self.flush()
async def flush(self):
if self.buffer:
await self.protocol.send_batch(self.buffer)
self.buffer = []
6.2 错误恢复
class FaultTolerance:
async def execute_with_fallback(
self,
primary_agent: Agent,
fallback_agents: list[Agent],
task: Task
) -> Result:
# 尝试主 Agent
try:
return await primary_agent.execute(task)
except Exception:
pass
# 尝试备用 Agent
for agent in fallback_agents:
try:
return await agent.execute(task)
except Exception:
continue
return Result(success=False, error="All agents failed")
七、总结
A2A 协议核心要点:
| 组件 | 作用 | 实现方式 |
|---|---|---|
| 消息格式 | 标准化通信 | JSON Schema |
| 任务分发 | 分配子任务 | 负载均衡 |
| 结果聚合 | 合并结果 | Merge/Vote/Chain |
| 协作模式 | 多 Agent 协作 | 链式/并行/协商 |
A2A 协议是多 Agent 系统的基础,良好的设计能实现高效的 Agent 协作。