Agent 评估与调试实战
评估和调试是保障 Agent 质量的关键环节。如何科学评估 Agent 性能?如何高效排查 Agent 问题?本文深入解析 Agent 评估与调试的实战方案。
一、评估框架
1.1 评估维度
Agent 评估维度:
┌─────────────────────────────────────┐
│ 1. 功能正确性(Correctness) │
│ - 任务完成率 │
│ - 输出准确性 │
│ - 边界条件处理 │
├─────────────────────────────────────┤
│ 2. 性能指标(Performance) │
│ - 响应延迟 │
│ - Token 效率 │
│ - 资源消耗 │
├─────────────────────────────────────┤
│ 3. 可靠性(Reliability) │
│ - 错误率 │
│ - 稳定性 │
│ - 异常处理 │
├─────────────────────────────────────┤
│ 4. 用户体验(User Experience) │
│ - 响应质量 │
│ - 交互流畅度 │
│ - 用户满意度 │
└─────────────────────────────────────┘
1.2 评估指标体系
# evaluation_metrics.py
from typing import Dict, List
from dataclasses import dataclass
from enum import Enum
class MetricType(Enum):
    """Category of an evaluation metric."""
    ACCURACY = "accuracy"
    EFFICIENCY = "efficiency"
    RELIABILITY = "reliability"
    USER_SATISFACTION = "user_satisfaction"


@dataclass
class Metric:
    """One evaluation metric observation."""
    name: str                # human-readable metric name
    metric_type: MetricType  # category used for grouping
    value: float             # measured value
    target: float            # goal value; attainment = value / target
    weight: float            # relative weight within its category


class AgentEvaluator:
    """Collects metrics and produces a weighted evaluation report.

    Each metric's attainment is ``value / target`` capped at 1.0; metrics
    are weight-averaged within their MetricType group, and the overall
    score is the plain average across groups.
    """

    def __init__(self):
        # All registered metrics, in insertion order.
        self.metrics: List[Metric] = []

    def add_metric(
        self,
        name: str,
        metric_type: MetricType,
        value: float,
        target: float,
        weight: float = 1.0
    ):
        """Register a single metric observation."""
        self.metrics.append(
            Metric(name, metric_type, value, target, weight)
        )

    def calculate_overall_score(self) -> float:
        """Return the overall score in [0, 1]; 0.0 when no metrics exist."""
        if not self.metrics:
            return 0.0

        # Accumulate weighted attainment and total weight per category.
        weighted_sum: Dict[MetricType, float] = {}
        weight_sum: Dict[MetricType, float] = {}
        for m in self.metrics:
            # Attainment relative to target, capped at 1.0; a non-positive
            # target scores 0 (cannot be meaningfully attained).
            attainment = min(m.value / m.target, 1.0) if m.target > 0 else 0
            weighted_sum[m.metric_type] = (
                weighted_sum.get(m.metric_type, 0) + attainment * m.weight
            )
            weight_sum[m.metric_type] = weight_sum.get(m.metric_type, 0) + m.weight

        group_scores = [
            weighted_sum[t] / weight_sum[t]
            for t in weighted_sum
            if weight_sum[t] > 0
        ]
        return sum(group_scores) / len(group_scores) if group_scores else 0.0

    def get_report(self) -> Dict:
        """Build a report: overall score, per-metric attainment, names by type.

        Note: the per-metric 'achievement' here is deliberately uncapped,
        unlike the capped attainment used in the overall score.
        """
        entries = []
        by_type: Dict[str, List[str]] = {}
        for m in self.metrics:
            entries.append({
                'name': m.name,
                'value': m.value,
                'target': m.target,
                'achievement': m.value / m.target if m.target > 0 else 0
            })
            by_type.setdefault(m.metric_type.value, []).append(m.name)
        return {
            'overall_score': self.calculate_overall_score(),
            'metrics': entries,
            'by_type': by_type
        }
