Skip to content
清晨的一缕阳光
返回

多 Agent 协作框架实战

多 Agent 协作框架实战

复杂任务往往需要多个 Agent 协作完成。如何设计多 Agent 协作系统?如何实现 Agent 间的高效通信?本文深入解析多 Agent 协作框架的实战方案。

一、多 Agent 架构

1.1 协作模式

多 Agent 协作模式:

┌─────────────────────────────────────┐
│ 1. 层级模式(Hierarchical)          │
│    - 主管 Agent 分配任务             │
│    - 执行 Agent 完成任务             │
│    - 适合结构化任务                  │
├─────────────────────────────────────┤
│ 2. 对等模式(Peer-to-Peer)          │
│    - Agent 平等协作                  │
│    - 协商完成任务                    │
│    - 适合开放性任务                  │
├─────────────────────────────────────┤
│ 3. 流水线模式(Pipeline)            │
│    - Agent 按顺序处理                │
│    - 每个 Agent 负责一个环节         │
│    - 适合流程化任务                  │
├─────────────────────────────────────┤
│ 4. 委员会模式(Committee)           │
│    - 多个 Agent 独立决策             │
│    - 投票或整合结果                  │
│    - 适合高准确度要求                │
└─────────────────────────────────────┘

1.2 架构设计

graph TB
    A[用户请求] --> B[协调器]
    B --> C{任务类型}
    C -->|分析| D[分析 Agent]
    C -->|编码| E[编码 Agent]
    C -->|测试| F[测试 Agent]
    D --> G[结果整合]
    E --> G
    F --> G
    G --> H[最终响应]

二、Agent 角色定义

2.1 基础 Agent 类

# agent_base.py
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum

class AgentStatus(Enum):
    """Lifecycle state of an agent.

    ``BaseAgent.get_available`` reports an agent as available only when its
    status is ``IDLE``.
    """
    # Not working on anything; the only status treated as available.
    IDLE = "idle"
    # Set by the specialised agents while ``process`` is running.
    BUSY = "busy"
    # NOTE(review): nothing in this article sets ERROR — TODO confirm intended use.
    ERROR = "error"

@dataclass
class AgentMessage:
    """A message exchanged between agents.

    Attributes:
        sender: Name of the agent that created the message.
        content: Message body.
        message_type: Expected to be "request", "response" or
            "notification" (not enforced).
        metadata: Extra key/value data. ``None`` (the default) is replaced
            by a fresh empty dict, so consumers can always treat it as a dict.
    """
    sender: str
    content: str
    message_type: str  # request, response, notification
    metadata: Optional[Dict] = None

    def __post_init__(self):
        # Normalise here instead of using a shared mutable default: every
        # instance gets its own dict, and readers never need a None check.
        if self.metadata is None:
            self.metadata = {}

class BaseAgent(ABC):
    """Abstract base class for all agents.

    Subclasses implement ``process``. The base class holds the agent's name,
    its LLM client, an ``AgentStatus`` and an in-memory inbox
    (``message_queue``) of received messages.
    """

    def __init__(self, name: str, llm):
        """
        Initialise the agent.

        Args:
            name: Agent name; used as ``sender`` on messages it builds.
            llm: LLM client. The subclasses in this article call
                ``llm.generate(prompt)`` on it.
        """
        self.name = name
        self.llm = llm
        self.status = AgentStatus.IDLE
        self.message_queue: List[AgentMessage] = []
        self.context: Dict = {}

    @abstractmethod
    def process(self, input_data: Dict) -> Dict:
        """Handle one task and return its result; implemented by subclasses."""
        pass

    def receive_message(self, message: AgentMessage):
        """Append ``message`` to this agent's inbox (``message_queue``)."""
        self.message_queue.append(message)

    def send_message(
        self,
        receiver: str,
        content: str,
        message_type: str = "notification",
        metadata: Dict = None
    ) -> AgentMessage:
        """
        Build an outgoing message from this agent.

        The message is only constructed and returned; delivering it is up to
        the caller.

        Args:
            receiver: Name of the intended receiving agent.
            content: Message body.
            message_type: "request", "response" or "notification".
            metadata: Optional extra data. It is copied, so the caller's dict
                is never modified or shared with the message.

        Returns:
            A new ``AgentMessage``. Its metadata is a copy of ``metadata`` with
            ``receiver`` stored under ``"receiver"``, unless the caller already
            set that key.
        """
        payload = dict(metadata or {})
        # AgentMessage has no receiver field; keep the addressee in metadata
        # so it is not silently dropped.
        payload.setdefault("receiver", receiver)
        return AgentMessage(
            sender=self.name,
            content=content,
            message_type=message_type,
            metadata=payload
        )

    def get_available(self) -> bool:
        """Return True when the agent's status is ``AgentStatus.IDLE``."""
        return self.status == AgentStatus.IDLE

    def set_status(self, status: AgentStatus):
        """Overwrite the agent's current status."""
        self.status = status

