Skip to content
清晨的一缕阳光
返回

Agent 评估与调试实战

Agent 评估与调试实战

评估和调试是保障 Agent 质量的关键环节。如何科学评估 Agent 性能?如何高效排查 Agent 问题?本文深入解析 Agent 评估与调试的实战方案。

一、评估框架

1.1 评估维度

Agent 评估维度:

┌─────────────────────────────────────┐
│ 1. 功能正确性(Correctness)         │
│    - 任务完成率                      │
│    - 输出准确性                      │
│    - 边界条件处理                    │
├─────────────────────────────────────┤
│ 2. 性能指标(Performance)           │
│    - 响应延迟                        │
│    - Token 效率                      │
│    - 资源消耗                        │
├─────────────────────────────────────┤
│ 3. 可靠性(Reliability)             │
│    - 错误率                          │
│    - 稳定性                          │
│    - 异常处理                        │
├─────────────────────────────────────┤
│ 4. 用户体验(User Experience)       │
│    - 响应质量                        │
│    - 交互流畅度                      │
│    - 用户满意度                      │
└─────────────────────────────────────┘

1.2 评估指标体系

# evaluation_metrics.py
from typing import Dict, List
from dataclasses import dataclass
from enum import Enum

class MetricType(Enum):
    """指标类型"""
    ACCURACY = "accuracy"
    EFFICIENCY = "efficiency"
    RELIABILITY = "reliability"
    USER_SATISFACTION = "user_satisfaction"

@dataclass
class Metric:
    """评估指标"""
    name: str
    metric_type: MetricType
    value: float
    target: float
    weight: float

class AgentEvaluator:
    """Agent 评估器"""
    
    def __init__(self):
        self.metrics: List[Metric] = []
    
    def add_metric(
        self,
        name: str,
        metric_type: MetricType,
        value: float,
        target: float,
        weight: float = 1.0
    ):
        """添加指标"""
        self.metrics.append(Metric(
            name=name,
            metric_type=metric_type,
            value=value,
            target=target,
            weight=weight
        ))
    
    def calculate_overall_score(self) -> float:
        """计算综合评分"""
        if not self.metrics:
            return 0.0
        
        # 按类型分组
        type_scores = {}
        type_weights = {}
        
        for metric in self.metrics:
            if metric.metric_type not in type_scores:
                type_scores[metric.metric_type] = 0
                type_weights[metric.metric_type] = 0
            
            # 计算得分(相对于目标)
            score = min(metric.value / metric.target, 1.0) if metric.target > 0 else 0
            type_scores[metric.metric_type] += score * metric.weight
            type_weights[metric.metric_type] += metric.weight
        
        # 计算类型平均分
        type_averages = []
        for metric_type in type_scores:
            if type_weights[metric_type] > 0:
                type_averages.append(
                    type_scores[metric_type] / type_weights[metric_type]
                )
        
        # 综合评分
        return sum(type_averages) / len(type_averages) if type_averages else 0.0
    
    def get_report(self) -> Dict:
        """生成评估报告"""
        report = {
            'overall_score': self.calculate_overall_score(),
            'metrics': [],
            'by_type': {}
        }
        
        # 按类型汇总
        for metric in self.metrics:
            report['metrics'].append({
                'name': metric.name,
                'value': metric.value,
                'target': metric.target,
                'achievement': metric.value / metric.target if metric.target > 0 else 0
            })
            
            metric_type = metric.metric_type.value
            if metric_type not in report['by_type']:
                report['by_type'][metric_type] = []
            report['by_type'][metric_type].append(metric.name)
        
        return report

二、性能评估

2.1 基准测试

# benchmark_testing.py
from typing import List, Dict
import time
from statistics import mean, median, stdev

