Skip to content
清晨的一缕阳光
返回

RAG 评估与监控体系构建

RAG 评估与监控体系构建

RAG 系统的质量评估和监控是保障生产稳定运行的关键。如何科学评估 RAG 效果?如何建立完善的监控体系?本文将详解 RAG 评估与监控的完整方案。

一、RAG 评估框架

1.1 评估维度

RAG 评估维度:

┌─────────────────────────────────────┐
│ 1. 检索质量(Retrieval)            │
│    - 召回率(Recall)               │
│    - 准确率(Precision)            │
│    - NDCG                           │
├─────────────────────────────────────┤
│ 2. 生成质量(Generation)           │
│    - 忠实度(Faithfulness)         │
│    - 相关性(Relevance)            │
│    - 准确性(Accuracy)             │
├─────────────────────────────────────┤
│ 3. 端到端质量(End-to-End)         │
│    - 答案正确性                     │
│    - 用户满意度                     │
│    - 任务完成率                     │
├─────────────────────────────────────┤
│ 4. 系统性能(Performance)          │
│    - 响应延迟                       │
│    - 吞吐量                         │
│    - 资源消耗                       │
└─────────────────────────────────────┘

1.2 主流评估工具

工具特点适用场景
RAGAS开源、指标全离线评估
TruLens实时、可视化生产监控
LangSmith一站式平台全流程追踪
Arize Phoenix调试友好问题排查

二、RAGAS 评估框架

2.1 RAGAS 核心指标

# ragas_evaluation.py
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
    context_relevancy,
    answer_correctness
)
from datasets import Dataset

class RagasEvaluator:
    """RAGAS 评估器"""
    
    def __init__(self, llm, embedding_model):
        """
        初始化
        
        Args:
            llm: LLM 模型
            embedding_model: Embedding 模型
        """
        self.llm = llm
        self.embedding_model = embedding_model
    
    def evaluate(
        self,
        questions: List[str],
        answers: List[str],
        contexts: List[List[str]],
        ground_truths: List[List[str]]
    ) -> Dict:
        """
        评估 RAG 系统
        
        Args:
            questions: 问题列表
            answers: 答案列表
            contexts: 上下文列表
            ground_truths: 标准答案
        
        Returns:
            评估结果
        """
        # 构建数据集
        dataset = Dataset.from_dict({
            'question': questions,
            'answer': answers,
            'contexts': contexts,
            'ground_truths': ground_truths
        })
        
        # 评估
        result = evaluate(
            dataset,
            metrics=[
                faithfulness,
                answer_relevancy,
                context_precision,
                context_recall,
                context_relevancy,
                answer_correctness
            ],
            llm=self.llm,
            embeddings=self.embedding_model
        )
        
        return result

2.2 指标详解

# ragas_metrics_explained.py

class RagasMetricsExplained:
    """RAGAS 指标详解"""
    
    # 1. Faithfulness(忠实度)
    # 衡量答案是否基于上下文,无幻觉
    # 计算方式:从答案提取陈述,验证是否可从上下文推断
    faithfulness_example = """
问题:公司的年收入是多少?
上下文:公司 2023 年财报显示收入增长 20%
答案:公司年收入为 1 亿元

Faithfulness = 0.5(上下文未提及具体金额)
"""
    
    # 2. Answer Relevancy(答案相关性)
    # 衡量答案与问题的相关程度
    # 计算方式:生成反向问题,计算相似度
    answer_relevancy_example = """
问题:如何提高 RAG 检索质量?
答案:应该使用 Cross-Encoder 重排序

Answer Relevancy = 0.8(直接相关)
"""
    
    # 3. Context Precision(上下文精确度)
    # 衡量相关上下文在检索结果中的排名
    # 计算方式:相关文档的平均倒数排名
    context_precision_example = """
标准答案文档:[doc1, doc3]
检索结果:[doc1, doc2, doc3, doc4]

Context Precision = (1/1 + 1/3) / 2 = 0.67
"""
    
    # 4. Context Recall(上下文召回率)
    # 衡量检索到的上下文包含多少标准答案信息
    # 计算方式:标准答案中可在上下文中找到的比例
    context_recall_example = """
标准答案:公司收入 1 亿,增长 20%
上下文:公司 2023 年收入增长 20%

Context Recall = 0.5(只包含增长率,未包含金额)
"""
    
    # 5. Context Relevancy(上下文相关性)
    # 衡量上下文与问题的相关程度
    # 计算方式:上下文中相关句子的比例
    context_relevancy_example = """
问题:公司的财务数据
上下文:公司成立于 2020 年,2023 年收入 1 亿

Context Relevancy = 0.5(只有一半相关)
"""
    
    # 6. Answer Correctness(答案正确性)
    # 衡量答案与标准答案的一致性
    # 计算方式:语义相似度 + F1 分数
    answer_correctness_example = """
标准答案:公司年收入 1 亿元
预测答案:公司年收入为 1 亿

Answer Correctness = 0.95(几乎一致)
"""

