Agent 规划与任务分解实战
复杂任务的完成需要有效的规划和分解。如何让 Agent 制定合理的执行计划?如何管理任务依赖关系?本文深入解析 Agent 规划与任务分解的实战方案。
一、规划能力基础
1.1 规划层次
Agent 规划层次:
┌─────────────────────────────────────┐
│ 战略层(Strategic) │
│ - 长期目标设定 │
│ - 资源分配策略 │
│ - 风险预估与管理 │
├─────────────────────────────────────┤
│ 战术层(Tactical) │
│ - 中期计划制定 │
│ - 任务分解与分配 │
│ - 进度跟踪与调整 │
├─────────────────────────────────────┤
│ 执行层(Execution) │
│ - 具体任务执行 │
│ - 实时问题处理 │
│ - 结果反馈与修正 │
└─────────────────────────────────────┘
1.2 规划模式
| 模式 | 特点 | 适用场景 |
|---|---|---|
| 前向规划 | 从现状出发推导路径 | 探索性任务 |
| 后向规划 | 从目标倒推步骤 | 目标明确的任务 |
| 分层规划 | 多级任务分解 | 复杂长期任务 |
| 增量规划 | 逐步完善计划 | 不确定性高的任务 |
二、任务分解机制
2.1 分解策略
# task_decomposition.py
from typing import List, Dict, Optional
from dataclasses import dataclass, field
from enum import Enum
class TaskType(Enum):
    """Scheduling mode of a subtask; values match the LLM's JSON 'type' field."""

    SEQUENTIAL = "sequential"    # sequential execution
    PARALLEL = "parallel"        # parallel execution
    CONDITIONAL = "conditional"  # conditional execution
@dataclass
class SubTask:
    """A single unit of work produced by task decomposition."""

    id: str
    description: str
    task_type: TaskType = TaskType.SEQUENTIAL
    # ids of the subtasks that must finish before this one
    dependencies: List[str] = field(default_factory=list)
    estimated_time: int = 0  # in minutes
    priority: int = 0  # expected range 0-10
    status: str = "pending"
    result: Optional[Dict] = None
