Skip to content
清晨的一缕阳光
返回

AI 应用监控与告警体系

AI 应用监控与告警体系

AI 应用的监控比传统应用更复杂。需要监控什么指标?如何设置告警?本文详解 AI 应用监控与告警体系的设计与实践。

一、监控指标体系

1.1 监控层次

AI 应用监控层次:

┌─────────────────────────────────────┐
│ 1. 基础设施层                        │
│    - CPU/内存/磁盘                   │
│    - 网络 IO                         │
│    - GPU 资源                        │
├─────────────────────────────────────┤
│ 2. 应用服务层                        │
│    - 服务可用性                      │
│    - 请求延迟                        │
│    - 错误率                          │
├─────────────────────────────────────┤
│ 3. AI 质量层                         │
│    - 检索质量                        │
│    - 生成质量                        │
│    - 用户满意度                      │
├─────────────────────────────────────┤
│ 4. 业务指标层                        │
│    - 用户活跃度                      │
│    - 任务完成率                      │
│    - 业务转化率                      │
└─────────────────────────────────────┘

1.2 核心指标

# monitoring_metrics.py
from typing import Dict, List

class AIMonitoringMetrics:
    """AI 监控指标"""
    
    # 系统指标
    SYSTEM_METRICS = {
        'cpu_usage': {
            'description': 'CPU 使用率',
            'unit': '%',
            'warning_threshold': 70,
            'critical_threshold': 90
        },
        'memory_usage': {
            'description': '内存使用率',
            'unit': '%',
            'warning_threshold': 70,
            'critical_threshold': 85
        },
        'disk_usage': {
            'description': '磁盘使用率',
            'unit': '%',
            'warning_threshold': 80,
            'critical_threshold': 90
        },
        'gpu_usage': {
            'description': 'GPU 使用率',
            'unit': '%',
            'warning_threshold': 80,
            'critical_threshold': 95
        }
    }
    
    # 应用指标
    APPLICATION_METRICS = {
        'request_rate': {
            'description': '请求速率',
            'unit': 'req/s',
            'warning_threshold': None,
            'critical_threshold': None
        },
        'response_latency_p50': {
            'description': 'P50 响应延迟',
            'unit': 'ms',
            'warning_threshold': 1000,
            'critical_threshold': 3000
        },
        'response_latency_p99': {
            'description': 'P99 响应延迟',
            'unit': 'ms',
            'warning_threshold': 3000,
            'critical_threshold': 5000
        },
        'error_rate': {
            'description': '错误率',
            'unit': '%',
            'warning_threshold': 1,
            'critical_threshold': 5
        },
        'availability': {
            'description': '可用性',
            'unit': '%',
            'warning_threshold': 99.9,
            'critical_threshold': 99
        }
    }
    
    # AI 质量指标
    AI_QUALITY_METRICS = {
        'retrieval_precision': {
            'description': '检索精确率',
            'unit': '%',
            'warning_threshold': 70,
            'critical_threshold': 50
        },
        'retrieval_recall': {
            'description': '检索召回率',
            'unit': '%',
            'warning_threshold': 70,
            'critical_threshold': 50
        },
        'generation_faithfulness': {
            'description': '生成忠实度',
            'unit': '%',
            'warning_threshold': 70,
            'critical_threshold': 50
        },
        'hallucination_rate': {
            'description': '幻觉率',
            'unit': '%',
            'warning_threshold': 10,
            'critical_threshold': 20
        },
        'user_satisfaction': {
            'description': '用户满意度',
            'unit': 'score (1-5)',
            'warning_threshold': 3.5,
            'critical_threshold': 3.0
        }
    }
    
    # 业务指标
    BUSINESS_METRICS = {
        'active_users': {
            'description': '活跃用户数',
            'unit': 'count',
            'warning_threshold': None,
            'critical_threshold': None
        },
        'task_completion_rate': {
            'description': '任务完成率',
            'unit': '%',
            'warning_threshold': 80,
            'critical_threshold': 60
        },
        'cost_per_query': {
            'description': '单次查询成本',
            'unit': 'USD',
            'warning_threshold': 0.05,
            'critical_threshold': 0.10
        }
    }

二、监控系统设计

2.1 监控架构

# monitoring_architecture.py
from typing import Dict, List