二、性能评估
2.1 基准测试
# benchmark_testing.py
from typing import List, Dict
import time
from statistics import mean, median, stdev
class BenchmarkTester:
    """Benchmark harness for an agent.

    Repeatedly feeds registered test cases to ``agent.process`` and
    aggregates latency (mean / median / p95 / p99 / stdev) plus, when an
    expected output is supplied, a key-level accuracy score.
    """

    def __init__(self, agent):
        # agent must expose process(input_data: Dict) -> Dict
        self.agent = agent
        self.test_cases: List[Dict] = []

    def add_test_case(
        self,
        name: str,
        input_data: Dict,
        expected_output: Dict = None,
        difficulty: str = 'medium'
    ):
        """Register a test case; expected_output is optional and enables accuracy scoring."""
        self.test_cases.append({
            'name': name,
            'input': input_data,
            'expected': expected_output,
            'difficulty': difficulty
        })

    def run_benchmark(
        self,
        iterations: int = 10
    ) -> Dict:
        """Run every test case ``iterations`` times and return per-case and summary stats.

        Args:
            iterations: number of repetitions per test case.

        FIX: the summary no longer crashes (StatisticsError / IndexError)
        when there are no test cases or no latencies, and the latency
        list is sorted once instead of once per percentile.
        """
        results = {'test_cases': [], 'summary': {}}
        all_latencies: List[float] = []
        all_accuracies: List[float] = []

        for test_case in self.test_cases:
            case_result = self._run_test_case(test_case, iterations)
            results['test_cases'].append(case_result)
            all_latencies.extend(case_result['latencies'])
            if 'accuracy' in case_result:
                all_accuracies.append(case_result['accuracy'])

        ordered = sorted(all_latencies)
        results['summary'] = {
            'total_cases': len(self.test_cases),
            'latency': {
                'mean': mean(all_latencies) if all_latencies else 0,
                'median': median(all_latencies) if all_latencies else 0,
                'p95': self._percentile(ordered, 0.95),
                'p99': self._percentile(ordered, 0.99),
                'stdev': stdev(all_latencies) if len(all_latencies) > 1 else 0
            },
            'accuracy': {
                'mean': mean(all_accuracies) if all_accuracies else 0
            }
        }
        return results

    @staticmethod
    def _percentile(ordered: List[float], q: float) -> float:
        """Nearest-rank percentile of an already-sorted list; 0 for empty input."""
        if not ordered:
            return 0
        return ordered[min(int(len(ordered) * q), len(ordered) - 1)]

    def _run_test_case(
        self,
        test_case: Dict,
        iterations: int
    ) -> Dict:
        """Execute one test case ``iterations`` times, timing each run in ms."""
        latencies: List[float] = []
        outputs: List[Dict] = []
        for _ in range(iterations):
            start = time.perf_counter()
            output = self.agent.process(test_case['input'])
            latencies.append((time.perf_counter() - start) * 1000)  # milliseconds
            outputs.append(output)

        result = {
            'name': test_case['name'],
            'difficulty': test_case['difficulty'],
            'latencies': latencies,
            'mean_latency': mean(latencies) if latencies else 0
        }
        # Only score accuracy when an expected output was registered.
        if test_case.get('expected') and outputs:
            accuracies = [
                self._calculate_accuracy(output, test_case['expected'])
                for output in outputs
            ]
            result['accuracy'] = mean(accuracies)
        return result

    def _calculate_accuracy(
        self,
        output: Dict,
        expected: Dict
    ) -> float:
        """Fraction of expected keys present in ``output`` with an equal value.

        FIX: keys missing from ``output`` now count against the score —
        previously they were silently skipped, inflating accuracy
        (an output containing only one matching key scored 1.0).
        """
        if not expected:
            return 0
        matches = sum(
            1 for key in expected
            if key in output and output[key] == expected[key]
        )
        return matches / len(expected)