2.2 专业 Agent 实现

# specialized_agents.py
from agent_base import BaseAgent, AgentStatus

class AnalystAgent(BaseAgent):
    """Analysis agent: asks the LLM to diagnose a problem and suggest fixes."""

    def __init__(self, llm):
        super().__init__("analyst", llm)
        self.specialties = [
            "数据分析",
            "问题诊断",
            "方案评估"
        ]

    # Builtin ``dict`` in annotations: specialized_agents.py does not import
    # ``typing.Dict``, so using it would raise NameError at class creation.
    def process(self, input_data: dict) -> dict:
        """
        Analyse the problem described in ``input_data``.

        Args:
            input_data: Reads ``'problem'``. Falls back to ``'requirement'``
                so the agent also works as the first stage of a pipeline that
                is fed a requirement.

        Returns:
            ``{'analysis': <LLM output>, 'agent': 'analyst'}``.
        """
        self.set_status(AgentStatus.BUSY)
        try:
            problem = input_data.get('problem', input_data.get('requirement', ''))
            prompt = f"""
请分析以下问题:

{problem}

请从以下角度分析:
1. 问题本质是什么?
2. 可能的原因有哪些?
3. 建议的解决方案是什么?
"""
            analysis = self.llm.generate(prompt)
        finally:
            # Return to IDLE even if the LLM call raises; otherwise the agent
            # would report itself BUSY forever.
            self.set_status(AgentStatus.IDLE)

        return {
            'analysis': analysis,
            'agent': self.name
        }

class CoderAgent(BaseAgent):
    """Coding agent: asks the LLM to implement a requirement."""

    def __init__(self, llm):
        super().__init__("coder", llm)
        self.languages = ["Python", "JavaScript", "SQL"]

    # Builtin ``dict`` in annotations: specialized_agents.py does not import
    # ``typing.Dict``, so using it would raise NameError at class creation.
    def process(self, input_data: dict) -> dict:
        """
        Generate code for ``input_data['requirement']``.

        Args:
            input_data: Reads ``'requirement'`` and ``'language'`` (default
                ``'Python'``). If upstream ``'analysis'`` or ``'design'`` text
                is present (as the pipeline and team examples provide), it is
                added to the prompt as reference material.

        Returns:
            ``{'code': <LLM output>, 'language': ..., 'agent': 'coder'}``.
        """
        self.set_status(AgentStatus.BUSY)
        try:
            requirement = input_data.get('requirement', '')
            language = input_data.get('language', 'Python')
            # Empty string when there is no upstream context, which keeps the
            # prompt identical to the context-free version.
            context = self._upstream_context(input_data)
            prompt = f"""
请编写代码实现以下功能:

需求:{requirement}
语言:{language}
{context}
要求:
1. 代码简洁清晰
2. 包含必要的注释
3. 考虑边界情况
"""
            code = self.llm.generate(prompt)
        finally:
            # Return to IDLE even if the LLM call raises.
            self.set_status(AgentStatus.IDLE)

        return {
            'code': code,
            'language': language,
            'agent': self.name
        }

    @staticmethod
    def _upstream_context(input_data: dict) -> str:
        """Format upstream analysis/design output for the prompt ('' if none)."""
        lines = [
            f"{label}:{input_data[key]}"
            for key, label in (('analysis', '需求分析'), ('design', '设计方案'))
            if input_data.get(key)
        ]
        if not lines:
            return ''
        return '\n参考信息:\n' + '\n'.join(lines) + '\n'

