Skip to content
清晨的一缕阳光
返回

Agent 规划与任务分解实战

Agent 规划与任务分解实战

复杂任务的完成需要有效的规划和分解。如何让 Agent 制定合理的执行计划?如何管理任务依赖关系?本文深入解析 Agent 规划与任务分解的实战方案。

一、规划能力基础

1.1 规划层次

Agent 规划层次:

┌─────────────────────────────────────┐
│ 战略层(Strategic)                  │
│ - 长期目标设定                       │
│ - 资源分配策略                       │
│ - 风险预估与管理                     │
├─────────────────────────────────────┤
│ 战术层(Tactical)                   │
│ - 中期计划制定                       │
│ - 任务分解与分配                     │
│ - 进度跟踪与调整                     │
├─────────────────────────────────────┤
│ 执行层(Execution)                  │
│ - 具体任务执行                       │
│ - 实时问题处理                       │
│ - 结果反馈与修正                     │
└─────────────────────────────────────┘

1.2 规划模式

| 模式 | 特点 | 适用场景 |
|------|------|----------|
| 前向规划 | 从现状出发,逐步推导到目标的路径 | 探索性任务 |
| 后向规划 | 从目标倒推所需步骤 | 目标明确的任务 |
| 分层规划 | 多级任务分解 | 复杂长期任务 |
| 增量规划 | 逐步完善计划 | 不确定性高的任务 |

二、任务分解机制

2.1 分解策略

# task_decomposition.py
from typing import List, Dict, Optional
from dataclasses import dataclass, field
from enum import Enum

class TaskType(Enum):
    """How a subtask is meant to be executed."""

    # Sequential execution.
    SEQUENTIAL = "sequential"
    # Parallel execution.
    PARALLEL = "parallel"
    # Conditional execution.
    CONDITIONAL = "conditional"

@dataclass
class SubTask:
    """One unit of work produced by task decomposition."""

    # Identifier that other subtasks list in their ``dependencies``.
    id: str
    # Human-readable description of the work.
    description: str
    task_type: TaskType = field(default=TaskType.SEQUENTIAL)
    # IDs of subtasks this one depends on.
    dependencies: List[str] = field(default_factory=list)
    # Estimated duration, in minutes.
    estimated_time: int = field(default=0)
    # Priority on a 0-10 scale.
    priority: int = field(default=0)
    # Lifecycle state; new subtasks start as "pending".
    status: str = field(default="pending")
    # Result payload, None until one is recorded.
    result: Optional[Dict] = field(default=None)