2.2 负载测试
# load_testing.py
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor
import time
class LoadTester:
    """Closed-loop concurrent load tester for an agent."""

    def __init__(self, agent):
        # agent must expose process(input: Dict)
        self.agent = agent

    def test_concurrent_load(
        self,
        test_input: Dict,
        concurrent_users: List[int],
        duration_seconds: int = 60
    ) -> Dict:
        """Run one load test per concurrency level.

        Args:
            test_input: input payload sent on every request.
            concurrent_users: list of concurrency levels to test.
            duration_seconds: duration of each level's test.
        """
        results = {}
        for num_users in concurrent_users:
            results[f'{num_users}_users'] = self._test_load_level(
                test_input,
                num_users,
                duration_seconds
            )
        return results

    def _test_load_level(
        self,
        test_input: Dict,
        num_users: int,
        duration: int
    ) -> Dict:
        """Hammer the agent with ``num_users`` workers for ``duration`` seconds.

        FIXES: `mean` was referenced without being imported in this module
        (NameError at runtime); the shared counters were mutated with
        unsynchronized `+=` across threads (int += is not atomic); and the
        error rate was computed against successes only instead of all attempts.
        """
        # Local imports keep the module self-contained.
        from statistics import mean
        from threading import Lock

        start_time = time.time()
        request_count = 0  # successful requests
        error_count = 0
        latencies: List[float] = []
        lock = Lock()  # guards the three accumulators above

        def make_request():
            nonlocal request_count, error_count
            try:
                t0 = time.perf_counter()
                self.agent.process(test_input)
                elapsed_ms = (time.perf_counter() - t0) * 1000
                with lock:
                    latencies.append(elapsed_ms)
                    request_count += 1
            except Exception:
                with lock:
                    error_count += 1

        # Submit one wave of num_users requests at a time until time is up.
        with ThreadPoolExecutor(max_workers=num_users) as executor:
            while time.time() - start_time < duration:
                futures = [
                    executor.submit(make_request)
                    for _ in range(num_users)
                ]
                for future in futures:
                    future.result()

        elapsed = time.time() - start_time
        total_attempts = request_count + error_count
        ordered = sorted(latencies)

        def pct(q: float) -> float:
            # Nearest-rank percentile over successful-request latencies.
            return ordered[min(int(len(ordered) * q), len(ordered) - 1)] if ordered else 0

        return {
            'duration': elapsed,
            'total_requests': request_count,
            'error_count': error_count,
            'error_rate': error_count / total_attempts if total_attempts > 0 else 0,
            'qps': request_count / elapsed if elapsed > 0 else 0,
            'latency': {
                'mean': mean(latencies) if latencies else 0,
                'p95': pct(0.95),
                'p99': pct(0.99)
            }
        }
三、行为分析
3.1 行为追踪
# behavior_tracing.py
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
@dataclass
class TraceEvent:
    """A single recorded agent action."""
    timestamp: str            # ISO-8601 wall-clock time of the record() call
    event_type: str
    agent_id: str
    action: str
    input_data: Optional[Dict]
    output_data: Optional[Dict]
    duration_ms: float
    metadata: Optional[Dict]  # free-form; a truthy 'error' key marks a failure


class BehaviorTracer:
    """Records TraceEvents and derives per-agent behavior statistics."""

    def __init__(self):
        self.traces: List[TraceEvent] = []

    def record(
        self,
        event_type: str,
        agent_id: str,
        action: str,
        input_data: Dict = None,
        output_data: Dict = None,
        duration_ms: float = 0,
        metadata: Dict = None
    ):
        """Append one trace event, stamped with the current local time."""
        self.traces.append(TraceEvent(
            timestamp=datetime.now().isoformat(),
            event_type=event_type,
            agent_id=agent_id,
            action=action,
            input_data=input_data,
            output_data=output_data,
            duration_ms=duration_ms,
            metadata=metadata
        ))

    def get_agent_trace(self, agent_id: str) -> List[Dict]:
        """Return all events for one agent, converted to plain dicts."""
        return [
            asdict(t) for t in self.traces
            if t.agent_id == agent_id
        ]

    def analyze_behavior(self, agent_id: str) -> Dict:
        """Aggregate action counts, average duration, and error rate for one agent.

        FIXES: `mean` was referenced without being imported in this module
        (NameError); and events recorded with the default metadata=None
        crashed on `.get('metadata', {}).get('error')` — asdict() stores
        metadata as an explicit None, so the `{}` default never applied.
        """
        agent_traces = self.get_agent_trace(agent_id)
        if not agent_traces:
            return {}

        # Count how many times each action occurred.
        action_counts: Dict[str, int] = {}
        for trace in agent_traces:
            action = trace['action']
            action_counts[action] = action_counts.get(action, 0) + 1

        durations = [t['duration_ms'] for t in agent_traces]

        # An event is an error when its metadata carries a truthy 'error'.
        error_count = sum(
            1 for t in agent_traces
            if (t.get('metadata') or {}).get('error')
        )

        return {
            'agent_id': agent_id,
            'total_actions': len(agent_traces),
            'action_distribution': action_counts,
            'avg_duration_ms': sum(durations) / len(durations) if durations else 0,
            'error_rate': error_count / len(agent_traces) if agent_traces else 0,
            'most_common_action': max(action_counts, key=action_counts.get) if action_counts else None
        }

    def get_sequence_patterns(
        self,
        agent_id: str,
        pattern_length: int = 3
    ) -> List[Dict]:
        """Return the 10 most frequent consecutive action n-grams for one agent."""
        agent_traces = self.get_agent_trace(agent_id)
        if len(agent_traces) < pattern_length:
            return []

        actions = [t['action'] for t in agent_traces]

        # Slide a window of pattern_length over the action sequence.
        patterns: Dict[tuple, int] = {}
        for i in range(len(actions) - pattern_length + 1):
            window = tuple(actions[i:i + pattern_length])
            patterns[window] = patterns.get(window, 0) + 1

        ranked = sorted(
            patterns.items(),
            key=lambda kv: kv[1],
            reverse=True
        )
        return [
            {'pattern': list(p), 'count': c}
            for p, c in ranked[:10]
        ]
