多 Agent 协作框架实战
复杂任务往往需要多个 Agent 协作完成。如何设计多 Agent 协作系统?如何实现 Agent 间的高效通信?本文深入解析多 Agent 协作框架的实战方案。
一、多 Agent 架构
1.1 协作模式
多 Agent 协作模式:
┌─────────────────────────────────────┐
│ 1. 层级模式(Hierarchical) │
│ - 主管 Agent 分配任务 │
│ - 执行 Agent 完成任务 │
│ - 适合结构化任务 │
├─────────────────────────────────────┤
│ 2. 对等模式(Peer-to-Peer) │
│ - Agent 平等协作 │
│ - 协商完成任务 │
│ - 适合开放性任务 │
├─────────────────────────────────────┤
│ 3. 流水线模式(Pipeline) │
│ - Agent 按顺序处理 │
│ - 每个 Agent 负责一个环节 │
│ - 适合流程化任务 │
├─────────────────────────────────────┤
│ 4. 委员会模式(Committee) │
│ - 多个 Agent 独立决策 │
│ - 投票或整合结果 │
│ - 适合高准确度要求 │
└─────────────────────────────────────┘
1.2 架构设计
graph TB
A[用户请求] --> B[协调器]
B --> C{任务类型}
C -->|分析 | D[分析 Agent]
C -->|编码 | E[编码 Agent]
C -->|测试 | F[测试 Agent]
D --> G[结果整合]
E --> G
F --> G
G --> H[最终响应]
二、Agent 角色定义
2.1 基础 Agent 类
# agent_base.py
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
class AgentStatus(Enum):
    """Status values an agent can hold."""

    IDLE = "idle"    # not working on anything
    BUSY = "busy"    # currently processing a task
    ERROR = "error"  # error state
@dataclass
class AgentMessage:
    """A message exchanged between agents.

    Attributes are documented inline; ``metadata`` is optional and stays
    ``None`` when the creator does not supply it.
    """

    sender: str                      # name of the agent that created the message
    content: str                     # message body
    message_type: str                # request, response, notification
    metadata: Optional[Dict] = None  # extra key/value data, or None when absent
class BaseAgent(ABC):
    """Abstract base class for agents.

    Keeps the agent's name, its LLM handle, a status flag, an inbox list
    and a free-form context dict. Subclasses implement :meth:`process`.
    """

    def __init__(self, name: str, llm):
        """Create an idle agent with an empty inbox and context.

        Args:
            name: Agent name; used as ``sender`` on messages it builds.
            llm: LLM object stored on the instance for subclasses to use.
        """
        self.name = name
        self.llm = llm
        self.context: Dict = {}
        self.message_queue: List[AgentMessage] = []
        self.status = AgentStatus.IDLE

    @abstractmethod
    def process(self, input_data: Dict) -> Dict:
        """Handle one task described by ``input_data`` and return a result."""

    def receive_message(self, message: AgentMessage):
        """Store an incoming message at the end of the inbox."""
        self.message_queue.append(message)

    def send_message(
        self,
        receiver: str,
        content: str,
        message_type: str = "notification",
        metadata: Dict = None
    ) -> AgentMessage:
        """Build a message whose sender is this agent.

        The message is only constructed and returned; nothing is delivered
        here, and ``receiver`` is not recorded on the message.

        Args:
            receiver: Intended recipient (currently unused).
            content: Message body.
            message_type: Kind of message; defaults to ``"notification"``.
            metadata: Optional extra data; a falsy value becomes ``{}``.

        Returns:
            The newly built ``AgentMessage``.
        """
        extra = metadata if metadata else {}
        return AgentMessage(
            sender=self.name,
            content=content,
            message_type=message_type,
            metadata=extra,
        )

    def get_available(self) -> bool:
        """Return True when the agent's status is IDLE."""
        is_idle = self.status == AgentStatus.IDLE
        return is_idle

    def set_status(self, status: AgentStatus):
        """Replace the agent's current status."""
        self.status = status