class TaskDecomposer:
    """LLM-driven task decomposer.

    Asks ``llm.generate`` to split a task into JSON-described subtasks,
    converts the answer into ``SubTask`` objects and, while depth remains,
    recursively splits subtasks whose description looks complex.
    """

    def __init__(self, llm):
        # Any object exposing ``generate(prompt: str) -> str``.
        self.llm = llm

    def decompose(
        self,
        main_task: str,
        max_depth: int = 3,
        max_subtasks: int = 10
    ) -> List['SubTask']:
        """
        Decompose a task into subtasks.

        Args:
            main_task: the task to decompose.
            max_depth: maximum decomposition depth (1 means no recursion).
            max_subtasks: maximum number of subtasks kept per level.

        Returns:
            A flat list of subtasks. Subtasks produced by recursive splitting
            get IDs prefixed with their parent's ID (e.g. ``task_2.task_1``)
            so they never collide with IDs from other levels. The list is
            empty when the LLM reply contains no parseable JSON object.
        """
        # Decompose with the LLM.
        prompt = f"""
请将以下任务分解为可执行的子任务:

主任务:{main_task}

要求:
1. 每个子任务应该是独立可执行的
2. 标明任务之间的依赖关系
3. 估计每个任务所需时间
4. 按优先级排序

请按以下 JSON 格式输出:
{{
    "subtasks": [
        {{
            "id": "task_1",
            "description": "任务描述",
            "type": "sequential/parallel/conditional",
            "dependencies": ["task_id1", "task_id2"],
            "estimated_time": 30,
            "priority": 8
        }}
    ]
}}
"""

        response = self.llm.generate(prompt)
        result = self._parse_response(response)

        raw_subtasks = result.get('subtasks') or []
        if not isinstance(raw_subtasks, list):
            raw_subtasks = []

        subtasks = []
        for index, task_data in enumerate(raw_subtasks[:max_subtasks]):
            # Skip malformed entries instead of failing the whole plan.
            if not isinstance(task_data, dict) or 'description' not in task_data:
                continue
            subtasks.append(SubTask(
                id=task_data.get('id', f"task_{index + 1}"),
                description=task_data['description'],
                task_type=self._parse_task_type(task_data.get('type')),
                dependencies=list(task_data.get('dependencies') or []),
                estimated_time=task_data.get('estimated_time', 30),
                priority=task_data.get('priority', 5)
            ))

        # Recurse while depth remains.
        if max_depth > 1:
            subtasks = self._recursive_decompose(
                subtasks,
                max_depth - 1,
                max_subtasks
            )

        return subtasks

    def _recursive_decompose(
        self,
        subtasks: List['SubTask'],
        max_depth: int,
        max_subtasks: int
    ) -> List['SubTask']:
        """Replace complex subtasks with their own decomposition.

        Nested subtasks are namespaced under the parent ID (see
        ``_graft_nested``). Any sibling that depended on a replaced parent is
        rewired to depend on the parent's terminal nested subtasks, so no
        dependency is left pointing at an ID that no longer exists.
        """
        all_subtasks = []
        # Replaced parent ID -> nested IDs that now stand in for it.
        replacements: Dict[str, List[str]] = {}

        for subtask in subtasks:
            if not self._is_complex_task(subtask.description):
                all_subtasks.append(subtask)
                continue

            nested = self.decompose(
                subtask.description,
                max_depth=max_depth,
                max_subtasks=max_subtasks
            )
            if not nested:
                # The LLM produced nothing usable; keep the original task
                # rather than silently dropping it from the plan.
                all_subtasks.append(subtask)
                continue

            replacements[subtask.id] = self._graft_nested(subtask, nested)
            all_subtasks.extend(nested)

        for task in all_subtasks:
            if not any(dep in replacements for dep in task.dependencies):
                continue
            rewired = []
            for dep in task.dependencies:
                for new_dep in replacements.get(dep, [dep]):
                    if new_dep not in rewired:
                        rewired.append(new_dep)
            task.dependencies = rewired

        return all_subtasks

    @staticmethod
    def _graft_nested(parent, nested) -> List[str]:
        """Attach ``nested`` (parent's own decomposition) in place of ``parent``.

        Mutates each nested task in place:
        - its ID becomes ``"<parent.id>.<id>"``;
        - dependencies on other nested tasks are rewritten to the prefixed IDs,
          and dependencies on unknown IDs are dropped;
        - tasks with no internal dependency inherit the parent's dependencies.

        Returns the IDs of terminal nested tasks (those no other nested task
        depends on); if there are none, all nested IDs are returned.
        """
        prefix = f"{parent.id}."
        local_ids = {task.id for task in nested}
        for task in nested:
            internal = [f"{prefix}{dep}" for dep in task.dependencies if dep in local_ids]
            task.dependencies = internal if internal else list(parent.dependencies)
            task.id = f"{prefix}{task.id}"

        depended_on = {dep for task in nested for dep in task.dependencies}
        terminals = [task.id for task in nested if task.id not in depended_on]
        return terminals or [task.id for task in nested]

    @staticmethod
    def _parse_task_type(value) -> 'TaskType':
        """Map the LLM's ``type`` field to TaskType, defaulting to SEQUENTIAL."""
        try:
            return TaskType(value or 'sequential')
        except (ValueError, TypeError):
            # E.g. the LLM echoed the template text "sequential/parallel/conditional".
            return TaskType.SEQUENTIAL

    def _is_complex_task(self, description: str) -> bool:
        """Return True if the description contains a 'complex work' keyword."""
        complex_indicators = [
            '分析', '研究', '设计', '开发',
            '实现', '优化', '评估', '比较'
        ]
        return any(indicator in description for indicator in complex_indicators)

    def _parse_response(self, response: str) -> Dict:
        """Extract the outermost ``{...}`` JSON object from an LLM reply.

        Returns an empty dict when no object is found, the text is not valid
        JSON, or the JSON is not an object. A malformed reply therefore yields
        an empty plan instead of an exception.
        """
        import json
        import re

        match = re.search(r'\{.*\}', response or '', re.DOTALL)
        if not match:
            return {}
        try:
            parsed = json.loads(match.group())
        except json.JSONDecodeError:
            return {}
        return parsed if isinstance(parsed, dict) else {}

2.2 依赖管理

# dependency_manager.py
from typing import List, Dict, Set
from collections import defaultdict