class TaskDecomposer:
    """Break a task into SubTask objects by prompting an LLM.

    ``llm`` is any object with ``generate(prompt: str) -> str``. Subtasks whose
    description looks complex (keyword check in ``_is_complex_task``) are
    decomposed again, up to ``max_depth`` levels in total.
    """

    def __init__(self, llm):
        self.llm = llm

    def decompose(
        self,
        main_task: str,
        max_depth: int = 3,
        max_subtasks: int = 10
    ) -> List[SubTask]:
        """Decompose ``main_task`` into executable subtasks.

        Args:
            main_task: Description of the task to decompose.
            max_depth: Maximum decomposition depth (1 means no recursion).
            max_subtasks: Maximum number of subtasks kept per level.

        Returns:
            A flat list of leaf subtasks. Children created by recursive
            decomposition get ids prefixed with their parent's id
            (e.g. ``"task_2.task_1"``) so ids stay unique, and any dependency
            on a decomposed parent is rewired to that parent's final
            children. Malformed LLM output yields an empty list rather than
            an exception.
        """
        prompt = f"""
请将以下任务分解为可执行的子任务:
主任务:{main_task}
要求:
1. 每个子任务应该是独立可执行的
2. 标明任务之间的依赖关系
3. 估计每个任务所需时间
4. 按优先级排序
请按以下 JSON 格式输出:
{{
"subtasks": [
{{
"id": "task_1",
"description": "任务描述",
"type": "sequential/parallel/conditional",
"dependencies": ["task_id1", "task_id2"],
"estimated_time": 30,
"priority": 8
}}
]
}}
"""
        response = self.llm.generate(prompt)
        result = self._parse_response(response)

        raw_tasks = result.get('subtasks', [])
        if not isinstance(raw_tasks, list):
            raw_tasks = []
        # Drop malformed entries first so they do not consume the per-level quota.
        built = (self._build_subtask(task_data) for task_data in raw_tasks)
        subtasks = [st for st in built if st is not None][:max_subtasks]

        if max_depth > 1:
            subtasks = self._recursive_decompose(
                subtasks,
                max_depth - 1,
                max_subtasks
            )
        return subtasks

    def _build_subtask(self, task_data) -> Optional[SubTask]:
        """Convert one JSON entry into a SubTask, or None if it is malformed."""
        if not isinstance(task_data, dict):
            return None
        if 'id' not in task_data or 'description' not in task_data:
            return None
        deps = task_data.get('dependencies') or []
        if not isinstance(deps, list):
            deps = [deps]
        return SubTask(
            id=str(task_data['id']),
            description=str(task_data['description']),
            task_type=self._parse_task_type(task_data.get('type', 'sequential')),
            dependencies=[str(dep) for dep in deps],
            estimated_time=task_data.get('estimated_time', 30),
            priority=task_data.get('priority', 5)
        )

    @staticmethod
    def _parse_task_type(value) -> TaskType:
        """Map the LLM's 'type' field to TaskType, falling back to SEQUENTIAL."""
        # The prompt's own example value "sequential/parallel/conditional"
        # is not a valid member, so LLMs that copy it must not crash us.
        try:
            return TaskType(value)
        except (ValueError, TypeError):
            return TaskType.SEQUENTIAL

    def _recursive_decompose(
        self,
        subtasks: List[SubTask],
        max_depth: int,
        max_subtasks: int
    ) -> List[SubTask]:
        """Replace each complex subtask with its own decomposition.

        A subtask is kept unchanged when it is not complex or when its
        decomposition comes back empty.
        """
        all_subtasks = []
        # decomposed parent id -> ids of its children that finish the group
        replacements: Dict[str, List[str]] = {}
        for subtask in subtasks:
            nested: List[SubTask] = []
            if self._is_complex_task(subtask.description):
                nested = self.decompose(
                    subtask.description,
                    max_depth=max_depth,
                    max_subtasks=max_subtasks
                )
            if not nested:
                all_subtasks.append(subtask)
                continue
            replacements[subtask.id] = self._namespace_children(subtask, nested)
            all_subtasks.extend(nested)

        if replacements:
            for task in all_subtasks:
                task.dependencies = self._rewire(task.dependencies, replacements)
        return all_subtasks

    @staticmethod
    def _namespace_children(parent: SubTask, children: List[SubTask]) -> List[str]:
        """Prefix children ids with the parent id and return the group's sink ids.

        Children with no dependency inside the group inherit the parent's
        dependencies. Sinks are children no sibling depends on; they stand in
        for the parent in other tasks' dependency lists.
        """
        prefix = f"{parent.id}."
        local_ids = {child.id for child in children}
        depended_on = {
            dep for child in children for dep in child.dependencies
            if dep in local_ids
        }
        sinks = [prefix + child.id for child in children if child.id not in depended_on]
        for child in children:
            internal = [prefix + dep for dep in child.dependencies if dep in local_ids]
            child.dependencies = internal or list(parent.dependencies)
            child.id = prefix + child.id
        # If every child is depended on (a cycle), fall back to all children.
        return sinks or [child.id for child in children]

    @staticmethod
    def _rewire(
        dependencies: List[str],
        replacements: Dict[str, List[str]]
    ) -> List[str]:
        """Substitute decomposed parent ids with their sinks, without duplicates."""
        rewired: List[str] = []
        for dep in dependencies:
            for target in replacements.get(dep, [dep]):
                if target not in rewired:
                    rewired.append(target)
        return rewired

    def _is_complex_task(self, description: str) -> bool:
        """Heuristically flag a task as complex if it contains an indicator keyword."""
        complex_indicators = [
            '分析', '研究', '设计', '开发',
            '实现', '优化', '评估', '比较'
        ]
        return any(indicator in description for indicator in complex_indicators)

    def _parse_response(self, response: str) -> Dict:
        """Extract the outermost JSON object from the LLM response.

        Returns an empty dict when no object is found, it is not valid JSON,
        or it does not decode to a dict.
        """
        import json
        import re
        match = re.search(r'\{.*\}', response or '', re.DOTALL)
        if not match:
            return {}
        try:
            parsed = json.loads(match.group())
        except json.JSONDecodeError:
            return {}
        return parsed if isinstance(parsed, dict) else {}