class MonitoringArchitecture:
    """监控架构"""
    
    def __init__(self):
        self.components = {
            'data_collection': self._data_collection_layer(),
            'data_storage': self._data_storage_layer(),
            'data_processing': self._data_processing_layer(),
            'alerting': self._alerting_layer(),
            'visualization': self._visualization_layer()
        }
    
    def _data_collection_layer(self) -> Dict:
        """数据采集层"""
        return {
            'tools': [
                'Prometheus Node Exporter',
                'Application Metrics SDK',
                'Log Collectors (Fluentd/Fluent Bit)',
                'Distributed Tracing (Jaeger/Zipkin)'
            ],
            'metrics_types': [
                'system_metrics',
                'application_metrics',
                'business_metrics',
                'ai_quality_metrics'
            ]
        }
    
    def _data_storage_layer(self) -> Dict:
        """数据存储层"""
        return {
            'time_series_db': 'Prometheus/VictoriaMetrics',
            'log_storage': 'Elasticsearch',
            'trace_storage': 'Jaeger/Elasticsearch',
            'alert_storage': 'Alertmanager'
        }
    
    def _data_processing_layer(self) -> Dict:
        """数据处理层"""
        return {
            'aggregation': 'Prometheus Recording Rules',
            'stream_processing': 'Apache Flink/Kafka Streams',
            'batch_processing': 'Apache Spark'
        }
    
    def _alerting_layer(self) -> Dict:
        """告警层"""
        return {
            'alert_manager': 'Prometheus Alertmanager',
            'notification_channels': [
                'Email',
                'Slack',
                'PagerDuty',
                '钉钉',
                '企业微信'
            ],
            'escalation_policy': 'based_on_severity'
        }
    
    def _visualization_layer(self) -> Dict:
        """可视化层"""
        return {
            'dashboards': 'Grafana',
            'custom_reports': 'Kibana',
            'real_time_monitoring': 'Grafana Live'
        }

2.2 指标采集

# metrics_collection.py
from typing import Dict, List
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import time

class AIMetricsCollector:
    """AI 指标采集器"""
    
    def __init__(self, port: int = 8000):
        # 系统指标
        self.cpu_usage = Gauge('ai_cpu_usage_percent', 'CPU 使用率')
        self.memory_usage = Gauge('ai_memory_usage_percent', '内存使用率')
        
        # 应用指标
        self.request_count = Counter('ai_requests_total', '总请求数', ['endpoint', 'method'])
        self.request_latency = Histogram('ai_request_latency_seconds', '请求延迟', ['endpoint'])
        self.error_count = Counter('ai_errors_total', '错误数', ['endpoint', 'error_type'])
        
        # AI 质量指标
        self.retrieval_precision = Gauge('ai_retrieval_precision', '检索精确率')
        self.retrieval_recall = Gauge('ai_retrieval_recall', '检索召回率')
        self.generation_faithfulness = Gauge('ai_generation_faithfulness', '生成忠实度')
        self.hallucination_rate = Gauge('ai_hallucination_rate', '幻觉率')
        
        # 业务指标
        self.active_users = Gauge('ai_active_users', '活跃用户数')
        self.task_completion_rate = Gauge('ai_task_completion_rate', '任务完成率')
        self.cost_per_query = Gauge('ai_cost_per_query', '单次查询成本')
        
        # 启动指标服务器
        start_http_server(port)
    
    def record_request(
        self,
        endpoint: str,
        method: str,
        latency: float
    ):
        """记录请求"""
        self.request_count.labels(endpoint=endpoint, method=method).inc()
        self.request_latency.labels(endpoint=endpoint).observe(latency)
    
    def record_error(
        self,
        endpoint: str,
        error_type: str
    ):
        """记录错误"""
        self.error_count.labels(endpoint=endpoint, error_type=error_type).inc()
    
    def record_ai_quality(
        self,
        precision: float,
        recall: float,
        faithfulness: float,
        hallucination_rate: float
    ):
        """记录 AI 质量指标"""
        self.retrieval_precision.set(precision)
        self.retrieval_recall.set(recall)
        self.generation_faithfulness.set(faithfulness)
        self.hallucination_rate.set(hallucination_rate)
    
    def record_business_metrics(
        self,
        active_users: int,
        task_completion_rate: float,
        cost_per_query: float
    ):
        """记录业务指标"""
        self.active_users.set(active_users)
        self.task_completion_rate.set(task_completion_rate)
        self.cost_per_query.set(cost_per_query)

# 使用示例
collector = AIMetricsCollector()

# 记录请求
start_time = time.time()
# ... 处理请求 ...
latency = time.time() - start_time
collector.record_request('/api/query', 'POST', latency)