class DependencyManager:
    """Dependency graph over a set of subtasks.

    Each task must expose ``id``, ``dependencies`` (IDs it waits on),
    ``priority`` and ``estimated_time``. Dependency IDs that are not part of
    the managed task set are treated as external. They are ignored when
    ordering tasks and computing the critical path, because nothing inside
    this graph could ever satisfy them.
    """

    def __init__(self, tasks: List['SubTask']):
        self.tasks = {task.id: task for task in tasks}
        # dependency ID -> IDs of tasks that wait on it
        self.dependency_graph = self._build_graph()
        # task ID -> IDs of its dependencies
        self.reverse_graph = self._build_reverse_graph()

    def _build_graph(self) -> Dict[str, List[str]]:
        """Build the forward graph: dependency -> dependents."""
        graph = defaultdict(list)
        for task_id, task in self.tasks.items():
            for dep in task.dependencies:
                graph[dep].append(task_id)
        return dict(graph)

    def _build_reverse_graph(self) -> Dict[str, List[str]]:
        """Build the reverse graph: task -> its dependencies."""
        graph = defaultdict(list)
        for task_id, task in self.tasks.items():
            for dep in task.dependencies:
                graph[task_id].append(dep)
        return dict(graph)

    def get_execution_order(self) -> List[str]:
        """Return task IDs in dependency order (Kahn's topological sort).

        Among tasks whose dependencies are all scheduled, the highest
        ``priority`` goes first. Ties keep the order in which the tasks
        became ready.

        Raises:
            ValueError: if the dependencies contain a cycle.
        """
        # Count only in-graph dependencies. An external ID is never popped
        # from the queue, so counting it would leave the task stuck forever
        # and be misreported as a cycle.
        in_degree = {
            task_id: sum(1 for dep in task.dependencies if dep in self.tasks)
            for task_id, task in self.tasks.items()
        }

        queue = [task_id for task_id, degree in in_degree.items() if degree == 0]
        order = []

        while queue:
            # Stable sort: equal priorities keep their queue (readiness) order.
            queue.sort(key=lambda x: self.tasks[x].priority, reverse=True)

            current = queue.pop(0)
            order.append(current)

            for next_task in self.dependency_graph.get(current, []):
                in_degree[next_task] -= 1
                if in_degree[next_task] == 0:
                    queue.append(next_task)

        if len(order) != len(self.tasks):
            raise ValueError("存在循环依赖")

        return order

    def get_ready_tasks(self, completed_tasks: Set[str]) -> List[str]:
        """Return unfinished tasks whose dependencies are all in ``completed_tasks``."""
        ready = []

        for task_id, task in self.tasks.items():
            if task_id in completed_tasks:
                continue

            if all(dep in completed_tasks for dep in task.dependencies):
                ready.append(task_id)

        return ready

    def detect_cycles(self) -> List[List[str]]:
        """Find dependency cycles with a depth-first search.

        Each cycle is reported as a path that starts and ends at the same ID,
        e.g. ``['a', 'b', 'a']``. Cycles that are only reachable through
        nodes already visited may not all be listed.
        """
        cycles = []
        visited = set()
        on_stack = set()

        def dfs(node: str, path: List[str]) -> None:
            visited.add(node)
            on_stack.add(node)
            path.append(node)

            for neighbor in self.dependency_graph.get(node, []):
                if neighbor not in visited:
                    dfs(neighbor, path)
                elif neighbor in on_stack:
                    # Back edge: the cycle is the path suffix from neighbor.
                    cycle_start = path.index(neighbor)
                    cycles.append(path[cycle_start:] + [neighbor])

            path.pop()
            on_stack.discard(node)

        for task_id in self.tasks:
            if task_id not in visited:
                dfs(task_id, [])

        return cycles

    def get_critical_path(self) -> List[str]:
        """Return the longest chain of dependent tasks by ``estimated_time``.

        Computes each task's earliest start over the topological order. The
        path ends at the task with the latest earliest *finish*
        (start + estimated_time) and is traced back through predecessors.
        Returns an empty list when there are no tasks.

        Raises:
            ValueError: if the dependencies contain a cycle.
        """
        order = self.get_execution_order()
        if not order:
            return []

        earliest_start = {task_id: 0 for task_id in self.tasks}
        predecessor = {task_id: None for task_id in self.tasks}

        for task_id in order:
            finish = earliest_start[task_id] + self.tasks[task_id].estimated_time
            for next_id in self.dependency_graph.get(task_id, []):
                if finish > earliest_start[next_id]:
                    earliest_start[next_id] = finish
                    predecessor[next_id] = task_id

        # The project ends when the last task *finishes*, not when it starts.
        end_task = max(
            self.tasks,
            key=lambda x: earliest_start[x] + self.tasks[x].estimated_time
        )

        path = []
        current = end_task
        while current is not None:
            path.append(current)
            current = predecessor[current]

        return list(reversed(path))

