RAG 评估与监控体系构建
RAG 系统的质量评估和监控是保障生产稳定运行的关键。如何科学评估 RAG 效果?如何建立完善的监控体系?本文将详解 RAG 评估与监控的完整方案。
一、RAG 评估框架
1.1 评估维度
RAG 评估维度:
┌─────────────────────────────────────┐
│ 1. 检索质量(Retrieval) │
│ - 召回率(Recall) │
│ - 准确率(Precision) │
│ - NDCG │
├─────────────────────────────────────┤
│ 2. 生成质量(Generation) │
│ - 忠实度(Faithfulness) │
│ - 相关性(Relevance) │
│ - 准确性(Accuracy) │
├─────────────────────────────────────┤
│ 3. 端到端质量(End-to-End) │
│ - 答案正确性 │
│ - 用户满意度 │
│ - 任务完成率 │
├─────────────────────────────────────┤
│ 4. 系统性能(Performance) │
│ - 响应延迟 │
│ - 吞吐量 │
│ - 资源消耗 │
└─────────────────────────────────────┘
1.2 主流评估工具
| 工具 | 特点 | 适用场景 |
|---|---|---|
| RAGAS | 开源、指标全 | 离线评估 |
| TruLens | 实时、可视化 | 生产监控 |
| LangSmith | 一站式平台 | 全流程追踪 |
| Arize Phoenix | 调试友好 | 问题排查 |
二、RAGAS 评估框架
2.1 RAGAS 核心指标
# ragas_evaluation.py
from typing import Dict, List

from datasets import Dataset
from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_precision,
    context_recall,
    context_relevancy,
    answer_correctness
)
class RagasEvaluator:
    """Wrapper that scores RAG outputs with the core RAGAS metric suite."""

    def __init__(self, llm, embedding_model):
        """Store the models RAGAS uses for judging.

        Args:
            llm: Judge LLM forwarded to ``ragas.evaluate``.
            embedding_model: Embedding model forwarded to ``ragas.evaluate``.
        """
        self.llm = llm
        self.embedding_model = embedding_model

    def evaluate(
        self,
        questions: List[str],
        answers: List[str],
        contexts: List[List[str]],
        ground_truths: List[List[str]]
    ) -> Dict:
        """Run the six standard RAGAS metrics over aligned sample lists.

        Args:
            questions: User questions, one per sample.
            answers: Generated answers, aligned with ``questions``.
            contexts: Retrieved context chunks for each sample.
            ground_truths: Reference answers for each sample.

        Returns:
            The RAGAS evaluation result (metric name -> score).
        """
        sample_set = Dataset.from_dict({
            'question': questions,
            'answer': answers,
            'contexts': contexts,
            'ground_truths': ground_truths
        })
        # Score with every core metric in one pass; the judge LLM and the
        # embedding model are supplied explicitly rather than via globals.
        return evaluate(
            sample_set,
            metrics=[
                faithfulness,
                answer_relevancy,
                context_precision,
                context_recall,
                context_relevancy,
                answer_correctness
            ],
            llm=self.llm,
            embeddings=self.embedding_model
        )