3.2 异常检测
# anomaly_detection.py
from typing import Dict, List
import statistics
class AnomalyDetector:
    """Z-score anomaly detector against a statistical baseline.

    The baseline is the mean/stdev of ``duration_ms`` and of the metadata
    ``error_rate`` over a set of reference traces.
    """

    def __init__(self, baseline_traces: List[Dict]):
        """Build the baseline.

        Args:
            baseline_traces: dicts with 'duration_ms' and optional
                metadata {'error_rate': float}.
        """
        self.baseline = self._compute_baseline(baseline_traces)

    @staticmethod
    def _metadata_error_rate(trace: Dict) -> float:
        """Error rate from trace metadata; tolerates missing *or None* metadata.

        FIX: `trace.get('metadata', {})` returned None (not {}) when the key
        was present with a None value, raising AttributeError on the chained
        `.get`.
        """
        return (trace.get('metadata') or {}).get('error_rate', 0)

    @staticmethod
    def _stats(values: List[float]) -> Dict:
        """Mean/stdev of a value list; zeros for degenerate inputs.

        FIX: statistics.mean raises StatisticsError on an empty list, so an
        empty baseline no longer crashes the constructor.
        """
        return {
            'mean': statistics.mean(values) if values else 0,
            'stdev': statistics.stdev(values) if len(values) > 1 else 0
        }

    def _compute_baseline(
        self,
        traces: List[Dict]
    ) -> Dict:
        """Compute baseline mean/stdev for duration and error rate."""
        durations = [t['duration_ms'] for t in traces]
        error_rates = [self._metadata_error_rate(t) for t in traces]
        return {
            'duration': self._stats(durations),
            'error_rate': self._stats(error_rates)
        }

    def detect_anomalies(
        self,
        current_traces: List[Dict],
        threshold: float = 2.0
    ) -> List[Dict]:
        """Return traces whose duration or error rate deviates from the baseline.

        Args:
            current_traces: traces to check.
            threshold: deviation cutoff, in standard deviations (|Z| > threshold).

        Severity is 'high' when both signals fire, else 'medium'.
        """
        anomalies = []
        for trace in current_traces:
            anomaly_reasons = []

            # Latency deviation.
            duration_zscore = self._calculate_zscore(
                trace['duration_ms'],
                self.baseline['duration']['mean'],
                self.baseline['duration']['stdev']
            )
            if abs(duration_zscore) > threshold:
                anomaly_reasons.append(
                    f"异常延迟:{trace['duration_ms']:.1f}ms "
                    f"(Z-score: {duration_zscore:.2f})"
                )

            # Error-rate deviation.
            error_rate = self._metadata_error_rate(trace)
            error_zscore = self._calculate_zscore(
                error_rate,
                self.baseline['error_rate']['mean'],
                self.baseline['error_rate']['stdev']
            )
            if abs(error_zscore) > threshold:
                anomaly_reasons.append(
                    f"异常错误率:{error_rate:.2%} "
                    f"(Z-score: {error_zscore:.2f})"
                )

            if anomaly_reasons:
                anomalies.append({
                    'trace': trace,
                    'reasons': anomaly_reasons,
                    'severity': 'high' if len(anomaly_reasons) > 1 else 'medium'
                })
        return anomalies

    def _calculate_zscore(
        self,
        value: float,
        mean: float,
        stdev: float
    ) -> float:
        """Z-score of ``value``; 0 when stdev is 0 (degenerate baseline)."""
        if stdev == 0:
            return 0
        return (value - mean) / stdev