三、规划执行监控

3.1 执行跟踪器

# execution_tracker.py
from typing import Dict, List, Optional
from datetime import datetime
from dataclasses import dataclass

@dataclass
class TaskStatus:
    """Runtime state of a single task."""

    task_id: str
    # Expected values: pending, running, completed, failed.
    status: str
    start_time: Optional[datetime] = None    # when the task started
    end_time: Optional[datetime] = None      # when it completed or failed
    progress: float = 0.0                    # fraction done, 0-1
    error_message: Optional[str] = None      # failure reason, if any

class ExecutionTracker:
    """Tracks per-task status and keeps an append-only event log."""

    def __init__(self):
        self.task_statuses: Dict[str, TaskStatus] = {}
        self.execution_log: List[Dict] = []

    def start_task(self, task_id: str):
        """Mark ``task_id`` as running, replacing any earlier status."""
        now = datetime.now()
        self.task_statuses[task_id] = TaskStatus(
            task_id=task_id,
            status='running',
            start_time=now
        )

        self._log_event('task_started', {
            'task_id': task_id,
            'timestamp': now.isoformat()
        })

    def update_progress(
        self,
        task_id: str,
        progress: float,
        metadata: Dict = None
    ):
        """Record progress for a started task; unknown IDs are ignored.

        ``progress`` is clamped to the 0-1 range that ``TaskStatus.progress``
        represents, so one bad report cannot skew overall progress.
        """
        if task_id not in self.task_statuses:
            return

        progress = max(0.0, min(1.0, progress))
        self.task_statuses[task_id].progress = progress

        self._log_event('progress_update', {
            'task_id': task_id,
            'progress': progress,
            'metadata': metadata or {},
            'timestamp': datetime.now().isoformat()
        })

    def complete_task(
        self,
        task_id: str,
        result: Dict = None
    ):
        """Mark a started task as completed; unknown IDs are ignored."""
        if task_id not in self.task_statuses:
            return

        now = datetime.now()
        status = self.task_statuses[task_id]
        status.status = 'completed'
        status.end_time = now
        status.progress = 1.0

        duration = (
            (now - status.start_time).total_seconds()
            if status.start_time is not None else None
        )
        self._log_event('task_completed', {
            'task_id': task_id,
            'result': result or {},
            'duration': duration,
            'timestamp': now.isoformat()
        })

    def fail_task(
        self,
        task_id: str,
        error_message: str
    ):
        """Mark a started task as failed; unknown IDs are ignored."""
        if task_id not in self.task_statuses:
            return

        now = datetime.now()
        status = self.task_statuses[task_id]
        status.status = 'failed'
        status.end_time = now
        status.error_message = error_message

        self._log_event('task_failed', {
            'task_id': task_id,
            'error': error_message,
            'timestamp': now.isoformat()
        })

    def _log_event(self, event_type: str, data: Dict):
        """Append an event to ``execution_log``."""
        self.execution_log.append({
            'event_type': event_type,
            'data': data
        })

    def get_overall_progress(self) -> float:
        """Return the mean progress of all tracked tasks (0.0 if none)."""
        if not self.task_statuses:
            return 0.0

        total_progress = sum(
            s.progress for s in self.task_statuses.values()
        )
        return total_progress / len(self.task_statuses)

    def get_summary(self) -> Dict:
        """Return counts per status, overall progress, earliest start and ETA."""
        status_counts = {}
        for status in self.task_statuses.values():
            status_counts[status.status] = status_counts.get(status.status, 0) + 1

        return {
            'total_tasks': len(self.task_statuses),
            'status_counts': status_counts,
            'overall_progress': self.get_overall_progress(),
            'start_time': min(
                (s.start_time for s in self.task_statuses.values() if s.start_time),
                default=None
            ),
            'estimated_end_time': self._estimate_end_time()
        }

    def _estimate_end_time(self) -> Optional[datetime]:
        """Estimate when the running tasks will all be finished.

        Each running task that has reported progress > 0 is extrapolated
        linearly from its own elapsed time (total = elapsed / progress). The
        latest of those finish times is returned. Returns None when no running
        task has reported progress yet.
        """
        # Local import: the module-level import only brings in ``datetime``.
        from datetime import timedelta

        now = datetime.now()
        finish_times = []
        for status in self.task_statuses.values():
            if (status.status != 'running' or status.progress <= 0
                    or status.start_time is None):
                continue
            elapsed = (now - status.start_time).total_seconds()
            remaining = elapsed / status.progress - elapsed
            finish_times.append(now + timedelta(seconds=remaining))

        return max(finish_times, default=None)