2.2 指标详解
# ragas_metrics_explained.py
class RagasMetricsExplained:
    """Reference notes on what each RAGAS metric measures.

    Each attribute holds a worked example as a plain string (kept verbatim
    so it can be printed or embedded in docs); scores are illustrative.
    """

    # 1. Faithfulness: is the answer grounded in the retrieved context
    #    (no hallucination)? Computed by extracting statements from the
    #    answer and checking each can be inferred from the context.
    faithfulness_example = """
问题:公司的年收入是多少?
上下文:公司 2023 年财报显示收入增长 20%
答案:公司年收入为 1 亿元
Faithfulness = 0.5(上下文未提及具体金额)
"""

    # 2. Answer Relevancy: how relevant the answer is to the question.
    #    Computed by generating reverse questions from the answer and
    #    measuring their similarity to the original question.
    answer_relevancy_example = """
问题:如何提高 RAG 检索质量?
答案:应该使用 Cross-Encoder 重排序
Answer Relevancy = 0.8(直接相关)
"""

    # 3. Context Precision: how highly the relevant contexts rank in the
    #    retrieval results. The example below uses mean reciprocal rank.
    #    NOTE(review): RAGAS defines context precision via precision@k at
    #    relevant positions — confirm which formula this article intends.
    context_precision_example = """
标准答案文档:[doc1, doc3]
检索结果:[doc1, doc2, doc3, doc4]
Context Precision = (1/1 + 1/3) / 2 = 0.67
"""

    # 4. Context Recall: how much of the ground-truth answer is covered by
    #    the retrieved context (fraction of ground-truth statements that
    #    can be attributed to the context).
    context_recall_example = """
标准答案:公司收入 1 亿,增长 20%
上下文:公司 2023 年收入增长 20%
Context Recall = 0.5(只包含增长率,未包含金额)
"""

    # 5. Context Relevancy: proportion of context sentences that are
    #    relevant to the question.
    context_relevancy_example = """
问题:公司的财务数据
上下文:公司成立于 2020 年,2023 年收入 1 亿
Context Relevancy = 0.5(只有一半相关)
"""

    # 6. Answer Correctness: agreement between the generated answer and the
    #    ground truth, combining semantic similarity with an F1 score.
    answer_correctness_example = """
标准答案:公司年收入 1 亿元
预测答案:公司年收入为 1 亿
Answer Correctness = 0.95(几乎一致)
"""
2.3 批量评估
# batch_evaluation.py
from typing import List, Dict
from tqdm import tqdm
class BatchEvaluator:
    """Runs a RAGAS evaluator over test cases in fixed-size batches."""

    def __init__(self, ragas_evaluator):
        """Args:
            ragas_evaluator: object exposing
                ``evaluate(questions, answers, contexts, ground_truths)``.
        """
        self.evaluator = ragas_evaluator

    def evaluate_batch(
        self,
        test_cases: List[Dict],
        batch_size: int = 10
    ) -> Dict:
        """Evaluate ``test_cases`` in chunks and aggregate the results.

        Args:
            test_cases: dicts with 'question', 'answer', 'contexts' and
                'ground_truth' keys.
            batch_size: number of cases scored per evaluator call.

        Returns:
            Aggregate statistics across all batch results.
        """
        per_batch = []
        for start in tqdm(range(0, len(test_cases), batch_size)):
            chunk = test_cases[start:start + batch_size]
            outcome = self.evaluator.evaluate(
                [case['question'] for case in chunk],
                [case['answer'] for case in chunk],
                [case['contexts'] for case in chunk],
                [case['ground_truth'] for case in chunk]
            )
            per_batch.append(outcome)
        return self._aggregate_results(per_batch)

    def _aggregate_results(self, results: List) -> Dict:
        """Summarize per-batch results with pandas descriptive statistics."""
        import pandas as pd
        frame = pd.DataFrame(results)
        stats = {
            name: getattr(frame, name)().to_dict()
            for name in ('mean', 'std', 'min', 'max')
        }
        stats['distribution'] = frame.describe().to_dict()
        return stats
三、TruLens 实时监控
3.1 TruLens 基础
# trulens_monitoring.py
from trulens_eval import (
TruLlama,
Feedback,
Select
)
from trulens_eval.feedback import Groundedness
import numpy as np
class TruLensMonitor:
    """Real-time RAG monitoring built on TruLens feedback functions."""

    def __init__(self, app):
        """Wrap a RAG application with a TruLens recorder.

        Args:
            app: The RAG application to monitor; must expose ``query()``.
        """
        self.app = app
        self.trulens = TruLlama(app)

    def setup_feedback(self):
        """Register the three feedback functions with the TruLens recorder."""
        # 1. Context relevance: LLM-judged relevance of retrieved contexts.
        context_relevance = (
            Feedback(self._context_relevance)
            .on(Select.RecordCalls.retriever)
            .on(Select.RecordCalls.generator)
        )
        # 2. Groundedness (faithfulness of the answer to its contexts).
        # NOTE(review): confirm this Groundedness method name against the
        # installed trulens_eval version.
        groundedness = (
            Feedback(Groundedness().groundedness_measure_with_starburst_context)
            .on(Select.RecordCalls.retriever)
            .on(Select.RecordCalls.generator)
        )
        # 3. Answer relevance to the original question.
        answer_relevance = (
            Feedback(self._answer_relevance)
            .on(Select.RecordCalls.generator)
        )
        self.trulens.add_feedback(context_relevance)
        self.trulens.add_feedback(groundedness)
        self.trulens.add_feedback(answer_relevance)

    def _context_relevance(self, query: str, contexts: List[str]) -> float:
        """Score (0-1) how relevant the retrieved contexts are to the query."""
        prompt = f"""
请评估以下上下文与问题的相关性(0-1):
问题:{query}
上下文:
{' '.join(contexts)}
请只输出 0-1 之间的分数。
"""
        score = self._llm_generate(prompt)
        return float(score)

    def _answer_relevance(self, query: str, answer: str) -> float:
        """Score (0-1) how relevant the answer is to the query."""
        prompt = f"""
请评估以下答案与问题的相关性(0-1):
问题:{query}
答案:{answer}
请只输出 0-1 之间的分数。
"""
        score = self._llm_generate(prompt)
        return float(score)

    def _llm_generate(self, prompt: str) -> str:
        """Call the judging LLM and return its raw text completion.

        Raises:
            NotImplementedError: always, until wired to a concrete client.
                Fix: the original stub returned None, which made callers
                fail later with an opaque ``float(None)`` TypeError.
        """
        raise NotImplementedError(
            "Wire _llm_generate to a concrete LLM client before use"
        )

    def run_evaluation(self, test_queries: List[str]) -> List[Dict]:
        """Run each query through the app and collect its feedback scores.

        Fix: the return annotation said ``Dict`` but a list is returned.

        Args:
            test_queries: queries to replay through the monitored app.

        Returns:
            One dict per query: query text, answer, and feedback scores.
        """
        results = []
        for query in test_queries:
            # Record the call so the registered feedback functions run on it.
            with self.trulens as recording:
                result = self.app.query(query)
            feedback_results = recording.get_feedback_results()
            results.append({
                'query': query,
                'answer': result.answer,
                'feedback_scores': feedback_results
            })
        return results