class ReviewerAgent(BaseAgent):
    """Review agent: asks the LLM to review code or other content."""

    def __init__(self, llm):
        super().__init__("reviewer", llm)
        self.check_categories = [
            "代码质量",
            "安全性",
            "性能",
            "可维护性"
        ]

    # Builtin ``dict`` in annotations: specialized_agents.py does not import
    # ``typing.Dict``, so using it would raise NameError at class creation.
    def process(self, input_data: dict) -> dict:
        """
        Review ``input_data['content']``.

        Args:
            input_data: Reads ``'content'``. Falls back to ``'code'``, the key
                ``CoderAgent`` produces, so the reviewer works directly after
                the coder in a pipeline. ``'content_type'`` defaults to
                ``'code'``.

        Returns:
            ``{'review': <LLM output>, 'issues_found': int, 'agent': 'reviewer'}``.
        """
        self.set_status(AgentStatus.BUSY)
        try:
            content = input_data.get('content', input_data.get('code', ''))
            content_type = input_data.get('content_type', 'code')
            prompt = f"""
请审查以下内容:

内容:
{content}

类型:{content_type}

请从以下角度审查:
1. 是否存在错误?
2. 是否有改进空间?
3. 具体建议是什么?
"""
            review = self.llm.generate(prompt)
        finally:
            # Return to IDLE even if the LLM call raises.
            self.set_status(AgentStatus.IDLE)

        return {
            'review': review,
            'issues_found': self._count_issues(review),
            'agent': self.name
        }

    def _count_issues(self, review: str) -> int:
        """Rough heuristic: occurrences of "问题" plus occurrences of "建议".

        The prompt itself contains "建议", so replies that echo the question
        may inflate the count.
        """
        return review.count("问题") + review.count("建议")

三、通信协议

3.1 消息总线

# message_bus.py
from typing import Dict, List, Callable
from queue import Queue
import threading

class MessageBus:
    """In-process message bus: topic callbacks plus a mailbox per agent.

    ``publish`` delivers synchronously: callbacks run on the publisher's
    thread. ``subscribe`` also creates the agent's mailbox, and
    ``send_to_agent`` drops messages addressed to names that never
    subscribed.
    """

    def __init__(self):
        self.subscribers: Dict[str, List[Callable]] = {}
        self.message_queues: Dict[str, Queue] = {}
        self.lock = threading.Lock()

    def subscribe(
        self,
        agent_name: str,
        callback: Callable
    ):
        """Register ``callback`` for topic ``agent_name`` and ensure a mailbox exists."""
        with self.lock:
            if agent_name not in self.subscribers:
                self.subscribers[agent_name] = []
                self.message_queues[agent_name] = Queue()

            self.subscribers[agent_name].append(callback)

    def publish(
        self,
        topic: str,
        message: Dict
    ):
        """Call every callback subscribed to ``topic`` with ``message``."""
        # Snapshot under the lock, invoke outside it. threading.Lock is not
        # reentrant, so a callback that subscribes or publishes would
        # otherwise deadlock.
        with self.lock:
            callbacks = list(self.subscribers.get(topic, ()))
        for callback in callbacks:
            callback(message)

    def send_to_agent(
        self,
        agent_name: str,
        message: Dict
    ):
        """Put ``message`` in ``agent_name``'s mailbox; ignored if there is none."""
        with self.lock:
            mailbox = self.message_queues.get(agent_name)
        if mailbox is not None:
            mailbox.put(message)

    # String annotation: message_bus.py does not import ``Optional``.
    def get_message(
        self,
        agent_name: str,
        timeout: float = 1.0
    ) -> "Optional[Dict]":
        """Pop the next message for ``agent_name``, waiting up to ``timeout`` seconds.

        Returns:
            The message, or None if the agent has no mailbox or none arrived
            in time.
        """
        from queue import Empty

        with self.lock:
            mailbox = self.message_queues.get(agent_name)
        if mailbox is None:
            return None
        try:
            return mailbox.get(timeout=timeout)
        except Empty:
            # Only a timeout means "no message"; other errors propagate.
            return None

3.2 通信协议定义

# communication_protocol.py
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum
import json