2.2 专业 Agent 实现
# specialized_agents.py
from agent_base import BaseAgent, AgentStatus
class AnalystAgent(BaseAgent):
    """Agent that asks the LLM to analyze a problem statement."""

    def __init__(self, llm):
        """Create the agent under the fixed name ``"analyst"``.

        Args:
            llm: LLM object; ``process`` calls its ``generate(prompt)``.
        """
        super().__init__("analyst", llm)
        self.specialties = [
            "数据分析",
            "问题诊断",
            "方案评估",
        ]

    # Builtin ``dict`` is used in the annotations because this module's
    # imports do not bring ``typing.Dict`` into scope.
    def process(self, input_data: dict) -> dict:
        """Analyze ``input_data['problem']`` with the LLM.

        Args:
            input_data: Task payload; ``problem`` is read (default ``''``).

        Returns:
            ``{'analysis': <LLM output>, 'agent': <agent name>}``.
        """
        self.set_status(AgentStatus.BUSY)
        try:
            problem = input_data.get('problem', '')
            prompt = (
                "\n"
                "请分析以下问题:\n"
                f"{problem}\n"
                "请从以下角度分析:\n"
                "1. 问题本质是什么?\n"
                "2. 可能的原因有哪些?\n"
                "3. 建议的解决方案是什么?\n"
            )
            analysis = self.llm.generate(prompt)
        finally:
            # Always release the agent, even when the LLM call raises;
            # otherwise its status would remain BUSY.
            self.set_status(AgentStatus.IDLE)
        return {
            'analysis': analysis,
            'agent': self.name,
        }
class CoderAgent(BaseAgent):
    """Agent that asks the LLM to write code for a requirement."""

    def __init__(self, llm):
        """Create the agent under the fixed name ``"coder"``.

        Args:
            llm: LLM object; ``process`` calls its ``generate(prompt)``.
        """
        super().__init__("coder", llm)
        self.languages = ["Python", "JavaScript", "SQL"]

    # Builtin ``dict`` is used in the annotations because this module's
    # imports do not bring ``typing.Dict`` into scope.
    def process(self, input_data: dict) -> dict:
        """Generate code for ``input_data['requirement']``.

        Args:
            input_data: Task payload; ``requirement`` (default ``''``) and
                ``language`` (default ``'Python'``) are read.

        Returns:
            ``{'code': <LLM output>, 'language': <language>,
            'agent': <agent name>}``.
        """
        self.set_status(AgentStatus.BUSY)
        try:
            requirement = input_data.get('requirement', '')
            language = input_data.get('language', 'Python')
            prompt = (
                "\n"
                "请编写代码实现以下功能:\n"
                f"需求:{requirement}\n"
                f"语言:{language}\n"
                "要求:\n"
                "1. 代码简洁清晰\n"
                "2. 包含必要的注释\n"
                "3. 考虑边界情况\n"
            )
            code = self.llm.generate(prompt)
        finally:
            # Always release the agent, even when the LLM call raises;
            # otherwise its status would remain BUSY.
            self.set_status(AgentStatus.IDLE)
        return {
            'code': code,
            'language': language,
            'agent': self.name,
        }
class ReviewerAgent(BaseAgent):
    """Agent that asks the LLM to review a piece of content."""

    def __init__(self, llm):
        """Create the agent under the fixed name ``"reviewer"``.

        Args:
            llm: LLM object; ``process`` calls its ``generate(prompt)``.
        """
        super().__init__("reviewer", llm)
        self.check_categories = [
            "代码质量",
            "安全性",
            "性能",
            "可维护性",
        ]

    # Builtin ``dict`` is used in the annotations because this module's
    # imports do not bring ``typing.Dict`` into scope.
    def process(self, input_data: dict) -> dict:
        """Review the supplied content with the LLM.

        Args:
            input_data: Task payload. ``content`` is reviewed; when it is
                absent the ``code`` key is used instead, so the output of a
                preceding coding stage can be reviewed directly.
                ``content_type`` defaults to ``'code'``.

        Returns:
            ``{'review': <LLM output>, 'issues_found': <int>,
            'agent': <agent name>}``.
        """
        self.set_status(AgentStatus.BUSY)
        try:
            content = input_data.get('content', input_data.get('code', ''))
            content_type = input_data.get('content_type', 'code')
            prompt = (
                "\n"
                "请审查以下内容:\n"
                "内容:\n"
                f"{content}\n"
                f"类型:{content_type}\n"
                "请从以下角度审查:\n"
                "1. 是否存在错误?\n"
                "2. 是否有改进空间?\n"
                "3. 具体建议是什么?\n"
            )
            review = self.llm.generate(prompt)
        finally:
            # Always release the agent, even when the LLM call raises;
            # otherwise its status would remain BUSY.
            self.set_status(AgentStatus.IDLE)
        return {
            'review': review,
            'issues_found': self._count_issues(review),
            'agent': self.name,
        }

    def _count_issues(self, review: str) -> int:
        """Return a rough issue count for ``review``.

        This is a simple keyword tally: occurrences of "问题" plus
        occurrences of "建议" in the review text.
        """
        return sum(review.count(keyword) for keyword in ("问题", "建议"))