2.2 依赖管理
# dependency_manager.py
from typing import List, Dict, Set
from collections import defaultdict
class DependencyManager:
    """Dependency graph, scheduling order and critical path for subtasks.

    Each task must expose ``id``, ``dependencies`` (ids it waits on),
    ``priority`` and ``estimated_time``. A dependency naming an id that is
    not among ``tasks`` is treated as an external prerequisite: it is kept in
    the graphs but does not block the execution order or the critical path.
    """

    def __init__(self, tasks: List['SubTask']):
        self.tasks = {task.id: task for task in tasks}
        # prerequisite id -> ids of the tasks that wait on it
        self.dependency_graph = self._build_graph()
        # task id -> ids of its prerequisites
        self.reverse_graph = self._build_reverse_graph()

    def _build_graph(self) -> Dict[str, List[str]]:
        """Build the forward graph (prerequisite -> dependents)."""
        graph = defaultdict(list)
        for task_id, task in self.tasks.items():
            for dep in task.dependencies:
                graph[dep].append(task_id)
        return dict(graph)

    def _build_reverse_graph(self) -> Dict[str, List[str]]:
        """Build the reverse graph (task -> prerequisites); tasks without deps are absent."""
        graph = defaultdict(list)
        for task_id, task in self.tasks.items():
            for dep in task.dependencies:
                graph[task_id].append(dep)
        return dict(graph)

    def get_execution_order(self) -> List[str]:
        """Return task ids in a dependency-respecting order (Kahn's algorithm).

        Among tasks that become ready together, higher ``priority`` runs
        first; equal priorities keep the order in which they became ready.

        Raises:
            ValueError: if the known tasks contain a dependency cycle.
        """
        import heapq
        from itertools import count

        # Only dependencies on known tasks can be satisfied inside this plan.
        in_degree = {
            task_id: sum(1 for dep in task.dependencies if dep in self.tasks)
            for task_id, task in self.tasks.items()
        }
        tie_breaker = count()
        ready: list = []

        def push(task_id: str) -> None:
            heapq.heappush(
                ready,
                (-self.tasks[task_id].priority, next(tie_breaker), task_id)
            )

        for task_id, degree in in_degree.items():
            if degree == 0:
                push(task_id)

        order = []
        while ready:
            _, _, current = heapq.heappop(ready)
            order.append(current)
            for next_task in self.dependency_graph.get(current, []):
                in_degree[next_task] -= 1
                if in_degree[next_task] == 0:
                    push(next_task)

        if len(order) != len(self.tasks):
            raise ValueError("存在循环依赖")
        return order

    def get_ready_tasks(self, completed_tasks: Set[str]) -> List[str]:
        """Return ids of unfinished tasks whose dependencies are all completed."""
        ready = []
        for task_id, task in self.tasks.items():
            if task_id in completed_tasks:
                continue
            if all(dep in completed_tasks for dep in task.dependencies):
                ready.append(task_id)
        return ready

    def detect_cycles(self) -> List[List[str]]:
        """Return dependency cycles found by DFS, each closed by repeating its first id."""
        cycles: List[List[str]] = []
        visited = set()
        on_stack = set()

        def dfs(node: str, path: List[str]) -> None:
            visited.add(node)
            on_stack.add(node)
            path.append(node)
            for neighbor in self.dependency_graph.get(node, []):
                if neighbor not in visited:
                    dfs(neighbor, path)
                elif neighbor in on_stack:
                    # Back edge: the cycle is the path segment from neighbor.
                    cycle_start = path.index(neighbor)
                    cycles.append(path[cycle_start:] + [neighbor])
            path.pop()
            on_stack.remove(node)

        for task_id in self.tasks:
            if task_id not in visited:
                dfs(task_id, [])
        return cycles

    def get_critical_path(self) -> List[str]:
        """Return the chain of tasks with the longest total ``estimated_time``.

        Returns an empty list when there are no tasks.

        Raises:
            ValueError: if the known tasks contain a dependency cycle.
        """
        order = self.get_execution_order()
        if not order:
            return []

        # Earliest start time of every task, plus the predecessor that sets it.
        start = {task_id: 0 for task_id in self.tasks}
        predecessor = {task_id: None for task_id in self.tasks}
        for task_id in order:
            finish = start[task_id] + self.tasks[task_id].estimated_time
            for next_id in self.dependency_graph.get(task_id, []):
                if finish > start[next_id]:
                    start[next_id] = finish
                    predecessor[next_id] = task_id

        # The path ends at the task that *finishes* last, not the one that starts last.
        end_task = max(
            self.tasks,
            key=lambda t: start[t] + self.tasks[t].estimated_time
        )

        path = []
        current = end_task
        while current is not None:
            path.append(current)
            current = predecessor[current]
        return list(reversed(path))