class MessageType(Enum):
    """Kinds of protocol message exchanged between agents.

    The string values are what ``CommunicationProtocol.deserialize`` maps
    back to members via ``MessageType(value)``.
    """
    # Task lifecycle
    TASK_ASSIGN = "task_assign"
    TASK_COMPLETE = "task_complete"
    # Collaboration between agents
    HELP_REQUEST = "help_request"
    INFO_SHARE = "info_share"
    # Decision making
    DECISION_REQUEST = "decision_request"
    DECISION_RESPONSE = "decision_response"

@dataclass
class ProtocolMessage:
    """One message of the agent communication protocol.

    ``in_reply_to`` holds the ``message_id`` of the message being answered,
    which lets ``CommunicationProtocol.get_conversation_thread`` walk a
    conversation back to its root.
    """
    message_id: str  # unique id; create_message uses a UUID4 string
    message_type: MessageType
    sender: str  # name of the sending agent
    receiver: str  # name of the receiving agent
    content: Dict  # free-form payload
    timestamp: float  # creation time from time.time() (seconds since epoch)
    in_reply_to: Optional[str] = None  # message_id this message answers, if any

class CommunicationProtocol:
    """Creates, records, (de)serialises and threads ``ProtocolMessage`` objects.

    Every message built by ``create_message`` is appended to
    ``message_history``. Messages whose type is in ``REQUEST_TYPES`` are also
    kept in ``pending_requests`` until a message with ``in_reply_to`` equal to
    their id is created.
    """

    # Message types that expect an answer and are therefore tracked as pending.
    REQUEST_TYPES = frozenset({
        MessageType.TASK_ASSIGN,
        MessageType.HELP_REQUEST,
        MessageType.DECISION_REQUEST,
    })

    def __init__(self):
        self.message_history: List[ProtocolMessage] = []
        self.pending_requests: Dict[str, ProtocolMessage] = {}

    def create_message(
        self,
        message_type: MessageType,
        sender: str,
        receiver: str,
        content: Dict,
        in_reply_to: Optional[str] = None
    ) -> ProtocolMessage:
        """
        Create a message, record it in the history and update pending requests.

        Args:
            message_type: Kind of message.
            sender: Sending agent name.
            receiver: Receiving agent name.
            content: Payload.
            in_reply_to: ``message_id`` this message answers, if any. A reply
                removes that id from ``pending_requests``.

        Returns:
            The new message, with a UUID4 ``message_id`` and ``time.time()``
            timestamp.
        """
        import uuid
        import time

        message = ProtocolMessage(
            message_id=str(uuid.uuid4()),
            message_type=message_type,
            sender=sender,
            receiver=receiver,
            content=content,
            timestamp=time.time(),
            in_reply_to=in_reply_to
        )

        self.message_history.append(message)

        # A reply resolves the request it answers.
        if in_reply_to:
            self.pending_requests.pop(in_reply_to, None)

        # Register requests so that a later reply can resolve them.
        if message_type in self.REQUEST_TYPES:
            self.pending_requests[message.message_id] = message

        return message

    def serialize(self, message: ProtocolMessage) -> str:
        """Serialise ``message`` to a JSON string.

        ``message_type`` is written as its string value: ``json.dumps`` cannot
        encode Enum members and would raise ``TypeError``. ``deserialize``
        reverses this.
        """
        data = asdict(message)
        if isinstance(data['message_type'], Enum):
            data['message_type'] = data['message_type'].value
        return json.dumps(data)

    def deserialize(self, json_str: str) -> ProtocolMessage:
        """Rebuild a ``ProtocolMessage`` from the JSON produced by ``serialize``."""
        data = json.loads(json_str)
        data['message_type'] = MessageType(data['message_type'])
        return ProtocolMessage(**data)

    def get_conversation_thread(
        self,
        message_id: str
    ) -> List[ProtocolMessage]:
        """
        Return the reply chain ending at ``message_id``, oldest message first.

        The chain follows ``in_reply_to`` links through ``message_history``
        and stops at a message with no parent, an id not in the history, or
        an id already visited (guards against cyclic links in injected data).
        """
        # Index once (first occurrence wins, as with a linear scan) instead
        # of rescanning the history for every hop.
        by_id: Dict[str, ProtocolMessage] = {}
        for msg in self.message_history:
            by_id.setdefault(msg.message_id, msg)

        thread: List[ProtocolMessage] = []
        visited = set()
        current_id = message_id
        while current_id and current_id not in visited:
            msg = by_id.get(current_id)
            if msg is None:
                break
            visited.add(current_id)
            thread.append(msg)
            current_id = msg.in_reply_to

        thread.reverse()
        return thread

