Skip to content
清晨的一缕阳光
返回

A2A 通信协议与协作

A2A 通信协议与协作

A2A(Agent-to-Agent)协议是多 Agent 系统协作的基础,实现 Agent 间的高效通信和任务协调。

一、核心概念

1.1 什么是 A2A

A2A = Agent-to-Agent Protocol

目标:
- 标准化 Agent 间通信
- 支持任务分发和协作
- 实现结果聚合

1.2 通信模式

1. 请求 - 响应(Request-Response)
   Agent A → Agent B → 结果

2. 发布 - 订阅(Pub-Sub)
   Agent A → Topic → Agent B, C, D

3. 广播(Broadcast)
   Agent A → 所有 Agent

4. 点对点(P2P)
   Agent A ↔ Agent B

二、协议规范

2.1 消息格式

{
  "message_id": "msg_123",
  "sender": "agent_a",
  "receiver": "agent_b",
  "type": "task_request",
  "timestamp": "2025-01-10T10:00:00Z",
  "content": {
    "task": "search",
    "params": {
      "query": "AI news"
    }
  },
  "metadata": {
    "priority": "high",
    "timeout": 30
  }
}

2.2 消息类型

class MessageType:
    TASK_REQUEST = "task_request"      # 任务请求
    TASK_RESPONSE = "task_response"    # 任务响应
    STATUS_UPDATE = "status_update"    # 状态更新
    HEARTBEAT = "heartbeat"            # 心跳
    BROADCAST = "broadcast"            # 广播

2.3 通信接口

class A2AProtocol:
    async def send(self, message: Message) -> bool:
        """发送消息"""
        raise NotImplementedError
    
    async def receive(self) -> Message:
        """接收消息"""
        raise NotImplementedError
    
    async def broadcast(self, message: Message) -> int:
        """广播消息,返回接收的 Agent 数量"""
        raise NotImplementedError

三、任务分发

3.1 任务分解

class TaskDecomposer:
    def decompose(self, task: str) -> list[SubTask]:
        """将复杂任务分解为子任务"""
        prompt = f"""
请将以下任务分解为可并行执行的子任务:

任务:{task}

子任务列表:
"""
        response = llm.generate(prompt)
        return parse_subtasks(response)

3.2 任务分配

class TaskDispatcher:
    def __init__(self, agents: list[Agent]):
        self.agents = agents
        self.load_balancer = LoadBalancer()
    
    async def dispatch(self, subtasks: list[SubTask]) -> dict:
        """分配子任务给合适的 Agent"""
        assignments = {}
        
        for task in subtasks:
            # 选择最合适的 Agent
            agent = self.select_agent(task)
            
            # 发送任务
            await agent.execute(task)
            
            assignments[agent.id] = task
        
        return assignments
    
    def select_agent(self, task: SubTask) -> Agent:
        # 基于能力匹配
        capable_agents = [
            a for a in self.agents 
            if a.can_handle(task)
        ]
        
        # 基于负载均衡
        return self.load_balancer.select(capable_agents)

3.3 结果聚合

class ResultAggregator:
    def __init__(self, strategy: str = "merge"):
        self.strategy = strategy
    
    async def aggregate(
        self, 
        results: list[Result]
    ) -> Result:
        if self.strategy == "merge":
            return self.merge_results(results)
        elif self.strategy == "vote":
            return self.vote_results(results)
        elif self.strategy == "chain":
            return self.chain_results(results)
    
    def merge_results(self, results: list) -> Result:
        # 合并所有结果
        merged = {}
        for r in results:
            merged.update(r.data)
        return Result(data=merged)
    
    def vote_results(self, results: list) -> Result:
        # 投票选择
        from collections import Counter
        votes = Counter([r.answer for r in results])
        return Result(answer=votes.most_common(1)[0][0])

四、协作模式

4.1 链式协作

class ChainCollaboration:
    def __init__(self, agents: list[Agent]):
        self.agents = agents
    
    async def execute(self, initial_task: str) -> Result:
        result = None
        
        for agent in self.agents:
            task = self.prepare_task(initial_task, result)
            result = await agent.execute(task)
        
        return result

4.2 并行协作

class ParallelCollaboration:
    async def execute(
        self, 
        tasks: list[Task],
        agents: list[Agent]
    ) -> list[Result]:
        # 并行执行
        coroutines = [
            agent.execute(task)
            for agent, task in zip(agents, tasks)
        ]
        
        results = await asyncio.gather(*coroutines)
        return results

4.3 协商协作

class NegotiationCollaboration:
    async def negotiate(
        self, 
        task: Task,
        agents: list[Agent]
    ) -> Agent:
        # 招标
        bids = []
        for agent in agents:
            bid = await agent.bid(task)
            bids.append((agent, bid))
        
        # 选择最优投标
        best_agent, best_bid = max(
            bids, 
            key=lambda x: x[1].score
        )
        
        return best_agent

五、实现案例

5.1 消息总线

class MessageBus:
    def __init__(self):
        self.topics = defaultdict(list)
        self.subscribers = defaultdict(list)
    
    def publish(self, topic: str, message: Message):
        """发布消息到主题"""
        self.topics[topic].append(message)
        
        # 通知订阅者
        for subscriber in self.subscribers[topic]:
            subscriber.on_message(message)
    
    def subscribe(self, topic: str, agent: Agent):
        """订阅主题"""
        self.subscribers[topic].append(agent)

5.2 Agent 注册中心

class AgentRegistry:
    def __init__(self):
        self.agents = {}
    
    def register(self, agent: Agent):
        self.agents[agent.id] = {
            "agent": agent,
            "capabilities": agent.capabilities,
            "status": "active",
            "load": 0
        }
    
    def find_agents(self, capability: str) -> list[Agent]:
        return [
            info["agent"]
            for info in self.agents.values()
            if capability in info["capabilities"]
            and info["status"] == "active"
        ]

5.3 超时处理

async def execute_with_timeout(
    agent: Agent,
    task: Task,
    timeout: int = 30
) -> Result:
    try:
        return await asyncio.wait_for(
            agent.execute(task),
            timeout=timeout
        )
    except asyncio.TimeoutError:
        return Result(
            success=False,
            error=f"Timeout after {timeout}s"
        )

六、最佳实践

6.1 通信优化

# 批量发送
class BatchSender:
    def __init__(self, batch_size: int = 10):
        self.batch_size = batch_size
        self.buffer = []
    
    async def send(self, message: Message):
        self.buffer.append(message)
        
        if len(self.buffer) >= self.batch_size:
            await self.flush()
    
    async def flush(self):
        if self.buffer:
            await self.protocol.send_batch(self.buffer)
            self.buffer = []

6.2 错误恢复

class FaultTolerance:
    async def execute_with_fallback(
        self,
        primary_agent: Agent,
        fallback_agents: list[Agent],
        task: Task
    ) -> Result:
        # 尝试主 Agent
        try:
            return await primary_agent.execute(task)
        except Exception:
            pass
        
        # 尝试备用 Agent
        for agent in fallback_agents:
            try:
                return await agent.execute(task)
            except Exception:
                continue
        
        return Result(success=False, error="All agents failed")

七、总结

A2A 协议核心要点:

组件作用实现方式
消息格式标准化通信JSON Schema
任务分发分配子任务负载均衡
结果聚合合并结果Merge/Vote/Chain
协作模式多 Agent 协作链式/并行/协商

A2A 协议是多 Agent 系统的基础,良好的设计能实现高效的 Agent 协作。


分享这篇文章到:

上一篇文章
Spring Boot 代码质量与规范
下一篇文章
网关统一鉴权