三、规划执行监控
3.1 执行跟踪器
# execution_tracker.py
from typing import Dict, List, Optional
from datetime import datetime
from dataclasses import dataclass
@dataclass
class TaskStatus:
    """Runtime state of one task as recorded by an execution tracker."""

    task_id: str
    status: str  # one of: pending, running, completed, failed
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    progress: float = 0.0  # fraction complete, 0-1
    error_message: Optional[str] = None
class ExecutionTracker:
    """Track task status in memory and keep an append-only event log.

    ``complete_task``, ``fail_task`` and ``update_progress`` only change the
    status of tasks previously registered via ``start_task``; for unknown ids
    they do nothing, except that ``update_progress`` still logs the event.
    """

    def __init__(self):
        self.task_statuses: Dict[str, TaskStatus] = {}
        self.execution_log: List[Dict] = []

    def start_task(self, task_id: str):
        """Register ``task_id`` as running, starting now."""
        self.task_statuses[task_id] = TaskStatus(
            task_id=task_id,
            status='running',
            start_time=datetime.now()
        )
        self._log_event('task_started', {
            'task_id': task_id,
            'timestamp': datetime.now().isoformat()
        })

    def update_progress(
        self,
        task_id: str,
        progress: float,
        metadata: Optional[Dict] = None
    ):
        """Set the progress fraction of a known task and log the update."""
        if task_id in self.task_statuses:
            self.task_statuses[task_id].progress = progress
        self._log_event('progress_update', {
            'task_id': task_id,
            'progress': progress,
            'metadata': metadata or {},
            'timestamp': datetime.now().isoformat()
        })

    def complete_task(
        self,
        task_id: str,
        result: Optional[Dict] = None
    ):
        """Mark a known task completed and log its duration in seconds."""
        if task_id in self.task_statuses:
            status = self.task_statuses[task_id]
            status.status = 'completed'
            status.end_time = datetime.now()
            status.progress = 1.0
            duration = (
                (status.end_time - status.start_time).total_seconds()
                if status.start_time else None
            )
            self._log_event('task_completed', {
                'task_id': task_id,
                'result': result or {},
                'duration': duration,
                'timestamp': datetime.now().isoformat()
            })

    def fail_task(
        self,
        task_id: str,
        error_message: str
    ):
        """Mark a known task failed and record the error message."""
        if task_id in self.task_statuses:
            status = self.task_statuses[task_id]
            status.status = 'failed'
            status.end_time = datetime.now()
            status.error_message = error_message
            self._log_event('task_failed', {
                'task_id': task_id,
                'error': error_message,
                'timestamp': datetime.now().isoformat()
            })

    def _log_event(self, event_type: str, data: Dict):
        """Append an event to the in-memory log."""
        self.execution_log.append({
            'event_type': event_type,
            'data': data
        })

    def get_overall_progress(self) -> float:
        """Return the mean progress of all tracked tasks (0.0 when none)."""
        if not self.task_statuses:
            return 0.0
        total_progress = sum(
            s.progress for s in self.task_statuses.values()
        )
        return total_progress / len(self.task_statuses)

    def get_summary(self) -> Dict:
        """Return task counts per status, overall progress, start and estimated end."""
        status_counts = {}
        for status in self.task_statuses.values():
            status_counts[status.status] = status_counts.get(status.status, 0) + 1
        return {
            'total_tasks': len(self.task_statuses),
            'status_counts': status_counts,
            'overall_progress': self.get_overall_progress(),
            'start_time': min(
                (s.start_time for s in self.task_statuses.values() if s.start_time),
                default=None
            ),
            'estimated_end_time': self._estimate_end_time()
        }

    def _estimate_end_time(self) -> Optional[datetime]:
        """Estimate when all running tasks finish by linear extrapolation.

        Each running task with progress > 0 is projected independently
        (remaining = elapsed * (1 - p) / p) and the latest projection wins.
        Returns None when no running task has reported progress.
        """
        from datetime import timedelta  # only `datetime` is imported at module level

        now = datetime.now()
        remaining = []
        for s in self.task_statuses.values():
            if s.status != 'running' or not s.start_time or s.progress <= 0:
                continue
            elapsed = (now - s.start_time).total_seconds()
            remaining.append(elapsed * (1 - s.progress) / s.progress)
        if not remaining:
            return None
        # Progress above 1.0 would give a negative remainder; clamp to "now".
        return now + timedelta(seconds=max(0.0, max(remaining)))
