Skip to content
清晨的一缕阳光
返回

AI 应用生产环境问题排查指南

AI 应用生产环境问题排查指南

生产环境中的 AI 应用会遇到各种问题。如何快速定位问题?如何有效解决?本文总结 AI 应用生产环境问题排查的实战经验。

一、问题排查框架

1.1 排查流程

问题排查流程:

┌─────────────────────────────────────┐
│ 1. 问题识别                          │
│    - 监控告警                        │
│    - 用户反馈                        │
│    - 日志分析                        │
├─────────────────────────────────────┤
│ 2. 问题分类                          │
│    - 功能问题                        │
│    - 性能问题                        │
│    - 质量问题                        │
│    - 资源问题                        │
├─────────────────────────────────────┤
│ 3. 问题定位                          │
│    - 日志分析                        │
│    - 指标分析                        │
│    - 链路追踪                        │
├─────────────────────────────────────┤
│ 4. 问题解决                          │
│    - 临时方案                        │
│    - 永久修复                        │
│    - 验证确认                        │
├─────────────────────────────────────┤
│ 5. 问题复盘                          │
│    - 根因分析                        │
│    - 改进措施                        │
│    - 知识沉淀                        │
└─────────────────────────────────────┘

1.2 排查工具

# troubleshooting_tools.py
from typing import Dict, List

class TroubleshootingToolkit:
    """排查工具包"""
    
    def __init__(self):
        self.tools = {
            'logging': self._init_logging_tool(),
            'metrics': self._init_metrics_tool(),
            'tracing': self._init_tracing_tool(),
            'profiling': self._init_profiling_tool()
        }
    
    def _init_logging_tool(self) -> Dict:
        """日志工具"""
        return {
            'name': 'ELK Stack',
            'purpose': '日志收集和分析',
            'queries': [
                'error logs in last 1 hour',
                'slow queries > 5s',
                'exception stack traces'
            ]
        }
    
    def _init_metrics_tool(self) -> Dict:
        """指标工具"""
        return {
            'name': 'Prometheus + Grafana',
            'purpose': '指标监控和告警',
            'key_metrics': [
                'request_rate',
                'error_rate',
                'latency_p99',
                'resource_usage'
            ]
        }
    
    def _init_tracing_tool(self) -> Dict:
        """链路追踪工具"""
        return {
            'name': 'Jaeger / Zipkin',
            'purpose': '分布式链路追踪',
            'use_cases': [
                'identify slow spans',
                'trace request flow',
                'find bottlenecks'
            ]
        }
    
    def _init_profiling_tool(self) -> Dict:
        """性能分析工具"""
        return {
            'name': 'py-spy / cProfile',
            'purpose': '代码性能分析',
            'use_cases': [
                'identify hot paths',
                'find memory leaks',
                'analyze cpu usage'
            ]
        }

二、常见问题排查

2.1 响应慢问题

# slow_response_troubleshooting.py
from typing import Dict, List

class SlowResponseTroubleshooter:
    """响应慢问题排查器"""
    
    def diagnose(self, symptoms: Dict) -> List[Dict]:
        """诊断问题"""
        possible_causes = []
        
        # 检查 LLM 延迟
        if symptoms.get('llm_latency', 0) > 3000:
            possible_causes.append({
                'cause': 'LLM API 响应慢',
                'likelihood': 'high',
                'verification': '检查 LLM API 延迟指标',
                'solution': '启用缓存或切换到更快的模型'
            })
        
        # 检查检索延迟
        if symptoms.get('retrieval_latency', 0) > 1000:
            possible_causes.append({
                'cause': '向量检索慢',
                'likelihood': 'high',
                'verification': '检查向量数据库性能',
                'solution': '使用近似检索或优化索引'
            })
        
        # 检查网络延迟
        if symptoms.get('network_latency', 0) > 500:
            possible_causes.append({
                'cause': '网络延迟高',
                'likelihood': 'medium',
                'verification': '检查网络 RTT',
                'solution': '优化网络配置或使用 CDN'
            })
        
        # 检查资源争用
        if symptoms.get('resource_contention', False):
            possible_causes.append({
                'cause': '资源争用',
                'likelihood': 'medium',
                'verification': '检查 CPU/内存使用率',
                'solution': '扩容或优化资源分配'
            })
        
        return sorted(
            possible_causes,
            key=lambda x: {'high': 0, 'medium': 1, 'low': 2}[x['likelihood']]
        )
    
    def get_checklist(self) -> List[str]:
        """获取排查清单"""
        return [
            '1. 检查 LLM API 延迟',
            '2. 检查向量检索延迟',
            '3. 检查网络延迟',
            '4. 检查缓存命中率',
            '5. 检查资源使用率',
            '6. 检查并发连接数',
            '7. 检查数据库连接池',
            '8. 检查 GC 情况'
        ]