三、通信协议
3.1 消息总线
# message_bus.py
import threading
from queue import Empty, Queue
from typing import Callable, Dict, List, Optional
class MessageBus:
    """In-process message bus with per-name callbacks and per-name queues."""

    def __init__(self):
        """Create a bus with no subscribers and no queues."""
        self.subscribers: Dict[str, List[Callable]] = {}
        self.message_queues: Dict[str, Queue] = {}
        self.lock = threading.Lock()

    def subscribe(
        self,
        agent_name: str,
        callback: Callable
    ):
        """Register ``callback`` under ``agent_name``.

        The first subscription for a name also creates that name's queue.

        Args:
            agent_name: Name (topic) to subscribe to.
            callback: Callable invoked with each published message.
        """
        with self.lock:
            if agent_name not in self.subscribers:
                self.subscribers[agent_name] = []
                self.message_queues[agent_name] = Queue()
            self.subscribers[agent_name].append(callback)

    def publish(
        self,
        topic: str,
        message: Dict
    ):
        """Invoke every callback subscribed to ``topic`` with ``message``.

        Args:
            topic: Name the callbacks were registered under.
            message: Payload handed to each callback.
        """
        # Snapshot under the lock, then call outside it: the lock is not
        # reentrant, so a callback that subscribes or publishes would
        # otherwise deadlock.
        with self.lock:
            callbacks = list(self.subscribers.get(topic, ()))
        for callback in callbacks:
            callback(message)

    def send_to_agent(
        self,
        agent_name: str,
        message: Dict
    ):
        """Enqueue ``message`` for ``agent_name``.

        The message is dropped silently when no queue exists for that name,
        i.e. nobody has subscribed under it.
        """
        target = self.message_queues.get(agent_name)
        if target is not None:
            target.put(message)

    def get_message(
        self,
        agent_name: str,
        timeout: float = 1.0
    ) -> Optional[Dict]:
        """Take the next queued message for ``agent_name``.

        Args:
            agent_name: Name whose queue is read.
            timeout: Seconds to wait for a message before giving up.

        Returns:
            The message, or ``None`` when the name has no queue or no
            message arrives within ``timeout``.
        """
        source = self.message_queues.get(agent_name)
        if source is None:
            return None
        try:
            return source.get(timeout=timeout)
        except Empty:
            # Only "queue stayed empty" means "no message"; any other
            # exception is a real error and must propagate.
            return None
3.2 通信协议定义
# communication_protocol.py
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import json
class MessageType(Enum):
    """Kinds of protocol message."""

    TASK_ASSIGN = "task_assign"
    TASK_COMPLETE = "task_complete"
    HELP_REQUEST = "help_request"
    INFO_SHARE = "info_share"
    DECISION_REQUEST = "decision_request"
    DECISION_RESPONSE = "decision_response"


@dataclass
class ProtocolMessage:
    """One message of the inter-agent protocol."""

    message_id: str                    # unique id of this message
    message_type: MessageType          # kind of message
    sender: str                        # sending agent's name
    receiver: str                      # receiving agent's name
    content: Dict                      # message payload
    timestamp: float                   # creation time, seconds since the epoch
    in_reply_to: Optional[str] = None  # id of the message being answered, if any