class BenchmarkTester:
    """基准测试器"""
    
    def __init__(self, agent):
        self.agent = agent
        self.test_cases: List[Dict] = []
    
    def add_test_case(
        self,
        name: str,
        input_data: Dict,
        expected_output: Dict = None,
        difficulty: str = 'medium'
    ):
        """添加测试用例"""
        self.test_cases.append({
            'name': name,
            'input': input_data,
            'expected': expected_output,
            'difficulty': difficulty
        })
    
    def run_benchmark(
        self,
        iterations: int = 10
    ) -> Dict:
        """
        运行基准测试
        
        Args:
            iterations: 每用例迭代次数
        """
        results = {
            'test_cases': [],
            'summary': {}
        }
        
        all_latencies = []
        all_accuracies = []
        
        for test_case in self.test_cases:
            case_result = self._run_test_case(
                test_case,
                iterations
            )
            results['test_cases'].append(case_result)
            
            all_latencies.extend(case_result['latencies'])
            if 'accuracy' in case_result:
                all_accuracies.append(case_result['accuracy'])
        
        # 汇总统计
        results['summary'] = {
            'total_cases': len(self.test_cases),
            'latency': {
                'mean': mean(all_latencies),
                'median': median(all_latencies),
                'p95': sorted(all_latencies)[int(len(all_latencies) * 0.95)],
                'p99': sorted(all_latencies)[int(len(all_latencies) * 0.99)],
                'stdev': stdev(all_latencies) if len(all_latencies) > 1 else 0
            },
            'accuracy': {
                'mean': mean(all_accuracies) if all_accuracies else 0
            }
        }
        
        return results
    
    def _run_test_case(
        self,
        test_case: Dict,
        iterations: int
    ) -> Dict:
        """运行单个测试用例"""
        latencies = []
        outputs = []
        
        for _ in range(iterations):
            start = time.perf_counter()
            output = self.agent.process(test_case['input'])
            end = time.perf_counter()
            
            latencies.append((end - start) * 1000)  # 毫秒
            outputs.append(output)
        
        result = {
            'name': test_case['name'],
            'difficulty': test_case['difficulty'],
            'latencies': latencies,
            'mean_latency': mean(latencies)
        }
        
        # 如果有期望输出,计算准确率
        if test_case.get('expected'):
            accuracies = [
                self._calculate_accuracy(output, test_case['expected'])
                for output in outputs
            ]
            result['accuracy'] = mean(accuracies)
        
        return result
    
    def _calculate_accuracy(
        self,
        output: Dict,
        expected: Dict
    ) -> float:
        """计算准确率"""
        # 简化实现
        matches = 0
        total = 0
        
        for key in expected:
            if key in output:
                total += 1
                if output[key] == expected[key]:
                    matches += 1
        
        return matches / total if total > 0 else 0

2.2 负载测试

# load_testing.py
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor
import time

class LoadTester:
    """负载测试器"""
    
    def __init__(self, agent):
        self.agent = agent
    
    def test_concurrent_load(
        self,
        test_input: Dict,
        concurrent_users: List[int],
        duration_seconds: int = 60
    ) -> Dict:
        """
        并发负载测试
        
        Args:
            test_input: 测试输入
            concurrent_users: 并发用户数列表
            duration_seconds: 测试时长
        """
        results = {}
        
        for num_users in concurrent_users:
            result = self._test_load_level(
                test_input,
                num_users,
                duration_seconds
            )
            results[f'{num_users}_users'] = result
        
        return results
    
    def _test_load_level(
        self,
        test_input: Dict,
        num_users: int,
        duration: int
    ) -> Dict:
        """测试负载级别"""
        start_time = time.time()
        request_count = 0
        error_count = 0
        latencies = []
        
        def make_request():
            nonlocal request_count, error_count
            try:
                start = time.perf_counter()
                self.agent.process(test_input)
                end = time.perf_counter()
                latencies.append((end - start) * 1000)
                request_count += 1
            except Exception as e:
                error_count += 1
        
        # 并发执行
        with ThreadPoolExecutor(max_workers=num_users) as executor:
            while time.time() - start_time < duration:
                futures = [
                    executor.submit(make_request)
                    for _ in range(num_users)
                ]
                for future in futures:
                    future.result()
        
        # 计算指标
        elapsed = time.time() - start_time
        
        return {
            'duration': elapsed,
            'total_requests': request_count,
            'error_count': error_count,
            'error_rate': error_count / request_count if request_count > 0 else 0,
            'qps': request_count / elapsed,
            'latency': {
                'mean': mean(latencies) if latencies else 0,
                'p95': sorted(latencies)[int(len(latencies) * 0.95)] if latencies else 0,
                'p99': sorted(latencies)[int(len(latencies) * 0.99)] if latencies else 0
            }
        }