四、调试技术
4.1 日志分析
# log_analysis.py
from typing import Dict, List
from collections import Counter
import re
class LogAnalyzer:
    """Offline analyzer for structured log entries (a list of dicts)."""

    def __init__(self, log_entries: List[Dict]):
        """Store the entries.

        Args:
            log_entries: dicts with optional keys 'level', 'message',
                'error_type', 'event_type', 'timestamp' (ISO-8601 expected
                for the hourly bucketing — TODO confirm against producer).
        """
        self.logs = log_entries

    def find_errors(self) -> List[Dict]:
        """Return entries logged at ERROR or CRITICAL level."""
        return [
            log for log in self.logs
            if log.get('level') in ('ERROR', 'CRITICAL')
        ]

    def find_patterns(
        self,
        pattern: str
    ) -> List[Dict]:
        """Return entries whose message matches ``pattern`` (case-insensitive regex)."""
        regex = re.compile(pattern, re.IGNORECASE)
        return [
            log for log in self.logs
            if regex.search(str(log.get('message', '')))
        ]

    def analyze_error_frequency(self) -> Dict:
        """Summarize error counts by type and by hour."""
        errors = self.find_errors()

        # Tally by declared error type.
        error_types = Counter(
            log.get('error_type', 'unknown')
            for log in errors
        )

        # Bucket by hour: the first 13 chars of an ISO timestamp
        # ("YYYY-MM-DDTHH") identify the hour.
        hourly_errors = Counter(
            log.get('timestamp', '')[:13]
            for log in errors
        )

        return {
            'total_errors': len(errors),
            'by_type': dict(error_types),
            'by_hour': dict(hourly_errors),
            'most_common_error': error_types.most_common(1)
        }

    def find_correlations(
        self,
        target_error: str
    ) -> Dict:
        """Find events that commonly precede occurrences of ``target_error``.

        FIX: matching entries are now located by position via enumerate.
        The old code used `self.logs.index(log)`, which returns the FIRST
        equal dict — duplicate entries all mapped to the same (wrong)
        window. Windows near the start of the log are now truncated
        (`max(0, idx - 5)`) instead of being dropped entirely.
        """
        regex = re.compile(target_error, re.IGNORECASE)
        occurrences = 0
        preceding_events: List[Dict] = []
        for idx, log in enumerate(self.logs):
            if regex.search(str(log.get('message', ''))):
                occurrences += 1
                preceding_events.extend(self.logs[max(0, idx - 5):idx])

        event_counts = Counter(
            log.get('event_type', 'unknown')
            for log in preceding_events
        )

        return {
            'target_error': target_error,
            'occurrences': occurrences,
            'common_preceding_events': event_counts.most_common(5)
        }