class CommunicationProtocol:
    """Creates, records, (de)serializes and threads protocol messages."""

    def __init__(self):
        """Start with an empty history and no pending requests."""
        self.message_history: List[ProtocolMessage] = []
        # NOTE: this class only removes entries from ``pending_requests``
        # (when a reply is created); it never adds any itself.
        self.pending_requests: Dict[str, ProtocolMessage] = {}

    def create_message(
        self,
        message_type: MessageType,
        sender: str,
        receiver: str,
        content: Dict,
        in_reply_to: Optional[str] = None
    ) -> ProtocolMessage:
        """Create a message, record it in the history and return it.

        Args:
            message_type: Kind of message.
            sender: Sending agent's name.
            receiver: Receiving agent's name.
            content: Message payload.
            in_reply_to: Id of the message this one answers, if any. When
                that id is in ``pending_requests`` the entry is removed.

        Returns:
            The new message, with a fresh UUID4 id and the current time.
        """
        import time
        import uuid

        message = ProtocolMessage(
            message_id=str(uuid.uuid4()),
            message_type=message_type,
            sender=sender,
            receiver=receiver,
            content=content,
            timestamp=time.time(),
            in_reply_to=in_reply_to,
        )
        self.message_history.append(message)
        if in_reply_to:
            self.pending_requests.pop(in_reply_to, None)
        return message

    def serialize(self, message: ProtocolMessage) -> str:
        """Serialize ``message`` to a JSON string.

        ``message_type`` is written as the enum's string value, because
        ``json.dumps`` cannot encode an ``Enum`` member directly;
        :meth:`deserialize` converts it back.
        """
        payload = asdict(message)
        payload['message_type'] = message.message_type.value
        return json.dumps(payload)

    def deserialize(self, json_str: str) -> ProtocolMessage:
        """Rebuild a message from a string produced by :meth:`serialize`.

        Raises:
            ValueError: If the JSON is invalid or ``message_type`` is not
                a known ``MessageType`` value.
        """
        data = json.loads(json_str)
        data['message_type'] = MessageType(data['message_type'])
        return ProtocolMessage(**data)

    def get_conversation_thread(
        self,
        message_id: str
    ) -> List[ProtocolMessage]:
        """Return the reply chain ending at ``message_id``, oldest first.

        The chain is followed through ``in_reply_to`` links and stops at the
        first id that is missing from the history.

        Args:
            message_id: Id of the last message of the thread.

        Returns:
            Messages from the root of the thread down to ``message_id``;
            empty when ``message_id`` is not in the history.
        """
        # Index once instead of rescanning the history for every hop.
        # ``setdefault`` keeps the first message recorded for an id.
        by_id: Dict[str, ProtocolMessage] = {}
        for recorded in self.message_history:
            by_id.setdefault(recorded.message_id, recorded)

        newest_first: List[ProtocolMessage] = []
        visited = set()
        current_id = message_id
        # ``visited`` stops the walk if the links ever form a cycle.
        while current_id and current_id not in visited:
            found = by_id.get(current_id)
            if found is None:
                break
            visited.add(current_id)
            newest_first.append(found)
            current_id = found.in_reply_to
        newest_first.reverse()
        return newest_first
四、任务分配
4.1 任务分解器
# task_decomposer.py
from typing import List, Dict
class TaskDecomposer:
    """Uses an LLM to split a task into subtasks."""

    def __init__(self, llm):
        """Store the LLM whose ``generate(prompt)`` is used to decompose."""
        self.llm = llm

    def decompose(
        self,
        task: str,
        available_agents: List[str]
    ) -> List[Dict]:
        """Ask the LLM to break ``task`` into subtasks.

        Args:
            task: The original task description.
            available_agents: Names of the agent roles the LLM may assign.

        Returns:
            The ``subtasks`` list from the LLM's JSON answer, or an empty
            list when the answer has no usable JSON or no ``subtasks`` list.
        """
        roles = ', '.join(available_agents)
        prompt = (
            "\n"
            "请将以下任务分解为可并行执行的子任务:\n"
            f"任务:{task}\n"
            f"可用 Agent 角色:{roles}\n"
            "请按以下 JSON 格式输出:\n"
            "{\n"
            '"subtasks": [\n'
            "{\n"
            '"id": "task_1",\n'
            '"description": "任务描述",\n'
            '"assigned_to": "agent_name",\n'
            '"dependencies": [],\n'
            '"estimated_time": "minutes"\n'
            "}\n"
            "]\n"
            "}\n"
        )
        response = self.llm.generate(prompt)
        subtasks = self._parse_json(response).get('subtasks', [])
        # The LLM output is untrusted: anything but a list is unusable.
        return subtasks if isinstance(subtasks, list) else []

    def _parse_json(self, text: str) -> Dict:
        """Extract the JSON object embedded in ``text``.

        The span from the first ``{`` to the last ``}`` is parsed, so
        surrounding prose or code fences are ignored.

        Returns:
            The parsed object, or ``{}`` when ``text`` contains no braces or
            the braced span is not valid JSON.
        """
        import json
        import re

        match = re.search(r'\{.*\}', text, re.DOTALL)
        if not match:
            return {}
        try:
            return json.loads(match.group())
        except json.JSONDecodeError:
            # Malformed LLM output is treated like "no JSON found" instead
            # of crashing the caller.
            return {}