3.2 动态调整
# dynamic_adjustment.py
from typing import List, Dict
class DynamicAdjuster:
    """Re-plan unfinished tasks after failures by re-decomposing some of them."""

    def __init__(self, decomposer, tracker):
        self.decomposer = decomposer
        self.tracker = tracker  # stored for callers; not used by this class

    def adjust_plan(
        self,
        original_tasks: List['SubTask'],
        completed_tasks: List[str],
        failed_tasks: List[Dict]
    ) -> List['SubTask']:
        """Return a new plan for the tasks not yet completed.

        Args:
            original_tasks: The current plan.
            completed_tasks: Ids of tasks already finished.
            failed_tasks: Failure records; only their 'error' text is read.

        Returns:
            The adjusted task list. A re-decomposed task is replaced by its
            children, whose ids are prefixed with ``"<task_id>."``; children
            without an internal dependency inherit the task's dependencies,
            and tasks that depended on it now depend on its final children.
            Dependencies on completed tasks are removed. A task whose
            decomposition comes back empty is kept as is.
        """
        failure_patterns = self._analyze_failures(failed_tasks)
        completed_set = set(completed_tasks)

        remaining_tasks = [
            t for t in original_tasks
            if t.id not in completed_set
        ]

        adjusted_tasks = []
        # re-decomposed task id -> ids of its children that finish the group
        replacements: Dict[str, List[str]] = {}
        for task in remaining_tasks:
            new_subtasks = []
            if self._needs_redecomposition(task, failure_patterns):
                new_subtasks = self.decomposer.decompose(
                    task.description,
                    max_depth=2
                )
            if new_subtasks:
                replacements[task.id] = self._splice_subtasks(task, new_subtasks)
                adjusted_tasks.extend(new_subtasks)
            else:
                adjusted_tasks.append(task)

        if replacements:
            for task in adjusted_tasks:
                task.dependencies = self._rewire(task.dependencies, replacements)

        self._update_dependencies(adjusted_tasks, completed_tasks)
        return adjusted_tasks

    @staticmethod
    def _splice_subtasks(parent: 'SubTask', children: List['SubTask']) -> List[str]:
        """Namespace children ids under the parent and return the group's sink ids."""
        prefix = f"{parent.id}."
        local_ids = {child.id for child in children}
        depended_on = {
            dep for child in children for dep in child.dependencies
            if dep in local_ids
        }
        sinks = [prefix + child.id for child in children if child.id not in depended_on]
        for child in children:
            internal = [prefix + dep for dep in child.dependencies if dep in local_ids]
            child.dependencies = internal or list(parent.dependencies)
            child.id = prefix + child.id
        # If every child is depended on (a cycle), fall back to all children.
        return sinks or [child.id for child in children]

    @staticmethod
    def _rewire(
        dependencies: List[str],
        replacements: Dict[str, List[str]]
    ) -> List[str]:
        """Substitute replaced task ids with their sinks, without duplicates."""
        rewired: List[str] = []
        for dep in dependencies:
            for target in replacements.get(dep, [dep]):
                if target not in rewired:
                    rewired.append(target)
        return rewired

    def _analyze_failures(
        self,
        failed_tasks: List[Dict]
    ) -> Dict:
        """Count failures per category by keyword-matching their 'error' text."""
        patterns = {
            'complexity_issues': 0,
            'dependency_issues': 0,
            'resource_issues': 0,
            'other_issues': 0
        }
        for failure in failed_tasks:
            # A missing or None error counts as "other" instead of crashing.
            error = str(failure.get('error') or '').lower()
            if '复杂' in error or '困难' in error:
                patterns['complexity_issues'] += 1
            elif '依赖' in error:
                patterns['dependency_issues'] += 1
            elif '资源' in error or '时间' in error:
                patterns['resource_issues'] += 1
            else:
                patterns['other_issues'] += 1
        return patterns

    def _needs_redecomposition(
        self,
        task: 'SubTask',
        failure_patterns: Dict
    ) -> bool:
        """Decide whether ``task`` should be decomposed again.

        NOTE(review): any complexity failure triggers re-decomposition of
        every remaining task, not only the task that failed.
        """
        if failure_patterns['complexity_issues'] > 0:
            return True
        # Estimates are in minutes; anything over 2 hours is split further.
        if task.estimated_time > 120:
            return True
        return False

    def _update_dependencies(
        self,
        tasks: List['SubTask'],
        completed_tasks: List[str]
    ):
        """Drop dependencies on completed tasks, in place."""
        completed_set = set(completed_tasks)
        for task in tasks:
            task.dependencies = [
                dep for dep in task.dependencies
                if dep not in completed_set
            ]