2.3 批量评估

# batch_evaluation.py
from typing import List, Dict
from tqdm import tqdm

class BatchEvaluator:
    """批量评估器"""
    
    def __init__(self, ragas_evaluator):
        self.evaluator = ragas_evaluator
    
    def evaluate_batch(
        self,
        test_cases: List[Dict],
        batch_size: int = 10
    ) -> Dict:
        """
        批量评估
        
        Args:
            test_cases: 测试用例列表
            batch_size: 批次大小
        
        Returns:
            评估结果
        """
        all_results = []
        
        for i in tqdm(range(0, len(test_cases), batch_size)):
            batch = test_cases[i:i + batch_size]
            
            questions = [t['question'] for t in batch]
            answers = [t['answer'] for t in batch]
            contexts = [t['contexts'] for t in batch]
            ground_truths = [t['ground_truth'] for t in batch]
            
            result = self.evaluator.evaluate(
                questions, answers, contexts, ground_truths
            )
            all_results.append(result)
        
        # 汇总结果
        return self._aggregate_results(all_results)
    
    def _aggregate_results(self, results: List) -> Dict:
        """汇总结果"""
        import pandas as pd
        
        df = pd.DataFrame(results)
        
        return {
            'mean': df.mean().to_dict(),
            'std': df.std().to_dict(),
            'min': df.min().to_dict(),
            'max': df.max().to_dict(),
            'distribution': df.describe().to_dict()
        }

三、TruLens 实时监控

3.1 TruLens 基础

# trulens_monitoring.py
from trulens_eval import (
    TruLlama,
    Feedback,
    Select
)
from trulens_eval.feedback import Groundedness
import numpy as np

class TruLensMonitor:
    """TruLens 实时监控器"""
    
    def __init__(self, app):
        """
        初始化
        
        Args:
            app: RAG 应用
        """
        self.app = app
        self.trulens = TruLlama(app)
    
    def setup_feedback(self):
        """设置反馈函数"""
        # 1. 上下文相关性
        context_relevance = (
            Feedback(self._context_relevance)
            .on(Select.RecordCalls.retriever)
            .on(Select.RecordCalls.generator)
        )
        
        # 2. 答案忠实度
        groundedness = (
            Feedback(Groundedness().groundedness_measure_with_starburst_context)
            .on(Select.RecordCalls.retriever)
            .on(Select.RecordCalls.generator)
        )
        
        # 3. 答案相关性
        answer_relevance = (
            Feedback(self._answer_relevance)
            .on(Select.RecordCalls.generator)
        )
        
        # 添加到 TruLens
        self.trulens.add_feedback(context_relevance)
        self.trulens.add_feedback(groundedness)
        self.trulens.add_feedback(answer_relevance)
    
    def _context_relevance(self, query: str, contexts: List[str]) -> float:
        """评估上下文相关性"""
        # 使用 LLM 评估
        prompt = f"""
请评估以下上下文与问题的相关性(0-1):

问题:{query}

上下文:
{' '.join(contexts)}

请只输出 0-1 之间的分数。
"""
        score = self._llm_generate(prompt)
        return float(score)
    
    def _answer_relevance(self, query: str, answer: str) -> float:
        """评估答案相关性"""
        prompt = f"""
请评估以下答案与问题的相关性(0-1):

问题:{query}

答案:{answer}

请只输出 0-1 之间的分数。
"""
        score = self._llm_generate(prompt)
        return float(score)
    
    def _llm_generate(self, prompt: str) -> str:
        """调用 LLM 生成"""
        # 实现 LLM 调用
        pass
    
    def run_evaluation(self, test_queries: List[str]) -> Dict:
        """运行评估"""
        results = []
        
        for query in test_queries:
            # 运行应用
            with self.trulens as recording:
                result = self.app.query(query)
            
            # 获取反馈分数
            feedback_results = recording.get_feedback_results()
            
            results.append({
                'query': query,
                'answer': result.answer,
                'feedback_scores': feedback_results
            })
        
        return results