三、行为分析

3.1 行为追踪

# behavior_tracing.py
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from datetime import datetime

@dataclass
class TraceEvent:
    """追踪事件"""
    timestamp: str
    event_type: str
    agent_id: str
    action: str
    input_data: Optional[Dict]
    output_data: Optional[Dict]
    duration_ms: float
    metadata: Optional[Dict]

class BehaviorTracer:
    """行为追踪器"""
    
    def __init__(self):
        self.traces: List[TraceEvent] = []
    
    def record(
        self,
        event_type: str,
        agent_id: str,
        action: str,
        input_data: Dict = None,
        output_data: Dict = None,
        duration_ms: float = 0,
        metadata: Dict = None
    ):
        """记录事件"""
        event = TraceEvent(
            timestamp=datetime.now().isoformat(),
            event_type=event_type,
            agent_id=agent_id,
            action=action,
            input_data=input_data,
            output_data=output_data,
            duration_ms=duration_ms,
            metadata=metadata
        )
        self.traces.append(event)
    
    def get_agent_trace(self, agent_id: str) -> List[Dict]:
        """获取 Agent 追踪"""
        return [
            asdict(t) for t in self.traces
            if t.agent_id == agent_id
        ]
    
    def analyze_behavior(self, agent_id: str) -> Dict:
        """分析行为模式"""
        agent_traces = self.get_agent_trace(agent_id)
        
        if not agent_traces:
            return {}
        
        # 行动统计
        action_counts = {}
        for trace in agent_traces:
            action = trace['action']
            action_counts[action] = action_counts.get(action, 0) + 1
        
        # 延迟统计
        durations = [t['duration_ms'] for t in agent_traces]
        
        # 错误统计
        error_count = sum(
            1 for t in agent_traces
            if t.get('metadata', {}).get('error')
        )
        
        return {
            'agent_id': agent_id,
            'total_actions': len(agent_traces),
            'action_distribution': action_counts,
            'avg_duration_ms': mean(durations) if durations else 0,
            'error_rate': error_count / len(agent_traces) if agent_traces else 0,
            'most_common_action': max(action_counts, key=action_counts.get) if action_counts else None
        }
    
    def get_sequence_patterns(
        self,
        agent_id: str,
        pattern_length: int = 3
    ) -> List[Dict]:
        """获取行动序列模式"""
        agent_traces = self.get_agent_trace(agent_id)
        
        if len(agent_traces) < pattern_length:
            return []
        
        # 提取行动序列
        actions = [t['action'] for t in agent_traces]
        
        # 统计模式
        patterns = {}
        for i in range(len(actions) - pattern_length + 1):
            pattern = tuple(actions[i:i + pattern_length])
            patterns[pattern] = patterns.get(pattern, 0) + 1
        
        # 排序
        sorted_patterns = sorted(
            patterns.items(),
            key=lambda x: x[1],
            reverse=True
        )
        
        return [
            {'pattern': list(p), 'count': c}
            for p, c in sorted_patterns[:10]
        ]

3.2 异常检测

# anomaly_detection.py
from typing import Dict, List
import statistics