# 排查案例
"""
案例:RAG 系统响应突然变慢

症状:
- P99 延迟从 3 秒增加到 10 秒
- 错误率正常
- QPS 正常

排查步骤:
1. 检查监控面板,发现 LLM 延迟正常
2. 检查向量检索,发现检索延迟从 100ms 增加到 2 秒
3. 检查向量数据库,发现索引重建中
4. 检查日志,发现自动索引重建触发

根因:
- 向量数据库配置了自动索引重建
- 重建期间检索性能下降

解决方案:
1. 临时:切换到备用索引
2. 永久:调整索引重建时间为低峰期
3. 预防:添加索引重建告警
"""

2.2 质量问题

# quality_issue_troubleshooting.py
from typing import Dict, List

class QualityIssueTroubleshooter:
    """质量问题排查器"""
    
    def diagnose(self, symptoms: Dict) -> List[Dict]:
        """诊断质量问题"""
        possible_causes = []
        
        # 检查 Prompt 质量
        if symptoms.get('prompt_issues', False):
            possible_causes.append({
                'cause': 'Prompt 设计问题',
                'likelihood': 'high',
                'verification': '检查 Prompt 模板和变量',
                'solution': '优化 Prompt 或增加示例'
            })
        
        # 检查上下文质量
        if symptoms.get('context_issues', False):
            possible_causes.append({
                'cause': '检索上下文不相关',
                'likelihood': 'high',
                'verification': '检查检索结果相关性',
                'solution': '优化检索策略或重排序'
            })
        
        # 检查模型问题
        if symptoms.get('model_degradation', False):
            possible_causes.append({
                'cause': '模型性能下降',
                'likelihood': 'medium',
                'verification': '对比不同模型输出',
                'solution': '切换模型或微调'
            })
        
        # 检查数据问题
        if symptoms.get('data_issues', False):
            possible_causes.append({
                'cause': '知识库数据问题',
                'likelihood': 'medium',
                'verification': '检查数据质量和时效性',
                'solution': '更新或清洗数据'
            })
        
        return possible_causes
    
    def get_quality_checklist(self) -> List[str]:
        """质量检查清单"""
        return [
            '1. 检查 Prompt 是否清晰明确',
            '2. 检查上下文是否相关',
            '3. 检查检索结果质量',
            '4. 检查模型版本是否变更',
            '5. 检查知识库数据是否过期',
            '6. 检查用户反馈',
            '7. 运行回归测试',
            '8. 对比历史输出'
        ]

# 排查案例
"""
案例:Agent 输出质量突然下降

症状:
- 用户反馈答案不准确
- 答案相关性下降
- 幻觉增加

排查步骤:
1. 检查 Prompt 模板,无变更
2. 检查检索结果,发现相关性下降
3. 检查向量数据库,发现新增了大量低质量文档
4. 检查数据导入日志,发现批量导入了未清洗数据

根因:
- 新知识库数据质量差
- 影响了检索结果相关性

解决方案:
1. 临时:回滚到之前的数据版本
2. 永久:建立数据质量检查流程
3. 预防:添加数据质量监控
"""

2.3 资源问题

# resource_issue_troubleshooting.py
from typing import Dict, List