3.2 动态调整

# dynamic_adjustment.py
from typing import List, Dict

class DynamicAdjuster:
    """Revises a plan mid-execution.

    Drops completed tasks, re-decomposes remaining tasks judged too large or
    complex, and keeps the dependency graph consistent afterwards.
    """

    def __init__(self, decomposer, tracker):
        # Object exposing ``decompose(description, max_depth=...)``.
        self.decomposer = decomposer
        # Stored for callers; not read by the methods below.
        self.tracker = tracker

    def adjust_plan(
        self,
        original_tasks: List['SubTask'],
        completed_tasks: List[str],
        failed_tasks: List[Dict]
    ) -> List['SubTask']:
        """
        Adjust the plan.

        Args:
            original_tasks: the current plan.
            completed_tasks: IDs of tasks already completed.
            failed_tasks: failure records; each is read via ``.get('error')``.

        Returns:
            The remaining tasks. Re-decomposed tasks are replaced by their
            subtasks, whose IDs are prefixed with the replaced task's ID (e.g.
            ``task_3.task_1``) so they cannot collide with existing IDs.
            Tasks that depended on a replaced task now depend on its terminal
            subtasks. Dependencies on completed tasks are removed.
        """
        failure_patterns = self._analyze_failures(failed_tasks)

        completed_set = set(completed_tasks)
        remaining_tasks = [
            t for t in original_tasks
            if t.id not in completed_set
        ]

        adjusted_tasks = []
        # Replaced task ID -> subtask IDs that now stand in for it.
        replacements: Dict[str, List[str]] = {}

        for task in remaining_tasks:
            if not self._needs_redecomposition(task, failure_patterns):
                adjusted_tasks.append(task)
                continue

            new_subtasks = self.decomposer.decompose(
                task.description,
                max_depth=2
            )
            if not new_subtasks:
                # Nothing usable came back; keep the task instead of losing it.
                adjusted_tasks.append(task)
                continue

            replacements[task.id] = self._graft_subtasks(task, new_subtasks)
            adjusted_tasks.extend(new_subtasks)

        self._redirect_dependencies(adjusted_tasks, replacements)
        # Runs last: grafted subtasks may have inherited completed dependencies.
        self._update_dependencies(adjusted_tasks, completed_tasks)

        return adjusted_tasks

    @staticmethod
    def _graft_subtasks(parent, subtasks) -> List[str]:
        """Namespace ``subtasks`` under ``parent`` and rewire their dependencies.

        Mutates each subtask in place:
        - its ID becomes ``"<parent.id>.<id>"``;
        - dependencies on sibling subtasks are rewritten to the prefixed IDs,
          and dependencies on unknown IDs are dropped;
        - subtasks with no sibling dependency inherit the parent's dependencies.

        Returns the IDs of terminal subtasks (not depended on by a sibling),
        or all subtask IDs if there are none.
        """
        prefix = f"{parent.id}."
        local_ids = {sub.id for sub in subtasks}
        for sub in subtasks:
            internal = [f"{prefix}{dep}" for dep in sub.dependencies if dep in local_ids]
            sub.dependencies = internal if internal else list(parent.dependencies)
            sub.id = f"{prefix}{sub.id}"

        depended_on = {dep for sub in subtasks for dep in sub.dependencies}
        terminals = [sub.id for sub in subtasks if sub.id not in depended_on]
        return terminals or [sub.id for sub in subtasks]

    @staticmethod
    def _redirect_dependencies(
        tasks: List['SubTask'],
        replacements: Dict[str, List[str]]
    ):
        """Point dependencies on replaced task IDs at their stand-in IDs."""
        if not replacements:
            return
        for task in tasks:
            if not any(dep in replacements for dep in task.dependencies):
                continue
            rewired = []
            for dep in task.dependencies:
                for new_dep in replacements.get(dep, [dep]):
                    if new_dep not in rewired:
                        rewired.append(new_dep)
            task.dependencies = rewired

    def _analyze_failures(
        self,
        failed_tasks: List[Dict]
    ) -> Dict:
        """Bucket failures by keywords found in their ``error`` text."""
        patterns = {
            'complexity_issues': 0,
            'dependency_issues': 0,
            'resource_issues': 0,
            'other_issues': 0
        }

        for failure in failed_tasks:
            # ``or ''`` also covers an explicit None error value.
            error = str(failure.get('error') or '').lower()

            if '复杂' in error or '困难' in error:
                patterns['complexity_issues'] += 1
            elif '依赖' in error:
                patterns['dependency_issues'] += 1
            elif '资源' in error or '时间' in error:
                patterns['resource_issues'] += 1
            else:
                patterns['other_issues'] += 1

        return patterns

    def _needs_redecomposition(
        self,
        task: 'SubTask',
        failure_patterns: Dict
    ) -> bool:
        """Decide whether ``task`` should be split again."""
        # NOTE(review): any complexity failure triggers re-decomposition of
        # *every* remaining task (one LLM call each), not just the failed
        # ones. Confirm this is intended.
        if failure_patterns['complexity_issues'] > 0:
            return True

        # Estimated longer than 2 hours (estimated_time is in minutes).
        if task.estimated_time > 120:
            return True

        return False

    def _update_dependencies(
        self,
        tasks: List['SubTask'],
        completed_tasks: List[str]
    ):
        """Remove dependencies on completed tasks, in place."""
        completed_set = set(completed_tasks)

        for task in tasks:
            task.dependencies = [
                dep for dep in task.dependencies
                if dep not in completed_set
            ]