4.2 任务调度器
# task_scheduler.py
from typing import List, Dict, Optional
from queue import PriorityQueue
import threading
class TaskScheduler:
    """Priority-based task queue with per-agent busy tracking and results."""

    def __init__(self):
        """Create an empty scheduler."""
        self.task_queue = PriorityQueue()
        self.agent_status: Dict[str, str] = {}
        self.task_results: Dict[str, Dict] = {}
        self.lock = threading.Lock()
        # Insertion counter used as the last tie-breaker of a queue entry.
        self._sequence = 0

    def add_task(
        self,
        task_id: str,
        task_data: Dict,
        priority: int = 0
    ):
        """Queue a task.

        Args:
            task_id: Identifier of the task.
            task_data: Task payload, returned unchanged by ``get_next_task``.
            priority: Larger values are dequeued first.
        """
        with self.lock:
            sequence = self._sequence
            self._sequence += 1
        # The priority is negated because PriorityQueue pops the smallest
        # entry first. ``sequence`` sits before ``task_data`` so that two
        # entries with equal priority and id never fall through to
        # comparing dicts, which raises TypeError.
        self.task_queue.put((-priority, task_id, sequence, task_data))

    def assign_task(
        self,
        task_id: str,
        agent_name: str
    ):
        """Mark ``agent_name`` as busy.

        ``task_id`` is accepted for symmetry with ``complete_task`` but is
        not recorded.
        """
        with self.lock:
            self.agent_status[agent_name] = 'busy'

    def complete_task(
        self,
        task_id: str,
        agent_name: str,
        result: Dict
    ):
        """Mark ``agent_name`` as idle and store ``result`` under ``task_id``."""
        with self.lock:
            self.agent_status[agent_name] = 'idle'
            self.task_results[task_id] = result

    def get_next_task(
        self,
        agent_name: str
    ) -> Optional[tuple]:
        """Pop the highest-priority task for an agent that is not busy.

        Args:
            agent_name: Agent asking for work.

        Returns:
            ``(task_id, task_data)``, or ``None`` when the agent is marked
            busy or the queue is empty.
        """
        from queue import Empty

        if self.agent_status.get(agent_name) == 'busy':
            return None
        try:
            _, task_id, _, task_data = self.task_queue.get_nowait()
        except Empty:
            return None
        return (task_id, task_data)

    def get_all_results(self) -> Dict:
        """Return a shallow copy of the results keyed by task id."""
        return self.task_results.copy()