class ResourceIssueTroubleshooter:
    """资源问题排查器"""
    
    def diagnose(self, symptoms: Dict) -> List[Dict]:
        """诊断资源问题"""
        possible_causes = []
        
        # 检查内存问题
        if symptoms.get('memory_high', False):
            possible_causes.append({
                'cause': '内存泄漏',
                'likelihood': 'high',
                'verification': '检查内存使用趋势',
                'solution': '分析堆 dump,修复泄漏'
            })
            possible_causes.append({
                'cause': '缓存过大',
                'likelihood': 'medium',
                'verification': '检查缓存大小',
                'solution': '调整缓存策略'
            })
        
        # 检查 CPU 问题
        if symptoms.get('cpu_high', False):
            possible_causes.append({
                'cause': '计算密集型任务',
                'likelihood': 'high',
                'verification': '检查 CPU 剖析',
                'solution': '优化算法或扩容'
            })
            possible_causes.append({
                'cause': '死循环',
                'likelihood': 'low',
                'verification': '检查线程状态',
                'solution': '修复代码逻辑'
            })
        
        # 检查磁盘问题
        if symptoms.get('disk_high', False):
            possible_causes.append({
                'cause': '日志积累',
                'likelihood': 'high',
                'verification': '检查日志文件大小',
                'solution': '配置日志轮转'
            })
            possible_causes.append({
                'cause': '数据增长',
                'likelihood': 'medium',
                'verification': '检查数据目录',
                'solution': '清理或扩容'
            })
        
        return possible_causes
    
    def get_resource_checklist(self) -> List[str]:
        """资源检查清单"""
        return [
            '1. 检查内存使用率和趋势',
            '2. 检查 CPU 使用率和负载',
            '3. 检查磁盘使用率和 IO',
            '4. 检查网络带宽',
            '5. 检查连接数',
            '6. 检查线程数',
            '7. 检查 GC 情况',
            '8. 检查文件描述符'
        ]

# 排查案例
"""
案例:内存使用持续增长

症状:
- 内存使用每天增长 10%
- 重启后恢复正常
- 无内存泄漏告警

排查步骤:
1. 检查内存使用趋势,确认持续增长
2. 分析堆 dump,发现大量缓存对象
3. 检查缓存配置,发现无 TTL 限制
4. 检查缓存访问模式,发现大量一次性查询

根因:
- 响应缓存无过期策略
- 一次性查询结果被永久缓存

解决方案:
1. 临时:清理缓存
2. 永久:添加缓存 TTL
3. 预防:添加内存增长告警
"""

三、紧急故障处理

3.1 故障分级

# incident_management.py
from enum import Enum
from typing import Dict, List

class IncidentSeverity(Enum):
    """故障级别"""
    P0 = "P0"  # 系统完全不可用
    P1 = "P1"  # 核心功能不可用
    P2 = "P2"  # 部分功能不可用
    P3 = "P3"  # 性能下降或轻微问题