# 记录 AI 质量
collector.record_ai_quality(
    precision=0.85,
    recall=0.78,
    faithfulness=0.82,
    hallucination_rate=0.05
)

三、告警策略

3.1 告警规则

# alerting_rules.yml
groups:
  - name: AI Application Alerts
    rules:
      # 系统告警
      - alert: HighCPUUsage
        expr: ai_cpu_usage_percent > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "CPU 使用率过高"
          description: "CPU 使用率 {{ $value }}% 超过 80%"
      
      - alert: HighMemoryUsage
        expr: ai_memory_usage_percent > 85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "内存使用率过高"
          description: "内存使用率 {{ $value }}% 超过 85%"
      
      # 应用告警
      - alert: HighErrorRate
        expr: sum(rate(ai_errors_total[5m])) / sum(rate(ai_requests_total[5m])) > 0.05
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "错误率过高"
          description: "错误率 {{ $value }} 超过 5%"
      
      - alert: HighLatency
        expr: histogram_quantile(0.99, sum(rate(ai_request_latency_seconds_bucket[5m])) by (le)) > 3
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P99 延迟过高"
          description: "P99 延迟 {{ $value }}s 超过 3s"
      
      # AI 质量告警
      - alert: LowRetrievalPrecision
        expr: ai_retrieval_precision < 0.5
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "检索精确率过低"
          description: "检索精确率 {{ $value }} 低于 50%"
      
      - alert: HighHallucinationRate
        expr: ai_hallucination_rate > 0.2
        for: 10m
        labels:
          severity: critical
        annotations:
          summary: "幻觉率过高"
          description: "幻觉率 {{ $value }} 超过 20%"
      
      # 业务告警
      - alert: LowTaskCompletionRate
        expr: ai_task_completion_rate < 0.6
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "任务完成率过低"
          description: "任务完成率 {{ $value }} 低于 60%"

3.2 告警路由

# alertmanager_config.yml
global:
  resolve_timeout: 5m
  smtp_smarthost: 'smtp.example.com:587'
  smtp_from: 'alerts@example.com'

route:
  group_by: ['alertname', 'severity']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'default'
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty'
      continue: true
    - match:
        severity: warning
      receiver: 'slack'
    - match:
        alertname: HighHallucinationRate
      receiver: 'ai-team'

receivers:
  - name: 'default'
    email_configs:
      - to: 'team@example.com'
  
  - name: 'pagerduty'
    pagerduty_configs:
      - service_key: '<pagerduty_service_key>'
  
  - name: 'slack'
    slack_configs:
      - api_url: '<slack_webhook_url>'
        channel: '#alerts'
  
  - name: 'ai-team'
    slack_configs:
      - api_url: '<ai_team_webhook_url>'
        channel: '#ai-alerts'

inhibit_rules:
  - source_match:
      severity: 'critical'
    target_match:
      severity: 'warning'
    equal: ['alertname']

四、日志管理

4.1 日志规范

# logging_standards.py
import logging
import json
from typing import Dict, Any

class AILoggingStandards:
    """AI 日志规范"""
    
    # 日志级别定义
    LOG_LEVELS = {
        'DEBUG': '调试信息,用于开发调试',
        'INFO': '一般信息,用于记录正常流程',
        'WARNING': '警告信息,用于记录潜在问题',
        'ERROR': '错误信息,用于记录错误',
        'CRITICAL': '严重错误,用于记录系统故障'
    }
    
    # 结构化日志格式
    LOG_FORMAT = {
        'timestamp': 'ISO8601',
        'level': 'string',
        'service': 'string',
        'trace_id': 'string',
        'span_id': 'string',
        'message': 'string',
        'context': 'object'
    }
    
    @staticmethod
    def create_structured_logger(
        name: str,
        level: int = logging.INFO
    ) -> logging.Logger:
        """创建结构化日志器"""
        logger = logging.getLogger(name)
        logger.setLevel(level)
        
        # 创建 JSON 格式化器
        formatter = JSONFormatter()
        
        # 添加控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
        
        return logger

class JSONFormatter(logging.Formatter):
    """JSON 格式化器"""
    
    def format(self, record: logging.LogRecord) -> str:
        log_data = {
            'timestamp': self.formatTime(record),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }
        
        # 添加额外字段
        if hasattr(record, 'context'):
            log_data['context'] = record.context
        
        if record.exc_info:
            log_data['exception'] = self.formatException(record.exc_info)
        
        return json.dumps(log_data, ensure_ascii=False)