五、协作流程
5.1 协调器实现
# coordinator.py
from typing import List, Dict
from agent_base import BaseAgent
from task_scheduler import TaskScheduler
from message_bus import MessageBus
class Coordinator:
    """Decomposes a task, dispatches subtasks to agents, merges results."""

    def __init__(self, agents: List[BaseAgent]):
        """Wire up the scheduler, message bus and task decomposer.

        Args:
            agents: Agents to coordinate; must be non-empty, because the
                first agent's LLM is reused for decomposition and result
                integration.

        Raises:
            IndexError: If ``agents`` is empty.
        """
        # Function-scope import: the imports of this module do not bring
        # TaskDecomposer into scope.
        from task_decomposer import TaskDecomposer

        self.agents = {agent.name: agent for agent in agents}
        self.scheduler = TaskScheduler()
        self.message_bus = MessageBus()
        self.decomposer = TaskDecomposer(self._first_agent().llm)
        self._setup_message_handlers()

    def _first_agent(self) -> BaseAgent:
        """Return the first registered agent (IndexError when there is none)."""
        return list(self.agents.values())[0]

    def _setup_message_handlers(self):
        """Subscribe a forwarding handler on the bus for every agent."""
        for agent_name in self.agents:
            # ``name=agent_name`` binds the current name; a plain closure
            # would see only the last loop value.
            self.message_bus.subscribe(
                agent_name,
                lambda msg, name=agent_name: self._handle_message(name, msg)
            )

    def _handle_message(self, agent_name: str, message: Dict):
        """Hand ``message`` to the named agent's inbox, if it exists."""
        agent = self.agents.get(agent_name)
        if agent:
            agent.receive_message(message)

    def execute_task(self, task: str) -> Dict:
        """Run a task end to end.

        The task is decomposed into subtasks, each subtask is processed by
        the agent named in its ``assigned_to`` field (falling back to the
        first agent when that name is missing or unknown), and the results
        are integrated by the LLM. Subtasks run sequentially in
        decomposition order; their ``dependencies`` field is not consulted.

        Args:
            task: Task description.

        Returns:
            ``{'integrated_result': <LLM output>,
            'individual_results': [<per-subtask result>, ...]}``.
        """
        # 1. Decompose the task.
        subtasks = self.decomposer.decompose(task, list(self.agents))

        # 2. Queue the subtasks. The scheduler dequeues larger priorities
        #    first, so earlier subtasks get the larger values.
        total = len(subtasks)
        for index, subtask in enumerate(subtasks):
            # LLM output may omit the id; fall back to a positional one.
            task_id = subtask.get('id', f'task_{index + 1}')
            self.scheduler.add_task(task_id, subtask, priority=total - index)

        # 3. Dispatch every queued subtask to its agent.
        fallback_name = self._first_agent().name
        results = []
        while True:
            task_info = self.scheduler.get_next_task(fallback_name)
            if not task_info:
                break
            task_id, task_data = task_info
            agent_name = task_data.get('assigned_to')
            if not isinstance(agent_name, str) or agent_name not in self.agents:
                agent_name = fallback_name
            self.scheduler.assign_task(task_id, agent_name)
            result = self.agents[agent_name].process(task_data)
            self.scheduler.complete_task(task_id, agent_name, result)
            results.append(result)

        # 4. Integrate the results.
        return self._integrate_results(results)

    def _integrate_results(self, results: List[Dict]) -> Dict:
        """Ask the first agent's LLM to merge the individual results.

        Args:
            results: Result dicts returned by the agents.

        Returns:
            ``{'integrated_result': <LLM output>,
            'individual_results': results}``.
        """
        llm = self._first_agent().llm
        results_text = '\n\n'.join(
            f"Agent: {item.get('agent', 'unknown')}\nResult: {item}"
            for item in results
        )
        prompt = (
            "\n"
            "请整合以下 Agent 的执行结果:\n"
            f"{results_text}\n"
            "请综合所有结果,给出完整的最终答案。\n"
        )
        integrated = llm.generate(prompt)
        return {
            'integrated_result': integrated,
            'individual_results': results,
        }
5.2 流水线模式
# pipeline_mode.py
from typing import List, Dict
class PipelineAgent:
    """Runs a fixed sequence of agents, feeding each one the merged data."""

    # ``BaseAgent`` is written as a forward reference because this module's
    # imports do not bring it into scope; a bare name in the annotation
    # would raise NameError when the class is defined.
    def __init__(self, agents: List["BaseAgent"]):
        """Store the stages.

        Args:
            agents: Agents in execution order; each needs ``process(dict)``.
        """
        self.agents = agents

    def process(self, input_data: Dict) -> Dict:
        """Run every stage in order.

        After each stage the working dict becomes the previous dict updated
        with the stage's result, plus ``stage`` (1-based index of the stage
        that just ran) and ``stage_result`` (that stage's raw result).

        Args:
            input_data: Initial data handed to the first stage.

        Returns:
            The working dict after the last stage; ``input_data`` itself
            when the pipeline has no stages.
        """
        current_data = input_data
        for stage_number, agent in enumerate(self.agents, start=1):
            result = agent.process(current_data)
            # Build a new dict each time so the caller's input and earlier
            # stage snapshots are never mutated.
            current_data = {
                **current_data,
                **result,
                'stage': stage_number,
                'stage_result': result,
            }
        return current_data
# Usage example: a code-development pipeline
def create_code_pipeline(llm):
    """Build an analyst -> coder -> reviewer pipeline sharing one LLM."""
    analysis_stage = AnalystAgent(llm)  # requirements analysis
    coding_stage = CoderAgent(llm)      # code writing
    review_stage = ReviewerAgent(llm)   # code review
    return PipelineAgent([analysis_stage, coding_stage, review_stage])