3.2 仪表盘配置

# trulens_dashboard.py
from trulens_eval import TruLlamaDashboard

class TruLensDashboard:
    """TruLens 仪表盘"""
    
    def __init__(self):
        self.dashboard = TruLlamaDashboard()
    
    def show_metrics_summary(self):
        """显示指标汇总"""
        return self.dashboard.get_metrics_summary()
    
    def show_feedback_distribution(self, metric_name: str):
        """显示反馈分布"""
        return self.dashboard.plot_feedback_distribution(metric_name)
    
    def show_trend_over_time(self, metric_name: str):
        """显示时间趋势"""
        return self.dashboard.plot_metric_trend(metric_name)
    
    def show_heatmap(self):
        """显示指标热力图"""
        return self.dashboard.plot_correlation_heatmap()
    
    def export_report(self, filepath: str):
        """导出报告"""
        self.dashboard.export_report(filepath)

# 使用示例
dashboard = TruLensDashboard()

# 查看指标汇总
summary = dashboard.show_metrics_summary()
print(summary)

# 查看忠实度分布
faithfulness_dist = dashboard.show_feedback_distribution('groundedness')

# 查看时间趋势
trend = dashboard.show_trend_over_time('context_relevance')

# 导出报告
dashboard.export_report('rag_evaluation_report.html')

四、生产监控体系

4.1 监控指标设计

# production_metrics.py
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime

@dataclass
class RAGMetrics:
    """RAG 监控指标"""
    # 检索指标
    retrieval_latency_p50: float
    retrieval_latency_p95: float
    retrieval_latency_p99: float
    retrieval_recall: float
    retrieval_precision: float
    
    # 重排序指标
    reranking_latency: float
    reranking_ndcg: float
    
    # 生成指标
    generation_latency: float
    generation_token_count: int
    generation_cost: float
    
    # 质量指标
    faithfulness_score: float
    relevance_score: float
    correctness_score: float
    
    # 系统指标
    total_latency: float
    error_rate: float
    qps: float
    
    timestamp: datetime

class ProductionMonitor:
    """生产监控器"""
    
    def __init__(self, metrics_collector):
        self.collector = metrics_collector
    
    def collect_metrics(self, request_id: str, result: Dict) -> RAGMetrics:
        """收集指标"""
        return RAGMetrics(
            retrieval_latency_p50=result['retrieval']['latency_p50'],
            retrieval_latency_p95=result['retrieval']['latency_p95'],
            retrieval_latency_p99=result['retrieval']['latency_p99'],
            retrieval_recall=result['retrieval']['recall'],
            retrieval_precision=result['retrieval']['precision'],
            reranking_latency=result['reranking']['latency'],
            reranking_ndcg=result['reranking']['ndcg'],
            generation_latency=result['generation']['latency'],
            generation_token_count=result['generation']['token_count'],
            generation_cost=result['generation']['cost'],
            faithfulness_score=result['quality']['faithfulness'],
            relevance_score=result['quality']['relevance'],
            correctness_score=result['quality']['correctness'],
            total_latency=result['total_latency'],
            error_rate=result['error_rate'],
            qps=result['qps'],
            timestamp=datetime.now()
        )
    
    def aggregate_metrics(
        self,
        metrics: List[RAGMetrics],
        window: str = '1h'
    ) -> Dict:
        """聚合指标"""
        import pandas as pd
        
        df = pd.DataFrame([m.__dict__ for m in metrics])
        df['timestamp'] = pd.to_datetime(df['timestamp'])
        
        # 按时间窗口聚合
        df = df.set_index('timestamp').resample(window).mean()
        
        return df.to_dict()

4.2 告警配置

# alert_config.py
from typing import Dict, List