3.2 仪表盘配置
# trulens_dashboard.py
from trulens_eval import TruLlamaDashboard
class TruLensDashboard:
    """Thin facade over the TruLlama dashboard API."""

    def __init__(self):
        self.dashboard = TruLlamaDashboard()

    def show_metrics_summary(self):
        """Return the aggregated metrics summary."""
        return self.dashboard.get_metrics_summary()

    def show_feedback_distribution(self, metric_name: str):
        """Return the score distribution plot for one feedback metric."""
        return self.dashboard.plot_feedback_distribution(metric_name)

    def show_trend_over_time(self, metric_name: str):
        """Return the time-series trend plot for one metric."""
        return self.dashboard.plot_metric_trend(metric_name)

    def show_heatmap(self):
        """Return the cross-metric correlation heatmap."""
        return self.dashboard.plot_correlation_heatmap()

    def export_report(self, filepath: str):
        """Write the full dashboard report to ``filepath``."""
        self.dashboard.export_report(filepath)
# Usage example
dashboard = TruLensDashboard()

# Print the overall metrics summary
summary = dashboard.show_metrics_summary()
print(summary)

# Distribution of the groundedness (faithfulness) feedback scores
faithfulness_dist = dashboard.show_feedback_distribution('groundedness')

# Context-relevance trend over time
trend = dashboard.show_trend_over_time('context_relevance')

# Export the full report as HTML
dashboard.export_report('rag_evaluation_report.html')
四、生产监控体系
4.1 监控指标设计
# production_metrics.py
from typing import Dict, List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class RAGMetrics:
    """One snapshot of RAG pipeline monitoring metrics.

    NOTE(review): units are not established by this file — the alert rules
    suggest latencies in milliseconds and quality scores in [0, 1]; confirm
    against the emitting code.
    """
    # Retrieval-stage metrics
    retrieval_latency_p50: float
    retrieval_latency_p95: float
    retrieval_latency_p99: float
    retrieval_recall: float
    retrieval_precision: float
    # Reranking-stage metrics
    reranking_latency: float
    reranking_ndcg: float
    # Generation-stage metrics
    generation_latency: float
    generation_token_count: int
    generation_cost: float
    # Answer-quality scores
    faithfulness_score: float
    relevance_score: float
    correctness_score: float
    # System-level metrics
    total_latency: float
    error_rate: float
    qps: float
    timestamp: datetime