# Run the pipeline.
# NOTE: `llm` is not defined in this snippet; the caller must supply it.
pipeline = create_code_pipeline(llm)
result = pipeline.process({
    # AnalystAgent reads 'problem'; without it the first stage would be
    # asked to analyze an empty string.
    'problem': '实现一个快速排序算法',
    # CoderAgent reads 'requirement' and 'language'.
    'requirement': '实现一个快速排序算法',
    'language': 'Python'
})
六、实战案例
6.1 软件开发协作
# software_dev_collaboration.py
class SoftwareDevelopmentTeam:
    """Team of five agents covering analysis through testing."""

    def __init__(self, llm):
        """Create one agent per role, plus a coordinator over all of them.

        Args:
            llm: LLM object shared by every agent.
        """
        team = {}
        team['analyst'] = AnalystAgent(llm)
        team['architect'] = ArchitectAgent(llm)
        team['coder'] = CoderAgent(llm)
        team['reviewer'] = ReviewerAgent(llm)
        team['tester'] = TesterAgent(llm)
        self.agents = team
        self.coordinator = Coordinator(list(team.values()))

    def develop_feature(self, requirement: str) -> Dict:
        """Develop a feature by running the five roles one after another.

        Args:
            requirement: Feature requirement text.

        Returns:
            Dict holding each role's raw result under the keys
            ``analysis``, ``design``, ``code``, ``review`` and ``test``.
        """
        team = self.agents

        # 1. Requirements analysis
        analysis = team['analyst'].process({'problem': requirement})

        # 2. Architecture design, based on the analysis text
        design_input = {
            'requirement': requirement,
            'analysis': analysis['analysis'],
        }
        design = team['architect'].process(design_input)

        # 3. Implementation, based on the design
        coding_input = {
            'requirement': requirement,
            'design': design['design'],
        }
        code = team['coder'].process(coding_input)

        # 4. Code review of the produced code
        review_input = {'content': code['code'], 'content_type': 'code'}
        review = team['reviewer'].process(review_input)

        # 5. Testing of the produced code against the requirement
        test_input = {'code': code['code'], 'requirement': requirement}
        test = team['tester'].process(test_input)

        outcome = {}
        outcome['analysis'] = analysis
        outcome['design'] = design
        outcome['code'] = code
        outcome['review'] = review
        outcome['test'] = test
        return outcome
6.2 问题诊断协作
# troubleshooting_collaboration.py
class TroubleshootingTeam:
    """Team of three agents: diagnose, research, then solve."""

    def __init__(self, llm):
        """Create one agent per role.

        Args:
            llm: LLM object shared by every agent.
        """
        team = {}
        team['diagnoser'] = DiagnoserAgent(llm)
        team['researcher'] = ResearcherAgent(llm)
        team['solver'] = SolverAgent(llm)
        self.agents = team

    def diagnose_and_solve(self, problem: str) -> Dict:
        """Diagnose a problem, research options and produce a solution.

        Args:
            problem: Problem description.

        Returns:
            Dict holding each role's raw result under the keys
            ``diagnosis``, ``research`` and ``solution``.
        """
        team = self.agents

        # 1. Diagnosis
        diagnosis = team['diagnoser'].process({'problem': problem})
        diagnosis_text = diagnosis['diagnosis']

        # 2. Research, informed by the diagnosis
        research_input = {'problem': problem, 'diagnosis': diagnosis_text}
        research = team['researcher'].process(research_input)

        # 3. Solution, informed by the diagnosis and the research
        solver_input = {
            'problem': problem,
            'diagnosis': diagnosis_text,
            'research': research['research'],
        }
        solution = team['solver'].process(solver_input)

        outcome = {}
        outcome['diagnosis'] = diagnosis
        outcome['research'] = research
        outcome['solution'] = solution
        return outcome
七、总结
7.1 核心要点
- 角色分工
  - 明确每个 Agent 的职责
  - 专业化提升效率
  - 避免职责重叠
- 通信协议
  - 标准化消息格式
  - 异步通信机制
  - 消息追踪和回溯
- 任务分配
  - 智能任务分解
  - 动态任务调度
  - 依赖关系管理
7.2 最佳实践
- 合理设计 Agent 角色
  - 根据任务需求定义角色
  - 保持角色独立性
  - 预留扩展空间
- 建立有效通信机制
  - 使用消息总线
  - 定义清晰协议
  - 处理通信异常
- 优化协作流程
  - 减少等待时间
  - 并行处理任务
  - 结果有效整合
参考资料