四、LLM 规划增强
4.1 Chain of Thought 规划
# cot_planning.py
from typing import List, Dict
class CoTPlanner:
    """Planner that asks the LLM for a staged chain-of-thought plan."""

    def __init__(self, llm):
        self.llm = llm

    def plan_with_cot(self, task: str) -> Dict:
        """Prompt the LLM to reason through ``task`` in five stages.

        Args:
            task: Description of the task to plan.

        Returns:
            Dict with keys 'understanding', 'analysis', 'strategy', 'plan'
            and 'contingency', each holding the text under that heading.
        """
        prompt = f"""
请逐步思考以下任务的执行计划:
任务:{task}
请按以下步骤思考:
1. **理解任务**
- 任务的核心目标是什么?
- 有哪些关键约束条件?
- 成功的标准是什么?
2. **分析现状**
- 当前有什么资源可用?
- 已有哪些信息?
- 还缺少什么?
3. **制定策略**
- 完成任务需要哪些主要步骤?
- 步骤之间的依赖关系如何?
- 哪些步骤可以并行?
4. **细化计划**
- 每个步骤具体做什么?
- 每个步骤预计需要多长时间?
- 可能的风险有哪些?
5. **制定备选方案**
- 如果某步骤失败,如何应对?
- 有无替代方案?
请详细输出以上思考过程,最后给出可执行的计划。
"""
        return self._parse_cot_response(self.llm.generate(prompt))

    def _parse_cot_response(self, response: str) -> Dict:
        """Split the response into sections keyed by heading keywords.

        A line containing a heading keyword switches the active section and
        is itself discarded; the first matching keyword in the table wins.
        Lines before the first heading are ignored.
        """
        markers = (
            ('理解任务', 'understanding'),
            ('分析现状', 'analysis'),
            ('制定策略', 'strategy'),
            ('细化计划', 'plan'),
            ('备选方案', 'contingency'),
        )
        sections = {key: '' for _, key in markers}
        active = None
        for line in response.split('\n'):
            heading = next((key for keyword, key in markers if keyword in line), None)
            if heading is not None:
                active = heading
            elif active is not None:
                sections[active] += line + '\n'
        return sections