class AnomalyDetector:
    """异常检测器"""
    
    def __init__(self, baseline_traces: List[Dict]):
        """
        初始化
        
        Args:
            baseline_traces: 基线追踪数据
        """
        self.baseline = self._compute_baseline(baseline_traces)
    
    def _compute_baseline(
        self,
        traces: List[Dict]
    ) -> Dict:
        """计算基线"""
        durations = [t['duration_ms'] for t in traces]
        error_rates = [
            t.get('metadata', {}).get('error_rate', 0)
            for t in traces
        ]
        
        return {
            'duration': {
                'mean': statistics.mean(durations),
                'stdev': statistics.stdev(durations) if len(durations) > 1 else 0
            },
            'error_rate': {
                'mean': statistics.mean(error_rates),
                'stdev': statistics.stdev(error_rates) if len(error_rates) > 1 else 0
            }
        }
    
    def detect_anomalies(
        self,
        current_traces: List[Dict],
        threshold: float = 2.0
    ) -> List[Dict]:
        """
        检测异常
        
        Args:
            current_traces: 当前追踪
            threshold: 阈值(标准差倍数)
        """
        anomalies = []
        
        for trace in current_traces:
            anomaly_reasons = []
            
            # 检查延迟异常
            duration_zscore = self._calculate_zscore(
                trace['duration_ms'],
                self.baseline['duration']['mean'],
                self.baseline['duration']['stdev']
            )
            
            if abs(duration_zscore) > threshold:
                anomaly_reasons.append(
                    f"异常延迟:{trace['duration_ms']:.1f}ms "
                    f"(Z-score: {duration_zscore:.2f})"
                )
            
            # 检查错误率异常
            error_rate = trace.get('metadata', {}).get('error_rate', 0)
            error_zscore = self._calculate_zscore(
                error_rate,
                self.baseline['error_rate']['mean'],
                self.baseline['error_rate']['stdev']
            )
            
            if abs(error_zscore) > threshold:
                anomaly_reasons.append(
                    f"异常错误率:{error_rate:.2%} "
                    f"(Z-score: {error_zscore:.2f})"
                )
            
            if anomaly_reasons:
                anomalies.append({
                    'trace': trace,
                    'reasons': anomaly_reasons,
                    'severity': 'high' if len(anomaly_reasons) > 1 else 'medium'
                })
        
        return anomalies
    
    def _calculate_zscore(
        self,
        value: float,
        mean: float,
        stdev: float
    ) -> float:
        """计算 Z-score"""
        if stdev == 0:
            return 0
        return (value - mean) / stdev

四、调试技术

4.1 日志分析

# log_analysis.py
from typing import Dict, List
from collections import Counter
import re

class LogAnalyzer:
    """日志分析器"""
    
    def __init__(self, log_entries: List[Dict]):
        """
        初始化
        
        Args:
            log_entries: 日志条目
        """
        self.logs = log_entries
    
    def find_errors(self) -> List[Dict]:
        """查找错误"""
        return [
            log for log in self.logs
            if log.get('level') in ['ERROR', 'CRITICAL']
        ]
    
    def find_patterns(
        self,
        pattern: str
    ) -> List[Dict]:
        """查找模式"""
        regex = re.compile(pattern, re.IGNORECASE)
        return [
            log for log in self.logs
            if regex.search(str(log.get('message', '')))
        ]
    
    def analyze_error_frequency(self) -> Dict:
        """分析错误频率"""
        errors = self.find_errors()
        
        # 按错误类型统计
        error_types = Counter(
            log.get('error_type', 'unknown')
            for log in errors
        )
        
        # 按时间统计
        hourly_errors = Counter(
            log.get('timestamp', '')[:13]  # 小时级别
            for log in errors
        )
        
        return {
            'total_errors': len(errors),
            'by_type': dict(error_types),
            'by_hour': dict(hourly_errors),
            'most_common_error': error_types.most_common(1)
        }
    
    def find_correlations(
        self,
        target_error: str
    ) -> Dict:
        """查找相关性"""
        # 查找目标错误之前的日志
        target_logs = self.find_patterns(target_error)
        
        # 分析前置条件
        preceding_events = []
        for log in target_logs:
            # 查找前 5 条日志
            idx = self.logs.index(log)
            if idx >= 5:
                preceding = self.logs[idx-5:idx]
                preceding_events.extend(preceding)
        
        # 统计常见前置事件
        event_counts = Counter(
            log.get('event_type', 'unknown')
            for log in preceding_events
        )
        
        return {
            'target_error': target_error,
            'occurrences': len(target_logs),
            'common_preceding_events': event_counts.most_common(5)
        }