四、任务分配

4.1 任务分解器

# task_decomposer.py
from typing import List, Dict

class TaskDecomposer:
    """Uses an LLM to split a task into subtasks assigned to agent roles."""

    def __init__(self, llm):
        """
        Args:
            llm: LLM client exposing ``generate(prompt) -> str``.
        """
        self.llm = llm

    def decompose(
        self,
        task: str,
        available_agents: List[str]
    ) -> List[Dict]:
        """
        Decompose a task into subtasks.

        Args:
            task: The original task.
            available_agents: Names of the agents that may be assigned work.

        Returns:
            The ``subtasks`` list from the LLM's JSON reply. Empty when the
            reply contains no decodable JSON object or ``subtasks`` is not a
            list.
        """
        prompt = f"""
请将以下任务分解为可并行执行的子任务:

任务:{task}

可用 Agent 角色:{', '.join(available_agents)}

请按以下 JSON 格式输出:
{{
    "subtasks": [
        {{
            "id": "task_1",
            "description": "任务描述",
            "assigned_to": "agent_name",
            "dependencies": [],
            "estimated_time": "minutes"
        }}
    ]
}}
"""

        response = self.llm.generate(prompt)
        subtasks = self._parse_json(response).get('subtasks', [])

        # Guard against a reply whose "subtasks" is not a list.
        return subtasks if isinstance(subtasks, list) else []

    def _parse_json(self, text: str) -> Dict:
        """
        Extract the first JSON object embedded in ``text``.

        LLM replies often wrap the JSON in prose or Markdown fences, sometimes
        with other braces afterwards. A greedy ``{.*}`` match would run to
        the last ``}`` in the reply and then fail to parse. Instead, try
        decoding a complete object starting at each ``{`` in turn.

        Returns:
            The first decodable JSON object, or ``{}`` if there is none.
        """
        import json

        decoder = json.JSONDecoder()
        start = text.find('{')
        while start != -1:
            try:
                obj, _ = decoder.raw_decode(text[start:])
            except json.JSONDecodeError:
                start = text.find('{', start + 1)
                continue
            # Decoding began at '{', so a successful result is a dict.
            return obj
        return {}

4.2 任务调度器

# task_scheduler.py
from typing import List, Dict, Optional
from queue import PriorityQueue
import threading

class TaskScheduler:
    """Priority task queue with per-agent busy/idle bookkeeping and a result store."""

    def __init__(self):
        from itertools import count

        self.task_queue = PriorityQueue()
        self.agent_status: Dict[str, str] = {}
        self.task_results: Dict[str, Dict] = {}
        self.lock = threading.Lock()
        # Monotonic tie-breaker: entries with equal (priority, task_id) would
        # otherwise fall through to comparing task_data dicts (TypeError).
        self._sequence = count()

    def add_task(
        self,
        task_id: str,
        task_data: Dict,
        priority: int = 0
    ):
        """
        Queue a task.

        A larger ``priority`` is dequeued first. Ties are ordered by
        ``task_id``, then by insertion order.
        """
        self.task_queue.put((
            -priority,  # negate: PriorityQueue pops the smallest entry first
            task_id,
            next(self._sequence),
            task_data
        ))

    def assign_task(
        self,
        task_id: str,
        agent_name: str
    ):
        """Mark ``agent_name`` busy (the task itself is not recorded)."""
        with self.lock:
            self.agent_status[agent_name] = 'busy'

    def complete_task(
        self,
        task_id: str,
        agent_name: str,
        result: Dict
    ):
        """Mark ``agent_name`` idle and store ``result`` under ``task_id``."""
        with self.lock:
            self.agent_status[agent_name] = 'idle'
            self.task_results[task_id] = result

    def get_next_task(
        self,
        agent_name: str
    ) -> Optional[tuple]:
        """
        Pop the highest-priority task for ``agent_name``.

        Returns:
            ``(task_id, task_data)``, or None if the agent is busy or the
            queue is empty.
        """
        from queue import Empty

        with self.lock:
            busy = self.agent_status.get(agent_name) == 'busy'
        if busy:
            return None

        try:
            _, task_id, _, task_data = self.task_queue.get_nowait()
        except Empty:
            return None
        return (task_id, task_data)

    def get_all_results(self) -> Dict:
        """Return a shallow copy of all stored results, keyed by task id."""
        with self.lock:
            return self.task_results.copy()