4.2 Tree of Thoughts 规划
# tot_planning.py
from typing import List, Dict
class ToTPlanner:
    """Tree-of-thoughts planner: expand candidate ideas, then follow the most feasible."""

    def __init__(self, llm, branching_factor: int = 3):
        self.llm = llm
        self.branching_factor = branching_factor

    def plan_with_tot(self, task: str, depth: int = 3, num_initial: int = 1) -> Dict:
        """Plan ``task`` with a thought tree.

        Args:
            task: Task description.
            depth: Number of tree levels, counting the initial thoughts.
            num_initial: How many root thoughts to request from the LLM.

        Returns:
            Dict with the task, the full tree and the greedily selected
            best path (list of thought dicts).
        """
        initial_thoughts = self._generate_thoughts(task, num_initial)
        tree = self._build_tree(initial_thoughts, depth)
        best_path = self._evaluate_and_select(tree)
        return {
            'task': task,
            'tree': tree,
            'best_path': best_path
        }

    def _generate_thoughts(
        self,
        context: str,
        num_thoughts: int
    ) -> List[Dict]:
        """Ask the LLM for ``num_thoughts`` alternative ideas about ``context``."""
        prompt = f"""
基于以下上下文,生成{num_thoughts}个不同的执行思路:
上下文:{context}
每个思路应该:
1. 有明确的方法论
2. 考虑不同的角度
3. 有可行性评估
按 JSON 格式输出:
[
{{"thought": "思路描述", "feasibility": 0.0-1.0}}
]
"""
        response = self.llm.generate(prompt)
        return self._parse_json(response)

    def _build_tree(
        self,
        initial_thoughts: List[Dict],
        depth: int
    ) -> Dict:
        """Build a node whose ``children[i]`` expands ``thoughts[i]`` (None at leaves)."""
        tree = {
            'thoughts': initial_thoughts,
            'children': []
        }
        for thought in initial_thoughts:
            if depth > 1:
                child_thoughts = self._generate_thoughts(
                    thought['thought'],
                    self.branching_factor
                )
                tree['children'].append(self._build_tree(child_thoughts, depth - 1))
            else:
                tree['children'].append(None)
        return tree

    @staticmethod
    def _feasibility(thought: Dict) -> float:
        """Return the thought's feasibility as a float (0.0 if missing or invalid)."""
        # LLMs sometimes emit numbers as strings; mixing str and float breaks max().
        try:
            return float(thought.get('feasibility', 0))
        except (TypeError, ValueError):
            return 0.0

    def _evaluate_and_select(self, tree: Dict) -> List[Dict]:
        """Greedily follow the highest-feasibility thought at each level."""
        best_path = []
        current = tree
        while current and current.get('thoughts'):
            thoughts = current['thoughts']
            scores = [self._feasibility(t) for t in thoughts]
            # Use the position, not list.index(dict): equal dicts would alias.
            idx = scores.index(max(scores))
            best_path.append(thoughts[idx])
            children = current.get('children')
            if not children:
                break
            current = children[idx]
        return best_path

    def _parse_json(self, text: str) -> List[Dict]:
        """Extract the JSON array from ``text``, keeping only dicts with a 'thought'.

        Returns an empty list when no array is found or it is not valid JSON.
        """
        import json
        import re
        match = re.search(r'\[.*\]', text or '', re.DOTALL)
        if not match:
            return []
        try:
            parsed = json.loads(match.group())
        except json.JSONDecodeError:
            return []
        if not isinstance(parsed, list):
            return []
        return [item for item in parsed if isinstance(item, dict) and 'thought' in item]