4.2 根因分析

# root_cause_analysis.py
from typing import Dict, List, Optional

class RootCauseAnalyzer:
    """根因分析器"""
    
    def __init__(self):
        self.cause_tree: Dict = {}
    
    def build_cause_tree(
        self,
        symptom: str,
        possible_causes: List[str]
    ):
        """构建原因树"""
        self.cause_tree = {
            'symptom': symptom,
            'causes': []
        }
        
        for cause in possible_causes:
            self.cause_tree['causes'].append({
                'cause': cause,
                'likelihood': self._estimate_likelihood(cause),
                'evidence': [],
                'verified': False
            })
    
    def _estimate_likelihood(self, cause: str) -> float:
        """估计可能性"""
        # 简化实现
        return 0.5
    
    def add_evidence(
        self,
        cause: str,
        evidence: str,
        supports: bool
    ):
        """添加证据"""
        for cause_node in self.cause_tree['causes']:
            if cause_node['cause'] == cause:
                cause_node['evidence'].append({
                    'evidence': evidence,
                    'supports': supports
                })
                
                # 更新可能性
                supporting = sum(
                    1 for e in cause_node['evidence']
                    if e['supports']
                )
                total = len(cause_node['evidence'])
                cause_node['likelihood'] = supporting / total if total > 0 else 0.5
                break
    
    def verify_cause(
        self,
        cause: str,
        verified: bool
    ):
        """验证原因"""
        for cause_node in self.cause_tree['causes']:
            if cause_node['cause'] == cause:
                cause_node['verified'] = verified
                break
    
    def get_root_cause(self) -> Optional[str]:
        """获取根因"""
        # 找到已验证的原因中可能性最高的
        verified_causes = [
            c for c in self.cause_tree['causes']
            if c['verified']
        ]
        
        if verified_causes:
            return max(verified_causes, key=lambda x: x['likelihood'])['cause']
        
        # 如果没有已验证的,返回可能性最高的
        if self.cause_tree['causes']:
            return max(
                self.cause_tree['causes'],
                key=lambda x: x['likelihood']
            )['cause']
        
        return None
    
    def get_analysis_report(self) -> Dict:
        """生成分析报告"""
        return {
            'symptom': self.cause_tree.get('symptom'),
            'possible_causes': [
                {
                    'cause': c['cause'],
                    'likelihood': c['likelihood'],
                    'verified': c['verified'],
                    'evidence_count': len(c['evidence'])
                }
                for c in self.cause_tree.get('causes', [])
            ],
            'root_cause': self.get_root_cause()
        }

五、调试工具

5.1 交互式调试器

# interactive_debugger.py
from typing import Dict, List, Optional