4.2 根因分析
# root_cause_analysis.py
from typing import Dict, List, Optional
class RootCauseAnalyzer:
    """Evidence-weighted root cause analysis over a symptom/cause tree."""

    def __init__(self):
        # Populated by build_cause_tree(); stays {} until then.
        self.cause_tree: Dict = {}

    def build_cause_tree(
        self,
        symptom: str,
        possible_causes: List[str]
    ):
        """Initialize the tree with a symptom and its candidate causes."""
        self.cause_tree = {
            'symptom': symptom,
            'causes': [
                {
                    'cause': cause,
                    'likelihood': self._estimate_likelihood(cause),
                    'evidence': [],
                    'verified': False
                }
                for cause in possible_causes
            ]
        }

    def _estimate_likelihood(self, cause: str) -> float:
        """Prior likelihood for a cause; flat 0.5 placeholder."""
        return 0.5

    def add_evidence(
        self,
        cause: str,
        evidence: str,
        supports: bool
    ):
        """Attach evidence to ``cause`` and re-estimate its likelihood as the
        supporting fraction of all attached evidence.

        FIX: uses `.get('causes', [])` so calling this before
        build_cause_tree() no longer raises KeyError (consistent with
        get_analysis_report, which already guarded with .get).
        """
        for cause_node in self.cause_tree.get('causes', []):
            if cause_node['cause'] == cause:
                cause_node['evidence'].append({
                    'evidence': evidence,
                    'supports': supports
                })
                supporting = sum(
                    1 for e in cause_node['evidence']
                    if e['supports']
                )
                total = len(cause_node['evidence'])
                cause_node['likelihood'] = supporting / total if total > 0 else 0.5
                break

    def verify_cause(
        self,
        cause: str,
        verified: bool
    ):
        """Mark a cause as verified/unverified. Safe before build_cause_tree()."""
        for cause_node in self.cause_tree.get('causes', []):
            if cause_node['cause'] == cause:
                cause_node['verified'] = verified
                break

    def get_root_cause(self) -> Optional[str]:
        """Return the most likely verified cause, falling back to the most
        likely cause overall; None when no causes exist.

        FIX: previously raised KeyError when called before build_cause_tree().
        """
        causes = self.cause_tree.get('causes', [])
        verified_causes = [c for c in causes if c['verified']]
        # Prefer verified causes; otherwise rank all candidates.
        pool = verified_causes or causes
        if pool:
            return max(pool, key=lambda c: c['likelihood'])['cause']
        return None

    def get_analysis_report(self) -> Dict:
        """Summarize causes, likelihoods, verification state, and the root cause."""
        return {
            'symptom': self.cause_tree.get('symptom'),
            'possible_causes': [
                {
                    'cause': c['cause'],
                    'likelihood': c['likelihood'],
                    'verified': c['verified'],
                    'evidence_count': len(c['evidence'])
                }
                for c in self.cause_tree.get('causes', [])
            ],
            'root_cause': self.get_root_cause()
        }
五、调试工具
5.1 交互式调试器
# interactive_debugger.py
from typing import Dict, List, Optional
class InteractiveDebugger:
    """Interactive step debugger for an agent exposing execute_step()."""

    def __init__(self, agent):
        # agent must expose execute_step(data) -> {'action', 'output', 'state', 'done'?}
        self.agent = agent
        self.breakpoints: List[str] = []
        self.current_state: Dict = {}

    def set_breakpoint(self, action: str):
        """Add a breakpoint on an action name (idempotent)."""
        if action not in self.breakpoints:
            self.breakpoints.append(action)

    def remove_breakpoint(self, action: str):
        """Remove a breakpoint if present."""
        if action in self.breakpoints:
            self.breakpoints.remove(action)

    def debug_run(
        self,
        input_data: Dict,
        step_by_step: bool = False
    ) -> Dict:
        """Run the agent step by step, recording every step.

        Args:
            input_data: initial input; each step's output feeds the next step.
            step_by_step: prompt for a command after every step.

        FIX: in the old code, the 'inspect' command executed `continue` in
        the OUTER loop, re-running the same agent step (duplicating its
        side effects) instead of re-prompting. Command handling is now an
        inner loop (_prompt_loop) that re-prompts after inspect.
        """
        result = {
            'steps': [],
            'final_output': None,
            'stopped_at': None
        }
        current_data = input_data
        while True:
            step_result = self.agent.execute_step(current_data)
            result['steps'].append({
                'action': step_result['action'],
                'input': current_data,
                'output': step_result['output'],
                'state': step_result['state']
            })

            # Record breakpoint hits (informational; does not pause unless
            # step_by_step is set — preserved from the original behavior).
            if step_result['action'] in self.breakpoints:
                result['stopped_at'] = step_result['action']

            if step_by_step:
                command = self._prompt_loop(step_result['state'])
                if command == 'stop':
                    break
                if command == 'quit':
                    result['stopped_at'] = 'user_quit'
                    break
                # 'continue' falls through to the completion check.

            if step_result.get('done'):
                result['final_output'] = step_result['output']
                break

            current_data = step_result['output']
        return result

    def _prompt_loop(self, state: Dict) -> str:
        """Prompt until a flow-control command is given.

        'inspect' dumps the state and re-prompts; 'continue'/'stop'/'quit'
        are returned to the caller.
        """
        while True:
            command = self._wait_for_command()
            if command == 'inspect':
                self._inspect_state(state)
                continue  # re-prompt, do NOT re-run the step
            return command

    def _wait_for_command(self) -> str:
        """Block on stdin until a recognized command is entered."""
        print("\n--- 断点 ---")
        print("可用命令:continue, stop, inspect, quit")
        while True:
            command = input("> ").strip().lower()
            if command in ['continue', 'stop', 'inspect', 'quit']:
                return command
            print("未知命令,请重试")

    def _inspect_state(self, state: Dict):
        """Pretty-print the current step's state."""
        print("\n--- 当前状态 ---")
        for key, value in state.items():
            print(f"{key}: {value}")