五、协作流程

5.1 协调器实现

# coordinator.py
from typing import List, Dict
from agent_base import BaseAgent
from task_scheduler import TaskScheduler
from message_bus import MessageBus

class Coordinator:
    """Coordinator: decomposes a task, runs each subtask on its assigned agent
    in dependency order, and merges the results with an LLM.

    The first agent's ``llm`` is used for both decomposition and integration.
    """

    def __init__(self, agents: List[BaseAgent]):
        """
        Initialise the coordinator.

        Args:
            agents: Agents to coordinate, keyed internally by ``agent.name``.
                Must be non-empty.

        Raises:
            ValueError: If ``agents`` is empty.
        """
        # coordinator.py's imports never included TaskDecomposer; import it
        # where it is used.
        from task_decomposer import TaskDecomposer

        if not agents:
            raise ValueError("Coordinator requires at least one agent")

        self.agents = {agent.name: agent for agent in agents}
        self.scheduler = TaskScheduler()
        self.message_bus = MessageBus()
        self.decomposer = TaskDecomposer(self._default_llm())

        self._setup_message_handlers()

    def _default_llm(self):
        """Return the LLM client of the first registered agent."""
        return next(iter(self.agents.values())).llm

    def _setup_message_handlers(self):
        """Subscribe every agent on the message bus under its own name."""
        for agent_name in self.agents:
            self.message_bus.subscribe(
                agent_name,
                # Bind agent_name as a default so each lambda keeps its own name.
                lambda msg, name=agent_name: self._handle_message(name, msg)
            )

    def _handle_message(self, agent_name: str, message: Dict):
        """Forward a bus message into the named agent's inbox, if the agent exists."""
        agent = self.agents.get(agent_name)
        if agent:
            agent.receive_message(message)

    def execute_task(self, task: str) -> Dict:
        """
        Execute a task end to end.

        Steps:
            1. Decompose ``task`` with the LLM.
            2. Run each subtask, after its dependencies, on the agent named in
               its ``assigned_to`` field.
            3. Merge the results.

        Args:
            task: Task description.

        Returns:
            ``{'integrated_result': ..., 'individual_results': [...]}``.
        """
        subtasks = self.decomposer.decompose(
            task,
            list(self.agents.keys())
        )

        # Subtasks are run directly in dependency order. The scheduler's
        # priority queue is not keyed by agent, so draining it per agent
        # handed every subtask to whichever agent was polled first. The
        # scheduler is still used for status and result bookkeeping.
        results: List[Dict] = []
        completed: Dict[str, Dict] = {}
        for task_id, subtask in self._order_by_dependencies(subtasks):
            agent = self._resolve_agent(subtask)
            self.scheduler.assign_task(task_id, agent.name)
            result = agent.process(self._build_payload(task, subtask, completed))
            self.scheduler.complete_task(task_id, agent.name, result)
            completed[task_id] = result
            results.append(result)

        return self._integrate_results(results)

    @staticmethod
    def _order_by_dependencies(subtasks: List[Dict]) -> List[tuple]:
        """Return ``(task_id, subtask)`` pairs, each after the subtasks it depends on.

        Missing ids default to ``task_<n>`` (1-based position). Dependencies
        on unknown ids are ignored. If a cycle blocks progress, the remaining
        subtasks run in their original order.
        """
        entries = [
            (str(subtask.get('id', f'task_{index + 1}')), subtask)
            for index, subtask in enumerate(subtasks)
        ]
        known_ids = {task_id for task_id, _ in entries}
        finished = set()
        ordered = []
        remaining = entries
        while remaining:
            ready, blocked = [], []
            for task_id, subtask in remaining:
                deps = {str(dep) for dep in subtask.get('dependencies') or []}
                if (deps & known_ids) <= finished:
                    ready.append((task_id, subtask))
                else:
                    blocked.append((task_id, subtask))
            if not ready:
                # Dependency cycle: give up on ordering for what is left.
                ordered.extend(blocked)
                break
            ordered.extend(ready)
            finished.update(task_id for task_id, _ in ready)
            remaining = blocked
        return ordered

    def _resolve_agent(self, subtask: Dict) -> BaseAgent:
        """Return the agent named by ``subtask['assigned_to']``; fall back to
        the first agent when the name is missing or unknown."""
        agent = self.agents.get(str(subtask.get('assigned_to', '')))
        return agent if agent is not None else next(iter(self.agents.values()))

    @staticmethod
    def _build_payload(task: str, subtask: Dict, completed: Dict[str, Dict]) -> Dict:
        """Build the ``process`` input for one subtask.

        The built-in agents read ``problem``, ``requirement`` or ``content``
        rather than ``description``. So the description, followed by the
        results of any finished dependencies, is exposed under those keys.
        The subtask's own fields are kept and win on key collisions.
        """
        description = str(subtask.get('description', ''))
        upstream = [
            completed[str(dep)]
            for dep in subtask.get('dependencies') or []
            if str(dep) in completed
        ]
        context = '\n\n'.join(str(result) for result in upstream)
        text = f"{description}\n\n{context}" if context else description

        payload = {
            'task': task,
            'problem': text,
            'requirement': text,
            'content': text,
            'dependency_results': upstream,
        }
        payload.update(subtask)
        return payload

    def _integrate_results(self, results: List[Dict]) -> Dict:
        """Merge per-agent results into one answer using the LLM.

        When there are no results, returns an empty ``integrated_result``
        without calling the LLM.
        """
        if not results:
            return {
                'integrated_result': '',
                'individual_results': []
            }

        results_text = '\n\n'.join(
            f"Agent: {r.get('agent', 'unknown')}\nResult: {r}"
            for r in results
        )

        prompt = f"""
请整合以下 Agent 的执行结果:

{results_text}

请综合所有结果,给出完整的最终答案。
"""

        integrated = self._default_llm().generate(prompt)

        return {
            'integrated_result': integrated,
            'individual_results': results
        }