五、实战案例
5.1 复杂项目规划
# project_planning.py
class ProjectPlanningAgent:
    """Plan a project: decompose it, order tasks, find the critical path, set milestones."""

    def __init__(self, llm):
        self.llm = llm
        self.decomposer = TaskDecomposer(llm)
        self.dependency_mgr = None
        self.tracker = ExecutionTracker()

    def plan_project(
        self,
        project_description: str,
        timeline_weeks: int,
        team_size: int
    ) -> Dict:
        """Plan a project.

        Args:
            project_description: Project description.
            timeline_weeks: Timeline in weeks.
            team_size: Number of people (40 hours per person per week).

        Returns:
            Dict with tasks, execution order, critical path, milestones,
            'total_estimated_hours' (sum of task estimates before any
            scaling) and 'available_hours' (team capacity).

        Note:
            Task ``estimated_time`` is in minutes. When the estimate exceeds
            capacity, every task estimate is scaled down proportionally.
        """
        tasks = self.decomposer.decompose(
            project_description,
            max_depth=3,
            max_subtasks=20
        )

        self.dependency_mgr = DependencyManager(tasks)
        execution_order = self.dependency_mgr.get_execution_order()
        critical_path = self.dependency_mgr.get_critical_path()

        # Convert minutes to hours before comparing with person-hour capacity.
        total_hours = sum(t.estimated_time for t in tasks) / 60
        available_hours = timeline_weeks * 40 * team_size
        if 0 < available_hours < total_hours:
            adjustment_ratio = available_hours / total_hours
            for task in tasks:
                task.estimated_time = int(task.estimated_time * adjustment_ratio)

        # Assign milestones in dependency order so prerequisites come first.
        tasks_by_id = {t.id: t for t in tasks}
        ordered_tasks = [tasks_by_id[task_id] for task_id in execution_order]
        milestones = self._set_milestones(ordered_tasks, timeline_weeks)

        return {
            'tasks': tasks,
            'execution_order': execution_order,
            'critical_path': critical_path,
            'milestones': milestones,
            'total_estimated_hours': total_hours,
            'available_hours': available_hours
        }

    def _set_milestones(
        self,
        tasks: List['SubTask'],
        timeline_weeks: int
    ) -> List[Dict]:
        """Spread ``tasks`` (in the given order) as evenly as possible over the weeks.

        Every task lands in exactly one week; later weeks absorb any
        remainder. Returns an empty list when ``timeline_weeks`` <= 0.
        """
        if timeline_weeks <= 0:
            return []
        n = len(tasks)
        milestones = []
        for week in range(1, timeline_weeks + 1):
            start_idx = (week - 1) * n // timeline_weeks
            end_idx = week * n // timeline_weeks
            milestones.append({
                'week': week,
                'tasks': tasks[start_idx:end_idx],
                'deliverable': f'第{week}周交付物'
            })
        return milestones
5.2 研究任务规划
# research_planning.py
class ResearchPlanningAgent:
    """Plan a research project as four fixed phases fed by LLM-decomposed tasks."""

    def __init__(self, llm):
        self.llm = llm
        self.decomposer = TaskDecomposer(llm)

    def plan_research(self, research_topic: str) -> Dict:
        """Decompose a research topic and group its tasks into phases.

        Args:
            research_topic: The research topic.

        Returns:
            Dict with the topic, all tasks, tasks grouped by category, the
            phase list (name, tasks, duration_weeks) and the total duration.
        """
        tasks = self.decomposer.decompose(
            f"研究课题:{research_topic}",
            max_depth=2
        )

        category_names = ('literature_review', 'data_collection', 'analysis', 'writing')
        categorized = {name: [] for name in category_names}
        for task in tasks:
            categorized[self._categorize_task(task.description)].append(task)

        # (phase name, category key, duration in weeks)
        phase_plan = (
            ('文献调研', 'literature_review', 2),
            ('数据收集', 'data_collection', 3),
            ('分析研究', 'analysis', 3),
            ('论文撰写', 'writing', 2),
        )
        phases = [
            {'name': label, 'tasks': categorized[key], 'duration_weeks': weeks}
            for label, key, weeks in phase_plan
        ]

        return {
            'topic': research_topic,
            'tasks': tasks,
            'categorized_tasks': categorized,
            'phases': phases,
            'total_duration_weeks': sum(p['duration_weeks'] for p in phases)
        }

    def _categorize_task(self, description: str) -> str:
        """Return the first category whose keywords occur in ``description``, else 'writing'."""
        rules = (
            ('literature_review', ('文献', '调研', '综述')),
            ('data_collection', ('数据', '收集', '调查')),
            ('analysis', ('分析', '研究', '实验')),
        )
        for category, keywords in rules:
            if any(kw in description for kw in keywords):
                return category
        return 'writing'
六、总结
6.1 核心要点
- **任务分解**
  - 分层分解复杂任务
  - 明确依赖关系
  - 合理估计时间
- **依赖管理**
  - 构建依赖图
  - 拓扑排序
  - 识别关键路径
- **执行监控**
  - 实时跟踪进度
  - 动态调整计划
  - 处理异常情况
6.2 最佳实践
- **渐进式规划**
  - 先粗后细
  - 边执行边调整
  - 保持灵活性
- **风险预判**
  - 识别潜在风险
  - 制定备选方案
  - 预留缓冲时间
- **持续优化**
  - 记录执行数据
  - 分析失败原因
  - 改进规划策略
参考资料