class ProductionMonitor:
    """Collects per-request RAG metrics and aggregates them over time."""

    def __init__(self, metrics_collector):
        """Args: metrics_collector: downstream sink for collected metrics."""
        self.collector = metrics_collector

    def collect_metrics(self, request_id: str, result: Dict) -> RAGMetrics:
        """Flatten one request's result payload into a RAGMetrics record."""
        retrieval = result['retrieval']
        reranking = result['reranking']
        generation = result['generation']
        quality = result['quality']
        return RAGMetrics(
            retrieval_latency_p50=retrieval['latency_p50'],
            retrieval_latency_p95=retrieval['latency_p95'],
            retrieval_latency_p99=retrieval['latency_p99'],
            retrieval_recall=retrieval['recall'],
            retrieval_precision=retrieval['precision'],
            reranking_latency=reranking['latency'],
            reranking_ndcg=reranking['ndcg'],
            generation_latency=generation['latency'],
            generation_token_count=generation['token_count'],
            generation_cost=generation['cost'],
            faithfulness_score=quality['faithfulness'],
            relevance_score=quality['relevance'],
            correctness_score=quality['correctness'],
            total_latency=result['total_latency'],
            error_rate=result['error_rate'],
            qps=result['qps'],
            timestamp=datetime.now()
        )

    def aggregate_metrics(
        self,
        metrics: List[RAGMetrics],
        window: str = '1h'
    ) -> Dict:
        """Average every numeric field per time window.

        Args:
            metrics: records to aggregate.
            window: pandas offset alias (e.g. '1h') for the resample window.
        """
        import pandas as pd
        frame = pd.DataFrame([vars(m) for m in metrics])
        frame['timestamp'] = pd.to_datetime(frame['timestamp'])
        resampled = frame.set_index('timestamp').resample(window).mean()
        return resampled.to_dict()
4.2 告警配置
# alert_config.py
from typing import Dict, List
class AlertConfig:
    """Threshold-based alerting rules for RAG production metrics."""

    # Each rule: metric attribute name, a "<op> <threshold>" condition,
    # a severity label, and a cooldown to suppress repeated alerts.
    DEFAULT_ALERTS = {
        'high_latency': {
            'metric': 'total_latency',
            'condition': '> 5000',  # ms
            'severity': 'warning',
            'cooldown': 300  # seconds
        },
        'low_quality': {
            'metric': 'faithfulness_score',
            'condition': '< 0.6',
            'severity': 'warning',
            'cooldown': 300
        },
        'high_error_rate': {
            'metric': 'error_rate',
            'condition': '> 0.05',
            'severity': 'critical',
            'cooldown': 60
        },
        'low_recall': {
            'metric': 'retrieval_recall',
            'condition': '< 0.7',
            'severity': 'warning',
            'cooldown': 300
        }
    }

    @classmethod
    def check_alerts(cls, metrics: 'RAGMetrics') -> List[Dict]:
        """Evaluate every rule against ``metrics`` and return triggered alerts.

        Fix: the annotation is now a forward-reference string — this module
        never imports RAGMetrics, so the bare name raised NameError at
        class-definition time.

        Args:
            metrics: object exposing the attributes named in DEFAULT_ALERTS
                plus ``timestamp``.

        Returns:
            One dict per triggered rule (name, metric, value, threshold,
            severity, timestamp).
        """
        alerts = []
        for alert_name, config in cls.DEFAULT_ALERTS.items():
            metric_value = getattr(metrics, config['metric'])
            condition = config['condition']
            # Conditions are a single-char operator plus a numeric threshold,
            # e.g. '> 5000'; float() tolerates the separating space.
            operator, threshold = condition[0], float(condition[1:])
            triggered = (
                (operator == '>' and metric_value > threshold)
                or (operator == '<' and metric_value < threshold)
                or (operator == '=' and metric_value == threshold)
            )
            if triggered:
                alerts.append({
                    'name': alert_name,
                    'metric': config['metric'],
                    'value': metric_value,
                    'threshold': threshold,
                    'severity': config['severity'],
                    'timestamp': metrics.timestamp
                })
        return alerts