class InteractiveDebugger:
    """交互式调试器"""
    
    def __init__(self, agent):
        self.agent = agent
        self.breakpoints: List[str] = []
        self.current_state: Dict = {}
    
    def set_breakpoint(self, action: str):
        """设置断点"""
        if action not in self.breakpoints:
            self.breakpoints.append(action)
    
    def remove_breakpoint(self, action: str):
        """移除断点"""
        if action in self.breakpoints:
            self.breakpoints.remove(action)
    
    def debug_run(
        self,
        input_data: Dict,
        step_by_step: bool = False
    ) -> Dict:
        """
        调试运行
        
        Args:
            input_data: 输入数据
            step_by_step: 单步执行
        """
        result = {
            'steps': [],
            'final_output': None,
            'stopped_at': None
        }
        
        current_data = input_data
        
        while True:
            # 执行一步
            step_result = self.agent.execute_step(current_data)
            
            result['steps'].append({
                'action': step_result['action'],
                'input': current_data,
                'output': step_result['output'],
                'state': step_result['state']
            })
            
            # 检查断点
            if step_result['action'] in self.breakpoints:
                result['stopped_at'] = step_result['action']
                
                if step_by_step:
                    # 等待用户指令
                    command = self._wait_for_command()
                    
                    if command == 'continue':
                        pass  # 继续
                    elif command == 'stop':
                        break
                    elif command == 'inspect':
                        self._inspect_state(step_result['state'])
                        continue
                    elif command == 'quit':
                        result['stopped_at'] = 'user_quit'
                        break
            
            # 检查是否完成
            if step_result.get('done'):
                result['final_output'] = step_result['output']
                break
            
            current_data = step_result['output']
        
        return result
    
    def _wait_for_command(self) -> str:
        """等待用户命令"""
        print("\n--- 断点 ---")
        print("可用命令:continue, stop, inspect, quit")
        
        while True:
            command = input("> ").strip().lower()
            if command in ['continue', 'stop', 'inspect', 'quit']:
                return command
            print("未知命令,请重试")
    
    def _inspect_state(self, state: Dict):
        """检查状态"""
        print("\n--- 当前状态 ---")
        for key, value in state.items():
            print(f"{key}: {value}")

5.2 可视化调试

# visual_debugger.py
from typing import Dict, List

class VisualDebugger:
    """可视化调试器"""
    
    def __init__(self):
        self.execution_graph: List[Dict] = []
    
    def record_step(
        self,
        step_id: str,
        action: str,
        input_data: Dict,
        output_data: Dict,
        duration_ms: float
    ):
        """记录步骤"""
        self.execution_graph.append({
            'step_id': step_id,
            'action': action,
            'input': input_data,
            'output': output_data,
            'duration_ms': duration_ms
        })
    
    def generate_mermaid_flowchart(self) -> str:
        """生成 Mermaid 流程图"""
        lines = ['graph LR']
        
        for i, step in enumerate(self.execution_graph):
            node_id = f"step_{i}"
            label = f"{step['action']}\\n({step['duration_ms']:.1f}ms)"
            lines.append(f"    {node_id}[{label}]")
            
            if i > 0:
                prev_id = f"step_{i-1}"
                lines.append(f"    {prev_id} --> {node_id}")
        
        return '\n'.join(lines)
    
    def generate_timeline(self) -> str:
        """生成时间线图"""
        lines = ['--- 执行时间线 ---']
        
        cumulative_time = 0
        for step in self.execution_graph:
            cumulative_time += step['duration_ms']
            lines.append(
                f"{cumulative_time:8.1f}ms | {step['action']}"
            )
        
        return '\n'.join(lines)
    
    def identify_bottlenecks(
        self,
        threshold_ms: float = 1000
    ) -> List[Dict]:
        """识别瓶颈"""
        bottlenecks = []
        
        for step in self.execution_graph:
            if step['duration_ms'] > threshold_ms:
                bottlenecks.append({
                    'step_id': step['step_id'],
                    'action': step['action'],
                    'duration_ms': step['duration_ms'],
                    'percentage': (
                        step['duration_ms'] /
                        sum(s['duration_ms'] for s in self.execution_graph)
                        * 100
                    )
                })
        
        return sorted(
            bottlenecks,
            key=lambda x: x['duration_ms'],
            reverse=True
        )

六、总结

6.1 核心要点

  1. 评估体系

    • 多维度指标
    • 基准测试
    • 负载测试
  2. 行为分析

    • 行为追踪
    • 异常检测
    • 模式识别
  3. 调试技术

    • 日志分析
    • 根因分析
    • 交互式调试

6.2 最佳实践

  1. 持续评估

    • 定期基准测试
    • 监控关键指标
    • 建立质量基线
  2. 快速定位

    • 完善的日志
    • 行为追踪
    • 可视化调试
  3. 系统方法

    • 标准化流程
    • 工具化支持
    • 知识沉淀

参考资料


分享这篇文章到:

上一篇文章
RocketMQ 安全加固与权限管理实战
下一篇文章
RocketMQ EventBridge 事件驱动架构实战