四、LLM 规划增强

4.1 Chain of Thought 规划

# cot_planning.py
from typing import List, Dict

class CoTPlanner:
    """Chain-of-thought planner.

    Prompts the LLM to reason through five fixed steps and splits the reply
    into one text section per step.
    """

    def __init__(self, llm):
        # Any object exposing ``generate(prompt: str) -> str``.
        self.llm = llm

    def plan_with_cot(self, task: str) -> Dict:
        """
        Plan ``task`` with a chain-of-thought prompt.

        Args:
            task: task description.

        Returns:
            Dict of section name -> text (see ``_parse_cot_response``).
        """
        prompt = f"""
请逐步思考以下任务的执行计划:

任务:{task}

请按以下步骤思考:

1. **理解任务**
   - 任务的核心目标是什么?
   - 有哪些关键约束条件?
   - 成功的标准是什么?

2. **分析现状**
   - 当前有什么资源可用?
   - 已有哪些信息?
   - 还缺少什么?

3. **制定策略**
   - 完成任务需要哪些主要步骤?
   - 步骤之间的依赖关系如何?
   - 哪些步骤可以并行?

4. **细化计划**
   - 每个步骤具体做什么?
   - 每个步骤预计需要多长时间?
   - 可能的风险有哪些?

5. **制定备选方案**
   - 如果某步骤失败,如何应对?
   - 有无替代方案?

请详细输出以上思考过程,最后给出可执行的计划。
"""
        return self._parse_cot_response(self.llm.generate(prompt))

    def _parse_cot_response(self, response: str) -> Dict:
        """Split the reply into sections keyed by step.

        A line containing a step keyword switches the active section. The
        heading line itself is not kept. Lines before the first heading are
        discarded; every other line is appended, newline-terminated, to the
        active section.
        """
        # Checked in order; the first keyword found in a line wins.
        heading_markers = (
            ('理解任务', 'understanding'),
            ('分析现状', 'analysis'),
            ('制定策略', 'strategy'),
            ('细化计划', 'plan'),
            ('备选方案', 'contingency'),
        )
        sections = dict.fromkeys(
            ('understanding', 'analysis', 'strategy', 'plan', 'contingency'),
            ''
        )

        active = None
        for text_line in response.split('\n'):
            heading = next(
                (key for marker, key in heading_markers if marker in text_line),
                None
            )
            if heading is not None:
                active = heading
            elif active is not None:
                sections[active] += text_line + '\n'

        return sections

4.2 Tree of Thoughts 规划

# tot_planning.py
from typing import List, Dict