5.2 流水线模式

# pipeline_mode.py
from typing import List, Dict

class PipelineAgent:
    """Runs agents strictly in sequence, passing each stage the accumulated data."""

    # String annotation: pipeline_mode.py does not import BaseAgent, and an
    # unquoted name would raise NameError when this method is defined.
    def __init__(self, agents: "List[BaseAgent]"):
        """
        Initialise the pipeline.

        Args:
            agents: Agents in execution order. Each needs a
                ``process(dict) -> dict`` method.
        """
        self.agents = agents

    def process(self, input_data: Dict) -> Dict:
        """
        Run every stage in order.

        Each stage receives the data accumulated so far. Its result is merged
        on top (later keys win), and these bookkeeping keys are set:

        * ``stage`` - 1-based index of the last stage run.
        * ``stage_result`` - raw result of the last stage.
        * ``stage_results`` - raw results of all stages so far, in order.
          This keeps outputs that later stages overwrite, such as ``agent``.

        With no agents, ``input_data`` itself is returned unchanged.
        """
        current_data = input_data
        history: List[Dict] = []

        for stage, agent in enumerate(self.agents, start=1):
            result = agent.process(current_data)
            history.append(result)

            # Pass everything on to the next stage.
            current_data = {
                **current_data,
                **result,
                'stage': stage,
                'stage_result': result,
                # Copy so dicts handed to earlier stages are not mutated later.
                'stage_results': list(history),
            }

        return current_data

# 使用示例:代码开发流水线
def create_code_pipeline(llm):
    """Build the three-stage code pipeline: analysis, then coding, then review.

    Args:
        llm: LLM client shared by all three agents.

    Returns:
        A ``PipelineAgent`` running ``AnalystAgent``, ``CoderAgent`` and
        ``ReviewerAgent`` in that order.
    """
    # requirement analysis -> code writing -> code review
    stage_classes = (AnalystAgent, CoderAgent, ReviewerAgent)
    return PipelineAgent([stage_cls(llm) for stage_cls in stage_classes])