class AlertConfig:
    """告警配置"""
    
    DEFAULT_ALERTS = {
        'high_latency': {
            'metric': 'total_latency',
            'condition': '> 5000',  # ms
            'severity': 'warning',
            'cooldown': 300  # 秒
        },
        'low_quality': {
            'metric': 'faithfulness_score',
            'condition': '< 0.6',
            'severity': 'warning',
            'cooldown': 300
        },
        'high_error_rate': {
            'metric': 'error_rate',
            'condition': '> 0.05',
            'severity': 'critical',
            'cooldown': 60
        },
        'low_recall': {
            'metric': 'retrieval_recall',
            'condition': '< 0.7',
            'severity': 'warning',
            'cooldown': 300
        }
    }
    
    @classmethod
    def check_alerts(cls, metrics: RAGMetrics) -> List[Dict]:
        """检查告警"""
        alerts = []
        
        for alert_name, config in cls.DEFAULT_ALERTS.items():
            metric_value = getattr(metrics, config['metric'])
            condition = config['condition']
            
            # 解析条件
            operator, threshold = condition[0], float(condition[1:])
            
            # 检查是否触发
            triggered = False
            if operator == '>' and metric_value > threshold:
                triggered = True
            elif operator == '<' and metric_value < threshold:
                triggered = True
            elif operator == '=' and metric_value == threshold:
                triggered = True
            
            if triggered:
                alerts.append({
                    'name': alert_name,
                    'metric': config['metric'],
                    'value': metric_value,
                    'threshold': threshold,
                    'severity': config['severity'],
                    'timestamp': metrics.timestamp
                })
        
        return alerts

4.3 日志记录

# rag_logger.py
import logging
import json
from datetime import datetime

class RAGLogger:
    """RAG 日志记录器"""
    
    def __init__(self, log_path: str = 'logs/rag.log'):
        self.logger = logging.getLogger('RAG')
        self.logger.setLevel(logging.INFO)
        
        # 文件处理器
        handler = logging.FileHandler(log_path)
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(handler)
    
    def log_request(
        self,
        request_id: str,
        query: str,
        metadata: Dict = None
    ):
        """记录请求"""
        self.logger.info(json.dumps({
            'event': 'request',
            'request_id': request_id,
            'query': query,
            'metadata': metadata or {},
            'timestamp': datetime.now().isoformat()
        }))
    
    def log_response(
        self,
        request_id: str,
        answer: str,
        contexts: List[str],
        metrics: Dict,
        latency: float
    ):
        """记录响应"""
        self.logger.info(json.dumps({
            'event': 'response',
            'request_id': request_id,
            'answer': answer,
            'contexts': contexts,
            'metrics': metrics,
            'latency': latency,
            'timestamp': datetime.now().isoformat()
        }))
    
    def log_error(
        self,
        request_id: str,
        error: str,
        stack_trace: str = None
    ):
        """记录错误"""
        self.logger.error(json.dumps({
            'event': 'error',
            'request_id': request_id,
            'error': error,
            'stack_trace': stack_trace,
            'timestamp': datetime.now().isoformat()
        }))

五、质量保障体系

5.1 测试策略

# rag_testing.py
from typing import List, Dict

class RAGTesting:
    """RAG 测试框架"""
    
    def __init__(self, rag_system):
        self.rag = rag_system
    
    def run_test_suite(self, test_cases: List[Dict]) -> Dict:
        """运行测试套件"""
        results = {
            'total': len(test_cases),
            'passed': 0,
            'failed': 0,
            'details': []
        }
        
        for test_case in test_cases:
            result = self._run_single_test(test_case)
            results['details'].append(result)
            
            if result['passed']:
                results['passed'] += 1
            else:
                results['failed'] += 1
        
        return results
    
    def _run_single_test(self, test_case: Dict) -> Dict:
        """运行单个测试"""
        query = test_case['query']
        expected = test_case['expected']
        min_quality = test_case.get('min_quality', 0.7)
        
        # 执行查询
        response = self.rag.query(query)
        
        # 验证答案
        answer_match = self._check_answer_match(
            response['answer'],
            expected
        )
        
        # 验证质量
        quality_ok = all(
            score >= min_quality
            for score in response['quality_scores'].values()
        )
        
        passed = answer_match and quality_ok
        
        return {
            'query': query,
            'expected': expected,
            'actual': response['answer'],
            'passed': passed,
            'quality_scores': response['quality_scores']
        }
    
    def _check_answer_match(
        self,
        actual: str,
        expected: List[str]
    ) -> bool:
        """检查答案匹配"""
        # 语义匹配
        for exp in expected:
            if self._semantic_similarity(actual, exp) > 0.8:
                return True
        return False
    
    def _semantic_similarity(
        self,
        text1: str,
        text2: str
    ) -> float:
        """计算语义相似度"""
        # 使用 Embedding 计算余弦相似度
        pass

5.2 回归测试