class ToTPlanner:
    """Tree-of-Thoughts planner.

    Expands candidate "thoughts" level by level with the LLM, then greedily
    follows the highest-feasibility thought at each level.
    """

    def __init__(self, llm, branching_factor: int = 3):
        # Any object exposing ``generate(prompt: str) -> str``.
        self.llm = llm
        # Number of thoughts requested at every level, including the root.
        self.branching_factor = branching_factor

    def plan_with_tot(self, task: str, depth: int = 3) -> Dict:
        """
        Plan ``task`` with a tree of thoughts.

        Args:
            task: task description.
            depth: number of tree levels (the root counts as 1).

        Returns:
            ``{'task', 'tree', 'best_path'}``.
        """
        # The root branches too. With a single root thought, the first (and
        # most important) choice would be forced.
        initial_thoughts = self._generate_thoughts(task, self.branching_factor)

        tree = self._build_tree(initial_thoughts, depth)

        best_path = self._evaluate_and_select(tree)

        return {
            'task': task,
            'tree': tree,
            'best_path': best_path
        }

    def _generate_thoughts(
        self,
        context: str,
        num_thoughts: int
    ) -> List[Dict]:
        """Ask the LLM for ``num_thoughts`` candidate thoughts about ``context``.

        Only dict entries carrying a ``'thought'`` key are kept, since
        ``_build_tree`` expands that text.
        """
        prompt = f"""
基于以下上下文,生成{num_thoughts}个不同的执行思路:

上下文:{context}

每个思路应该:
1. 有明确的方法论
2. 考虑不同的角度
3. 有可行性评估

按 JSON 格式输出:
[
    {{"thought": "思路描述", "feasibility": 0.0-1.0}}
]
"""

        response = self.llm.generate(prompt)
        return [
            item for item in self._parse_json(response)
            if isinstance(item, dict) and 'thought' in item
        ]

    def _build_tree(
        self,
        initial_thoughts: List[Dict],
        depth: int
    ) -> Dict:
        """Recursively expand each thought into a subtree.

        ``children[i]`` is the subtree of ``thoughts[i]``, or None at the last level.
        """
        tree = {
            'thoughts': initial_thoughts,
            'children': []
        }

        for thought in initial_thoughts:
            if depth > 1:
                child_thoughts = self._generate_thoughts(
                    thought['thought'],
                    self.branching_factor
                )
                child_tree = self._build_tree(child_thoughts, depth - 1)
                tree['children'].append(child_tree)
            else:
                tree['children'].append(None)

        return tree

    def _evaluate_and_select(self, tree: Dict) -> List[Dict]:
        """Greedily pick the most feasible thought at each level.

        Ties go to the earliest thought.
        """
        best_path = []
        current = tree

        while current and current.get('thoughts'):
            thoughts = current['thoughts']
            best_idx = max(
                range(len(thoughts)),
                key=lambda i: self._feasibility(thoughts[i])
            )
            best_path.append(thoughts[best_idx])

            children = current.get('children')
            if not children:
                break
            current = children[best_idx]

        return best_path

    @staticmethod
    def _feasibility(thought: Dict) -> float:
        """Return the thought's feasibility as a float, or 0.0 if missing or invalid.

        LLMs sometimes emit the score as a string (e.g. ``"0.8"``).
        """
        try:
            return float(thought.get('feasibility', 0))
        except (TypeError, ValueError):
            return 0.0

    def _parse_json(self, text: str) -> List[Dict]:
        """Extract the outermost ``[...]`` JSON array from ``text``.

        Returns an empty list if none is found or it is not valid JSON.
        """
        import json
        import re

        match = re.search(r'\[.*\]', text or '', re.DOTALL)
        if not match:
            return []
        try:
            parsed = json.loads(match.group())
        except json.JSONDecodeError:
            return []
        return parsed if isinstance(parsed, list) else []

五、实战案例

5.1 复杂项目规划