# Example run. Guarded so that importing this module does not execute it.
# NOTE: `llm` is not defined in this article; bind it to an LLM client that
# exposes `generate(prompt) -> str` before running this as a script.
if __name__ == "__main__":
    pipeline = create_code_pipeline(llm)
    result = pipeline.process({
        'requirement': '实现一个快速排序算法',
        'language': 'Python'
    })

六、实战案例

6.1 软件开发协作

# software_dev_collaboration.py
class SoftwareDevelopmentTeam:
    """Software development team running a fixed five-step workflow.

    Steps: requirement analysis -> architecture design -> implementation ->
    code review -> testing.

    NOTE(review): ``ArchitectAgent`` and ``TesterAgent`` are not defined in
    this article. The architect's result is assumed to contain a ``'design'``
    key — TODO confirm.
    """

    def __init__(self, llm):
        roster = (
            ('analyst', AnalystAgent),
            ('architect', ArchitectAgent),
            ('coder', CoderAgent),
            ('reviewer', ReviewerAgent),
            ('tester', TesterAgent),
        )
        self.agents = {role: agent_cls(llm) for role, agent_cls in roster}

        # Constructed over the same agents; develop_feature calls them directly.
        self.coordinator = Coordinator(list(self.agents.values()))

    def develop_feature(self, requirement: str) -> Dict:
        """Run the full workflow for one requirement.

        Returns:
            The raw result of each step under ``'analysis'``, ``'design'``,
            ``'code'``, ``'review'`` and ``'test'``.
        """
        team = self.agents
        outputs = {}

        outputs['analysis'] = team['analyst'].process({'problem': requirement})

        outputs['design'] = team['architect'].process({
            'requirement': requirement,
            'analysis': outputs['analysis']['analysis'],
        })

        outputs['code'] = team['coder'].process({
            'requirement': requirement,
            'design': outputs['design']['design'],
        })
        source_code = outputs['code']['code']

        outputs['review'] = team['reviewer'].process({
            'content': source_code,
            'content_type': 'code',
        })

        outputs['test'] = team['tester'].process({
            'code': source_code,
            'requirement': requirement,
        })

        return outputs

6.2 问题诊断协作

# troubleshooting_collaboration.py
class TroubleshootingTeam:
    """Troubleshooting team: diagnose -> research -> solve, run in sequence.

    NOTE(review): ``DiagnoserAgent``, ``ResearcherAgent`` and ``SolverAgent``
    are not defined in this article. Their results are assumed to contain
    ``'diagnosis'`` and ``'research'`` keys respectively — TODO confirm.
    """

    def __init__(self, llm):
        self.agents = {
            role: agent_cls(llm)
            for role, agent_cls in (
                ('diagnoser', DiagnoserAgent),
                ('researcher', ResearcherAgent),
                ('solver', SolverAgent),
            )
        }

    def diagnose_and_solve(self, problem: str) -> Dict:
        """Diagnose ``problem``, research options, then produce a solution.

        Returns:
            The raw result of each step under ``'diagnosis'``, ``'research'``
            and ``'solution'``.
        """
        diagnosis = self.agents['diagnoser'].process({'problem': problem})
        diagnosis_text = diagnosis['diagnosis']

        research = self.agents['researcher'].process({
            'problem': problem,
            'diagnosis': diagnosis_text,
        })

        solution = self.agents['solver'].process({
            'problem': problem,
            'diagnosis': diagnosis_text,
            'research': research['research'],
        })

        return dict(diagnosis=diagnosis, research=research, solution=solution)

七、总结

7.1 核心要点

  1. 角色分工

    • 明确每个 Agent 的职责
    • 专业化提升效率
    • 避免职责重叠
  2. 通信协议

    • 标准化消息格式
    • 异步通信机制
    • 消息追踪和回溯
  3. 任务分配

    • 智能任务分解
    • 动态任务调度
    • 依赖关系管理

7.2 最佳实践

  1. 合理设计 Agent 角色

    • 根据任务需求定义角色
    • 保持角色独立性
    • 预留扩展空间
  2. 建立有效通信机制

    • 使用消息总线
    • 定义清晰协议
    • 处理通信异常
  3. 优化协作流程

    • 减少等待时间
    • 并行处理互不依赖的任务
    • 结果有效整合

参考资料


分享这篇文章到:

上一篇文章
RocketMQ 顺序消息详解与实战
下一篇文章
Kafka 消费者组与重平衡机制详解