5.2 可视化调试
# visual_debugger.py
from typing import Dict, List
class VisualDebugger:
    """Records execution steps and renders flowchart / timeline views."""

    def __init__(self):
        # Ordered list of recorded steps.
        self.execution_graph: List[Dict] = []

    def record_step(
        self,
        step_id: str,
        action: str,
        input_data: Dict,
        output_data: Dict,
        duration_ms: float
    ):
        """Append one executed step to the graph."""
        self.execution_graph.append({
            'step_id': step_id,
            'action': action,
            'input': input_data,
            'output': output_data,
            'duration_ms': duration_ms
        })

    def generate_mermaid_flowchart(self) -> str:
        """Render the steps as a left-to-right Mermaid graph, chained in order."""
        lines = ['graph LR']
        for i, step in enumerate(self.execution_graph):
            node_id = f"step_{i}"
            # Literal "\n" inside the label is Mermaid's line-break syntax.
            label = f"{step['action']}\\n({step['duration_ms']:.1f}ms)"
            lines.append(f"  {node_id}[{label}]")
            if i > 0:
                lines.append(f"  step_{i - 1} --> {node_id}")
        return '\n'.join(lines)

    def generate_timeline(self) -> str:
        """Render a cumulative-time textual timeline of the steps."""
        lines = ['--- 执行时间线 ---']
        cumulative_time = 0
        for step in self.execution_graph:
            cumulative_time += step['duration_ms']
            lines.append(
                f"{cumulative_time:8.1f}ms | {step['action']}"
            )
        return '\n'.join(lines)

    def identify_bottlenecks(
        self,
        threshold_ms: float = 1000
    ) -> List[Dict]:
        """Return steps slower than ``threshold_ms``, sorted by duration desc,
        each with its share of the total recorded time.

        FIX: the total duration is computed once up front — the old code
        re-summed the entire graph inside the loop (O(n²)) — and the
        percentage is guarded against a zero total.
        """
        total_ms = sum(s['duration_ms'] for s in self.execution_graph)
        bottlenecks = [
            {
                'step_id': step['step_id'],
                'action': step['action'],
                'duration_ms': step['duration_ms'],
                'percentage': (step['duration_ms'] / total_ms * 100) if total_ms > 0 else 0
            }
            for step in self.execution_graph
            if step['duration_ms'] > threshold_ms
        ]
        return sorted(
            bottlenecks,
            key=lambda b: b['duration_ms'],
            reverse=True
        )
六、总结
6.1 核心要点
- 评估体系
  - 多维度指标
  - 基准测试
  - 负载测试
- 行为分析
  - 行为追踪
  - 异常检测
  - 模式识别
- 调试技术
  - 日志分析
  - 根因分析
  - 交互式调试
6.2 最佳实践
- 持续评估
  - 定期基准测试
  - 监控关键指标
  - 建立质量基线
- 快速定位
  - 完善的日志
  - 行为追踪
  - 可视化调试
- 系统方法
  - 标准化流程
  - 工具化支持
  - 知识沉淀
参考资料