# project_planning.py
class ProjectPlanningAgent:
    """Project planning agent: decompose -> order -> fit to capacity -> milestones."""

    def __init__(self, llm):
        self.llm = llm
        self.decomposer = TaskDecomposer(llm)
        self.dependency_mgr = None
        self.tracker = ExecutionTracker()

    def plan_project(
        self,
        project_description: str,
        timeline_weeks: int,
        team_size: int,
        hours_per_week: int = 40
    ) -> Dict:
        """
        Plan a project.

        Args:
            project_description: project description.
            timeline_weeks: timeline length in weeks.
            team_size: number of people.
            hours_per_week: working hours per person per week.

        Returns:
            Dict with the tasks, dependency-ordered task IDs, critical path,
            weekly milestones, ``total_estimated_hours`` (the sum of estimates
            before any scaling) and ``available_hours`` (team capacity).
            If the estimates exceed capacity, every task's ``estimated_time``
            is scaled down proportionally, in place.

        Raises:
            ValueError: if the decomposed tasks contain a dependency cycle.
        """
        # 1. Decompose.
        tasks = self.decomposer.decompose(
            project_description,
            max_depth=3,
            max_subtasks=20
        )

        # 2. Dependency graph.
        self.dependency_mgr = DependencyManager(tasks)

        # 3. Execution order.
        execution_order = self.dependency_mgr.get_execution_order()

        # 4. Critical path.
        critical_path = self.dependency_mgr.get_critical_path()

        # 5. Fit to capacity. SubTask.estimated_time is in minutes while
        #    capacity is in hours, so convert before comparing.
        total_hours = sum(t.estimated_time for t in tasks) / 60
        available_time = timeline_weeks * hours_per_week * team_size  # hours

        if total_hours > available_time:
            adjustment_ratio = available_time / total_hours
            for task in tasks:
                task.estimated_time = int(task.estimated_time * adjustment_ratio)

        # 6. Milestones follow dependency order, not the LLM's output order.
        tasks_by_id = {t.id: t for t in tasks}
        ordered_tasks = [tasks_by_id[task_id] for task_id in execution_order]
        milestones = self._set_milestones(ordered_tasks, timeline_weeks)

        return {
            'tasks': tasks,
            'execution_order': execution_order,
            'critical_path': critical_path,
            'milestones': milestones,
            'total_estimated_hours': total_hours,
            'available_hours': available_time
        }

    def _set_milestones(
        self,
        tasks: List['SubTask'],
        timeline_weeks: int
    ) -> List[Dict]:
        """Split ``tasks`` (kept in order) into one milestone per week.

        Every task lands in exactly one week, and week sizes differ by at most
        one. Returns an empty list when ``timeline_weeks`` is not positive.
        """
        if timeline_weeks <= 0:
            return []

        total = len(tasks)
        milestones = []

        for week in range(1, timeline_weeks + 1):
            # Proportional integer bounds tile [0, total) with no gap or overlap.
            start_idx = (week - 1) * total // timeline_weeks
            end_idx = week * total // timeline_weeks

            milestones.append({
                'week': week,
                'tasks': tasks[start_idx:end_idx],
                'deliverable': f'第{week}周交付物'
            })

        return milestones

5.2 研究任务规划

# research_planning.py
class ResearchPlanningAgent:
    """Research planning agent.

    Decomposes a research topic, buckets the subtasks by keyword into four
    categories, and lays them out as four fixed-length phases.
    """

    def __init__(self, llm):
        self.llm = llm
        self.decomposer = TaskDecomposer(llm)

    def plan_research(self, research_topic: str) -> Dict:
        """
        Plan a research project.

        Args:
            research_topic: the research topic.

        Returns:
            Dict with the topic, all tasks, tasks grouped by category, the
            phase list and the total duration in weeks.
        """
        tasks = self.decomposer.decompose(
            f"研究课题:{research_topic}",
            max_depth=2
        )

        buckets = {
            name: []
            for name in ('literature_review', 'data_collection', 'analysis', 'writing')
        }
        for task in tasks:
            buckets[self._categorize_task(task.description)].append(task)

        # (phase name, category key, duration in weeks)
        phase_specs = (
            ('文献调研', 'literature_review', 2),
            ('数据收集', 'data_collection', 3),
            ('分析研究', 'analysis', 3),
            ('论文撰写', 'writing', 2),
        )
        phases = [
            {'name': name, 'tasks': buckets[key], 'duration_weeks': weeks}
            for name, key, weeks in phase_specs
        ]

        return {
            'topic': research_topic,
            'tasks': tasks,
            'categorized_tasks': buckets,
            'phases': phases,
            'total_duration_weeks': sum(p['duration_weeks'] for p in phases)
        }

    def _categorize_task(self, description: str) -> str:
        """Map a description to a category by keyword; the fallback is 'writing'."""
        # Checked in order; the first rule with a matching keyword wins.
        rules = (
            ('literature_review', ('文献', '调研', '综述')),
            ('data_collection', ('数据', '收集', '调查')),
            ('analysis', ('分析', '研究', '实验')),
        )
        for category, keywords in rules:
            if any(kw in description for kw in keywords):
                return category
        return 'writing'

六、总结

6.1 核心要点

  1. 任务分解

    • 分层分解复杂任务
    • 明确依赖关系
    • 合理估计时间
  2. 依赖管理

    • 构建依赖图
    • 拓扑排序
    • 识别关键路径
  3. 执行监控

    • 实时跟踪进度
    • 动态调整计划
    • 处理异常情况

6.2 最佳实践

  1. 渐进式规划

    • 先粗后细
    • 边执行边调整
    • 保持灵活性
  2. 风险预判

    • 识别潜在风险
    • 制定备选方案
    • 预留缓冲时间
  3. 持续优化

    • 记录执行数据
    • 分析失败原因
    • 改进规划策略

参考资料


分享这篇文章到:

上一篇文章
Redis 云原生部署方案
下一篇文章
Kafka MirrorMaker 跨集群数据同步实战