class IncidentManager:
    """故障管理器"""
    
    def __init__(self):
        self.response_times = {
            IncidentSeverity.P0: 5,   # 5 分钟内响应
            IncidentSeverity.P1: 15,  # 15 分钟内响应
            IncidentSeverity.P2: 60,  # 60 分钟内响应
            IncidentSeverity.P3: 240  # 4 小时内响应
        }
        
        self.escalation_matrix = {
            IncidentSeverity.P0: ['oncall', 'team_lead', 'vp'],
            IncidentSeverity.P1: ['oncall', 'team_lead'],
            IncidentSeverity.P2: ['oncall'],
            IncidentSeverity.P3: ['oncall']
        }
    
    def create_incident(
        self,
        title: str,
        severity: IncidentSeverity,
        symptoms: Dict
    ) -> Dict:
        """创建故障单"""
        return {
            'id': self._generate_incident_id(),
            'title': title,
            'severity': severity.value,
            'status': 'open',
            'symptoms': symptoms,
            'created_at': datetime.now().isoformat(),
            'response_deadline': (
                datetime.now() +
                timedelta(minutes=self.response_times[severity])
            ).isoformat(),
            'escalation_contacts': self.escalation_matrix[severity]
        }
    
    def _generate_incident_id(self) -> str:
        """生成故障单 ID"""
        return f"INC-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    
    def get_runbook(self, incident_type: str) -> Dict:
        """获取应急手册"""
        runbooks = {
            'service_down': {
                'steps': [
                    '1. 确认服务状态',
                    '2. 检查最近变更',
                    '3. 尝试重启服务',
                    '4. 回滚最近变更',
                    '5. 切换流量到备用'
                ],
                'contacts': ['oncall', 'devops']
            },
            'high_latency': {
                'steps': [
                    '1. 检查监控指标',
                    '2. 识别瓶颈组件',
                    '3. 扩容瓶颈组件',
                    '4. 启用降级策略',
                    '5. 分析根因'
                ],
                'contacts': ['oncall', 'performance_team']
            },
            'data_corruption': {
                'steps': [
                    '1. 确认数据问题范围',
                    '2. 停止数据写入',
                    '3. 评估数据恢复方案',
                    '4. 执行数据恢复',
                    '5. 验证数据完整性'
                ],
                'contacts': ['oncall', 'dba', 'dev_team']
            }
        }
        
        return runbooks.get(incident_type, {})

# 故障处理流程
"""
P0 故障处理流程:

1. 发现故障(监控/用户反馈)

2. 创建故障单(5 分钟内)

3. 通知相关人员(升级矩阵)

4. 执行应急手册

5. 恢复服务(首要目标)

6. 根因分析

7. 永久修复

8. 故障复盘
"""

3.2 回滚策略

# rollback_strategy.py
from typing import Dict, List

class RollbackManager:
    """回滚管理器"""
    
    def __init__(self):
        self.deployment_history: List[Dict] = []
    
    def record_deployment(
        self,
        version: str,
        changes: List[str],
        deployed_at: str
    ):
        """记录部署"""
        self.deployment_history.append({
            'version': version,
            'changes': changes,
            'deployed_at': deployed_at,
            'status': 'deployed'
        })
    
    def get_rollback_target(
        self,
        current_version: str
    ) -> Dict:
        """获取回滚目标"""
        # 找到上一个稳定版本
        for deployment in reversed(self.deployment_history):
            if (
                deployment['version'] != current_version and
                deployment['status'] == 'stable'
            ):
                return deployment
        
        return None
    
    def execute_rollback(
        self,
        from_version: str,
        to_version: str
    ) -> Dict:
        """执行回滚"""
        return {
            'from_version': from_version,
            'to_version': to_version,
            'status': 'in_progress',
            'steps': [
                '1. 停止新版本流量',
                '2. 切换到老版本',
                '3. 验证老版本功能',
                '4. 监控老版本指标',
                '5. 确认回滚完成'
            ]
        }

# 回滚决策树
"""
回滚决策树:

问题是否由最近变更引起?
├── 是 → 考虑回滚
│   ├── 影响范围大?
│   │   ├── 是 → 立即回滚
│   │   └── 否 → 尝试修复
│   └── 回滚风险高?
│       ├── 是 → 尝试热修复
│       └── 否 → 执行回滚
└── 否 → 排查其他原因
"""

四、总结

4.1 排查原则

  1. 先恢复后排查

    • 优先恢复服务
    • 保留现场证据
    • 事后详细分析
  2. 数据驱动

    • 基于指标判断
    • 避免主观猜测
    • 用数据验证假设
  3. 系统化方法

    • 遵循排查流程
    • 使用检查清单
    • 记录排查过程

4.2 最佳实践

  1. 预防优于治疗

    • 完善监控
    • 建立告警
    • 定期演练
  2. 知识沉淀

    • 记录故障案例
    • 更新应急手册
    • 团队分享
  3. 持续改进

    • 故障复盘
    • 根因分析
    • 改进措施

参考资料


分享这篇文章到:

上一篇文章
RocketMQ 性能调优实战指南
下一篇文章
Spring Kafka 集成开发实战指南