4.3 日志记录
# rag_logger.py
import logging
import json
from datetime import datetime
class RAGLogger:
    """Structured JSON event logger for RAG requests, responses and errors."""

    def __init__(self, log_path: str = 'logs/rag.log'):
        """Attach a file handler writing one JSON event per line.

        Args:
            log_path: destination log file; missing parent directories are
                created automatically.
        """
        import os
        # Fix: logging.FileHandler raises FileNotFoundError when the log
        # directory does not exist yet (e.g. on a fresh deployment).
        log_dir = os.path.dirname(log_path)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)
        # NOTE(review): repeated instantiation keeps adding handlers to the
        # shared 'RAG' logger — consider guarding with
        # `if not self.logger.handlers` if multiple instances are expected.
        self.logger = logging.getLogger('RAG')
        self.logger.setLevel(logging.INFO)
        handler = logging.FileHandler(log_path)
        handler.setFormatter(logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s'
        ))
        self.logger.addHandler(handler)

    def log_request(
        self,
        request_id: str,
        query: str,
        metadata: Dict = None
    ):
        """Log an incoming query event.

        Args:
            request_id: correlation id shared across this request's events.
            query: raw user query text.
            metadata: optional extra context; defaults to an empty dict.
        """
        self.logger.info(json.dumps({
            'event': 'request',
            'request_id': request_id,
            'query': query,
            'metadata': metadata or {},
            'timestamp': datetime.now().isoformat()
        }))

    def log_response(
        self,
        request_id: str,
        answer: str,
        contexts: List[str],
        metrics: Dict,
        latency: float
    ):
        """Log the generated answer with its contexts, metrics and latency."""
        self.logger.info(json.dumps({
            'event': 'response',
            'request_id': request_id,
            'answer': answer,
            'contexts': contexts,
            'metrics': metrics,
            'latency': latency,
            'timestamp': datetime.now().isoformat()
        }))

    def log_error(
        self,
        request_id: str,
        error: str,
        stack_trace: str = None
    ):
        """Log a failed request with its error message and optional traceback."""
        self.logger.error(json.dumps({
            'event': 'error',
            'request_id': request_id,
            'error': error,
            'stack_trace': stack_trace,
            'timestamp': datetime.now().isoformat()
        }))
五、质量保障体系
5.1 测试策略
# rag_testing.py
from typing import List, Dict
class RAGTesting:
    """Lightweight functional test harness for a RAG system."""

    def __init__(self, rag_system):
        """Args: rag_system: object exposing ``query(str) -> dict``."""
        self.rag = rag_system

    def run_test_suite(self, test_cases: List[Dict]) -> Dict:
        """Run every test case and tally pass/fail counts.

        Args:
            test_cases: dicts with 'query', 'expected' (list of acceptable
                answers) and optional 'min_quality' keys.

        Returns:
            Totals plus a per-case detail list.
        """
        results = {
            'total': len(test_cases),
            'passed': 0,
            'failed': 0,
            'details': []
        }
        for test_case in test_cases:
            result = self._run_single_test(test_case)
            results['details'].append(result)
            if result['passed']:
                results['passed'] += 1
            else:
                results['failed'] += 1
        return results

    def _run_single_test(self, test_case: Dict) -> Dict:
        """Execute one case: check answer match and per-metric quality floor."""
        query = test_case['query']
        expected = test_case['expected']
        min_quality = test_case.get('min_quality', 0.7)
        response = self.rag.query(query)
        answer_match = self._check_answer_match(
            response['answer'],
            expected
        )
        # Every reported quality score must clear the per-case floor.
        quality_ok = all(
            score >= min_quality
            for score in response['quality_scores'].values()
        )
        passed = answer_match and quality_ok
        return {
            'query': query,
            'expected': expected,
            'actual': response['answer'],
            'passed': passed,
            'quality_scores': response['quality_scores']
        }

    def _check_answer_match(
        self,
        actual: str,
        expected: List[str]
    ) -> bool:
        """True when the answer is close (similarity > 0.8) to any expected."""
        return any(
            self._semantic_similarity(actual, exp) > 0.8
            for exp in expected
        )

    def _semantic_similarity(
        self,
        text1: str,
        text2: str
    ) -> float:
        """Return a similarity score in [0, 1] for two texts.

        Fix: the original stub returned None, so _check_answer_match crashed
        on ``None > 0.8``. difflib provides a dependency-free lexical
        baseline; swap in embedding cosine similarity for production use.
        """
        from difflib import SequenceMatcher
        return SequenceMatcher(None, text1, text2).ratio()
5.2 回归测试
# regression_testing.py
class RAGRegressionTest:
    """Compares current evaluation scores against a stored baseline."""

    def __init__(self, baseline_results: Dict):
        """Args: baseline_results: metric name -> baseline score."""
        self.baseline = baseline_results

    def run_regression_test(
        self,
        current_results: Dict,
        tolerance: float = 0.05
    ) -> Dict:
        """Flag any tracked metric that dropped by more than ``tolerance``.

        Args:
            current_results: metric name -> current score.
            tolerance: largest acceptable score drop.

        Returns:
            Pass flag, the regression list, and a human-readable summary.
        """
        regressions = []
        for name in ['faithfulness', 'relevance', 'correctness']:
            before = self.baseline[name]
            after = current_results[name]
            delta = after - before
            # Only a drop beyond the tolerance counts as a regression.
            if delta >= -tolerance:
                continue
            regressions.append({
                'metric': name,
                'baseline': before,
                'current': after,
                'change': delta,
                'severity': 'high' if delta < -0.1 else 'medium'
            })
        return {
            'passed': not regressions,
            'regressions': regressions,
            'summary': self._generate_summary(regressions)
        }

    def _generate_summary(self, regressions: List) -> str:
        """Render the regression list as a short report string."""
        if not regressions:
            return "无回归,质量稳定 ✓"
        lines = ["发现质量回归:\n"]
        for item in regressions:
            lines.append(
                f"- {item['metric']}: "
                f"{item['baseline']:.2f} → {item['current']:.2f} "
                f"({item['change']:+.2f})\n"
            )
        return "".join(lines)