# regression_testing.py
class RAGRegressionTest:
    """RAG 回归测试"""
    
    def __init__(self, baseline_results: Dict):
        self.baseline = baseline_results
    
    def run_regression_test(
        self,
        current_results: Dict,
        tolerance: float = 0.05
    ) -> Dict:
        """
        运行回归测试
        
        Args:
            current_results: 当前结果
            tolerance: 允许偏差
        """
        regressions = []
        
        for metric in ['faithfulness', 'relevance', 'correctness']:
            baseline_score = self.baseline[metric]
            current_score = current_results[metric]
            
            change = current_score - baseline_score
            
            if change < -tolerance:
                regressions.append({
                    'metric': metric,
                    'baseline': baseline_score,
                    'current': current_score,
                    'change': change,
                    'severity': 'high' if change < -0.1 else 'medium'
                })
        
        return {
            'passed': len(regressions) == 0,
            'regressions': regressions,
            'summary': self._generate_summary(regressions)
        }
    
    def _generate_summary(self, regressions: List) -> str:
        """生成回归报告"""
        if not regressions:
            return "无回归,质量稳定 ✓"
        
        summary = "发现质量回归:\n"
        for reg in regressions:
            summary += (
                f"- {reg['metric']}: "
                f"{reg['baseline']:.2f}{reg['current']:.2f} "
                f"({reg['change']:+.2f})\n"
            )
        
        return summary

六、实战案例

6.1 完整评估流程

# complete_evaluation.py
class CompleteRAGEvaluation:
    """完整 RAG 评估流程"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.ragas_evaluator = RagasEvaluator(
            config['llm'],
            config['embedding_model']
        )
        self.trulens_monitor = TruLensMonitor(config['app'])
        self.logger = RAGLogger()
    
    def run_full_evaluation(self, test_dataset: List[Dict]) -> Dict:
        """运行完整评估"""
        results = {}
        
        # 1. 离线评估(RAGAS)
        print("运行 RAGAS 评估...")
        results['ragas'] = self.ragas_evaluator.evaluate(
            test_dataset
        )
        
        # 2. 实时监控(TruLens)
        print("运行 TruLens 监控...")
        self.trulens_monitor.setup_feedback()
        results['trulens'] = self.trulens_monitor.run_evaluation(
            [t['query'] for t in test_dataset]
        )
        
        # 3. 性能测试
        print("运行性能测试...")
        results['performance'] = self._run_performance_test(test_dataset)
        
        # 4. 生成报告
        print("生成评估报告...")
        results['report'] = self._generate_report(results)
        
        return results
    
    def _run_performance_test(self, test_dataset: List[Dict]) -> Dict:
        """运行性能测试"""
        latencies = []
        
        for test_case in test_dataset:
            start = time.time()
            self.config['app'].query(test_case['query'])
            latencies.append(time.time() - start)
        
        return {
            'p50': np.percentile(latencies, 50),
            'p95': np.percentile(latencies, 95),
            'p99': np.percentile(latencies, 99),
            'qps': len(test_dataset) / sum(latencies)
        }
    
    def _generate_report(self, results: Dict) -> str:
        """生成评估报告"""
        report = """
# RAG 系统评估报告

## 1. RAGAS 指标
"""
        for metric, score in results['ragas'].items():
            report += f"- {metric}: {score:.3f}\n"
        
        report += "\n## 2. 性能指标\n"
        report += f"- P50 延迟:{results['performance']['p50']:.2f}s\n"
        report += f"- P95 延迟:{results['performance']['p95']:.2f}s\n"
        report += f"- QPS: {results['performance']['qps']:.1f}\n"
        
        return report

七、总结

7.1 核心要点

  1. 评估框架选择

    • 离线评估:RAGAS
    • 实时监控:TruLens
    • 全流程:LangSmith
  2. 关键指标

    • 检索:Recall、Precision、NDCG
    • 生成:Faithfulness、Relevance
    • 端到端:Correctness
  3. 监控体系

    • 实时指标收集
    • 自动告警
    • 日志追踪

7.2 最佳实践

  1. 建立基线

    • 定期运行评估
    • 记录历史数据
    • 设置质量阈值
  2. 持续监控

    • 关键指标仪表板
    • 自动告警通知
    • 异常快速定位
  3. 回归测试

    • 每次变更前测试
    • 设置回归阈值
    • 自动化 CI/CD

参考资料


分享这篇文章到:

上一篇文章
三国志之曹操
下一篇文章
Redis RDB 持久化详解