# 使用示例
logger = AILoggingStandards.create_structured_logger('ai_app')

# 记录一般日志
logger.info('请求处理完成', extra={'context': {
    'request_id': '123',
    'latency_ms': 150
}})

# 记录错误日志
logger.error('处理失败', extra={'context': {
    'request_id': '123',
    'error': 'LLM API timeout'
}})

4.2 日志分析

# log_analysis.py
from typing import Dict, List
import re

class LogAnalyzer:
    """日志分析器"""
    
    def __init__(self, log_storage):
        self.log_storage = log_storage
    
    def search_logs(
        self,
        query: str,
        time_range: str = '1h',
        limit: int = 100
    ) -> List[Dict]:
        """搜索日志"""
        # 实现日志搜索逻辑
        return []
    
    def analyze_error_patterns(
        self,
        time_range: str = '24h'
    ) -> Dict:
        """分析错误模式"""
        errors = self.search_logs(
            query='level:ERROR',
            time_range=time_range
        )
        
        # 统计错误类型
        error_types = {}
        for log in errors:
            error_type = log.get('context', {}).get('error_type', 'unknown')
            error_types[error_type] = error_types.get(error_type, 0) + 1
        
        return {
            'total_errors': len(errors),
            'error_types': error_types,
            'top_errors': sorted(
                error_types.items(),
                key=lambda x: x[1],
                reverse=True
            )[:10]
        }
    
    def detect_anomalies(
        self,
        metric_name: str,
        time_range: str = '24h'
    ) -> List[Dict]:
        """检测异常"""
        # 实现异常检测逻辑
        return []
    
    def generate_daily_report(self) -> Dict:
        """生成日报"""
        return {
            'date': datetime.now().strftime('%Y-%m-%d'),
            'total_requests': self._count_requests(),
            'total_errors': self._count_errors(),
            'error_rate': self._calculate_error_rate(),
            'avg_latency': self._calculate_avg_latency(),
            'top_issues': self._get_top_issues()
        }

五、可视化仪表板

5.1 Grafana 仪表板

{
  "dashboard": {
    "title": "AI Application Overview",
    "panels": [
      {
        "title": "Request Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "sum(rate(ai_requests_total[1m]))",
            "legendFormat": "Requests/s"
          }
        ]
      },
      {
        "title": "Response Latency",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.50, sum(rate(ai_request_latency_seconds_bucket[1m])) by (le))",
            "legendFormat": "P50"
          },
          {
            "expr": "histogram_quantile(0.99, sum(rate(ai_request_latency_seconds_bucket[1m])) by (le))",
            "legendFormat": "P99"
          }
        ]
      },
      {
        "title": "Error Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "sum(rate(ai_errors_total[1m])) / sum(rate(ai_requests_total[1m])) * 100",
            "legendFormat": "Error Rate %"
          }
        ]
      },
      {
        "title": "AI Quality Metrics",
        "type": "graph",
        "targets": [
          {
            "expr": "ai_retrieval_precision * 100",
            "legendFormat": "Retrieval Precision %"
          },
          {
            "expr": "ai_generation_faithfulness * 100",
            "legendFormat": "Generation Faithfulness %"
          },
          {
            "expr": "ai_hallucination_rate * 100",
            "legendFormat": "Hallucination Rate %"
          }
        ]
      },
      {
        "title": "User Satisfaction",
        "type": "singlestat",
        "targets": [
          {
            "expr": "ai_user_satisfaction",
            "legendFormat": "Avg Rating"
          }
        ]
      },
      {
        "title": "Cost per Query",
        "type": "singlestat",
        "targets": [
          {
            "expr": "ai_cost_per_query",
            "legendFormat": "USD"
          }
        ]
      }
    ]
  }
}

六、总结

6.1 核心要点

  1. 监控指标

    • 系统指标
    • 应用指标
    • AI 质量指标
    • 业务指标
  2. 告警策略

    • 分级告警
    • 合理阈值
    • 有效路由
  3. 日志管理

    • 结构化日志
    • 集中存储
    • 智能分析

6.2 最佳实践

  1. 全面监控

    • 多层次指标
    • 实时采集
    • 长期存储
  2. 智能告警

    • 避免告警疲劳
    • 有效告警路由
    • 自动恢复
  3. 持续改进

    • 定期回顾告警
    • 优化阈值
    • 完善文档

参考资料


分享这篇文章到:

上一篇文章
RocketMQ DLedger 实战指南
下一篇文章
RocketMQ 命名空间详解与多租户实践