六、实战案例
6.1 完整评估流程
# complete_evaluation.py
class CompleteRAGEvaluation:
    """End-to-end RAG evaluation: RAGAS offline metrics, TruLens feedback,
    a latency benchmark, and a markdown report."""

    def __init__(self, config: Dict):
        """Args:
            config: must provide 'llm', 'embedding_model' and 'app' entries.
        """
        self.config = config
        self.ragas_evaluator = RagasEvaluator(
            config['llm'],
            config['embedding_model']
        )
        self.trulens_monitor = TruLensMonitor(config['app'])
        self.logger = RAGLogger()

    def run_full_evaluation(self, test_dataset: List[Dict]) -> Dict:
        """Run offline metrics, live feedback, a latency pass, and a report.

        Args:
            test_dataset: test cases with 'question', 'answer', 'contexts',
                'ground_truth' and 'query' keys.
        """
        results = {}
        # 1. Offline evaluation (RAGAS).
        # Fix: RagasEvaluator.evaluate expects four parallel lists, not the
        # raw test-case dicts that were passed before.
        print("运行 RAGAS 评估...")
        results['ragas'] = self.ragas_evaluator.evaluate(
            questions=[t['question'] for t in test_dataset],
            answers=[t['answer'] for t in test_dataset],
            contexts=[t['contexts'] for t in test_dataset],
            ground_truths=[t['ground_truth'] for t in test_dataset]
        )
        # 2. Live monitoring (TruLens).
        # NOTE(review): cases are keyed 'question' above but 'query' here —
        # confirm the dataset schema carries both.
        print("运行 TruLens 监控...")
        self.trulens_monitor.setup_feedback()
        results['trulens'] = self.trulens_monitor.run_evaluation(
            [t['query'] for t in test_dataset]
        )
        # 3. Latency benchmark.
        print("运行性能测试...")
        results['performance'] = self._run_performance_test(test_dataset)
        # 4. Markdown report.
        print("生成评估报告...")
        results['report'] = self._generate_report(results)
        return results

    def _run_performance_test(self, test_dataset: List[Dict]) -> Dict:
        """Time each query sequentially and summarize latency percentiles.

        Fix: ``time`` and ``numpy`` were referenced without being imported.
        """
        import time
        import numpy as np
        latencies = []
        for test_case in test_dataset:
            start = time.time()
            self.config['app'].query(test_case['query'])
            latencies.append(time.time() - start)
        return {
            'p50': np.percentile(latencies, 50),
            'p95': np.percentile(latencies, 95),
            'p99': np.percentile(latencies, 99),
            'qps': len(test_dataset) / sum(latencies)
        }

    def _generate_report(self, results: Dict) -> str:
        """Render the RAGAS and performance numbers as a markdown report."""
        report = """
# RAG 系统评估报告
## 1. RAGAS 指标
"""
        for metric, score in results['ragas'].items():
            report += f"- {metric}: {score:.3f}\n"
        report += "\n## 2. 性能指标\n"
        report += f"- P50 延迟:{results['performance']['p50']:.2f}s\n"
        report += f"- P95 延迟:{results['performance']['p95']:.2f}s\n"
        report += f"- QPS: {results['performance']['qps']:.1f}\n"
        return report
七、总结
7.1 核心要点
1. 评估框架选择
   - 离线评估:RAGAS
   - 实时监控:TruLens
   - 全流程:LangSmith
2. 关键指标
   - 检索:Recall、Precision、NDCG
   - 生成:Faithfulness、Relevance
   - 端到端:Correctness
3. 监控体系
   - 实时指标收集
   - 自动告警
   - 日志追踪
7.2 最佳实践
1. 建立基线
   - 定期运行评估
   - 记录历史数据
   - 设置质量阈值
2. 持续监控
   - 关键指标仪表板
   - 自动告警通知
   - 异常快速定位
3. 回归测试
   - 每次变更前测试
   - 设置回归阈值
   - 自动化 CI/CD
参考资料