
Kafka Monitoring and Observability in Practice

A solid monitoring stack is key to keeping a Kafka cluster running stably. This article walks through the observability practices involved: metric collection, log collection, distributed tracing, and alerting.

1. Monitoring Architecture

1.1 Monitoring Layers

graph TB
    subgraph "Infrastructure Layer"
        CPU[CPU]
        MEM[Memory]
        DISK[Disk]
        NET[Network]
    end
    
    subgraph "Kafka Layer"
        BROKER[Broker]
        TOPIC[Topic]
        PARTITION[Partition]
        CONSUMER[Consumer]
    end
    
    subgraph "Business Layer"
        TPS[TPS]
        LAG[Consumer Lag]
        ERROR[Error Rate]
    end
    
    subgraph "Presentation Layer"
        PROM[Prometheus]
        GRAF[Grafana]
        ALERT[Alerting]
    end
    
    CPU --> BROKER
    BROKER --> TPS
    TPS --> PROM
    PROM --> GRAF
    PROM --> ALERT

1.2 Metric Categories

| Category       | Metrics                              | Description        |
|----------------|--------------------------------------|--------------------|
| Infrastructure | CPU, memory, disk, network           | Server resources   |
| Broker         | TPS, latency, connection count       | Broker performance |
| Topic          | Message volume, partitions, replicas | Topic state        |
| Consumer       | Lag, TPS, rebalances                 | Consumption state  |
| Business       | Success rate, error rate, latency    | Business health    |
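
Consumer lag, the most watched metric in the table above, is simply the distance between each partition's log end offset and the group's committed offset. A minimal sketch of the arithmetic (plain Java with hypothetical offset values, not the Kafka AdminClient API):

```java
import java.util.Map;

public class LagCalc {
    // Lag per partition = log end offset - committed offset (floored at 0),
    // summed over all partitions of the topic.
    public static long totalLag(Map<Integer, Long> endOffsets,
                                Map<Integer, Long> committed) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long c = committed.getOrDefault(e.getKey(), 0L);
            lag += Math.max(0, e.getValue() - c);
        }
        return lag;
    }

    public static void main(String[] args) {
        // Hypothetical offsets for partitions 0-2.
        Map<Integer, Long> end = Map.of(0, 1500L, 1, 2000L, 2, 800L);
        Map<Integer, Long> committed = Map.of(0, 1400L, 1, 1900L, 2, 800L);
        System.out.println(totalLag(end, committed)); // prints 200
    }
}
```

In production the same number comes from the consumer's `records-lag-max` JMX metric or `kafka-consumer-groups.sh --describe`, rather than hand-rolled arithmetic.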

2. Metric Collection

2.1 JMX Exporter

Configuration

# jmx-exporter.yml
lowercaseOutputName: true
rules:
  # When "value" is omitted, the exporter uses the mBean attribute value.
  - pattern: kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>OneMinuteRate
    name: kafka_server_broker_topic_metrics_messages_in_per_sec
    type: GAUGE
  
  - pattern: kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>OneMinuteRate
    name: kafka_server_broker_topic_metrics_bytes_in_per_sec
    type: GAUGE
  
  - pattern: kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value
    name: kafka_server_replica_manager_under_replicated_partitions
    type: GAUGE
  
  - pattern: kafka.server<type=ReplicaManager, name=OfflinePartitionsCount><>Value
    name: kafka_server_replica_manager_offline_partitions_count
    type: GAUGE
  
  - pattern: kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.+), topic=(.+)><>records-lag-max
    name: kafka_consumer_records_lag_max
    type: GAUGE
    labels:
      client_id: $1
      topic: $2
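
The JMX Exporter matches each rule's pattern against a flattened "domain<properties><>attribute" string, and the regex capture groups fill in the metric name and labels. A rough illustration of that group extraction using plain `java.util.regex` (this is not the exporter's own code, just the matching idea):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JmxRuleDemo {
    // The consumer-lag rule's pattern from the config above, as a Java regex.
    static final Pattern RULE = Pattern.compile(
        "kafka\\.consumer<type=consumer-fetch-manager-metrics, " +
        "client-id=(.+), topic=(.+)><>records-lag-max");

    // Returns "client_id,topic" extracted from a flattened mBean string,
    // or null when the rule does not match.
    public static String extractLabels(String mbean) {
        Matcher m = RULE.matcher(mbean);
        return m.matches() ? m.group(1) + "," + m.group(2) : null;
    }

    public static void main(String[] args) {
        String mbean = "kafka.consumer<type=consumer-fetch-manager-metrics, "
            + "client-id=my-app, topic=orders><>records-lag-max";
        System.out.println(extractLabels(mbean)); // my-app,orders
    }
}
```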

Deployment

#!/bin/bash
# Deploy JMX Exporter

# 1. Download the JMX Exporter agent and install the config
wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.17.2/jmx_prometheus_javaagent-0.17.2.jar
mv jmx_prometheus_javaagent-0.17.2.jar /opt/jmx_exporter.jar
cp jmx-exporter.yml /opt/jmx-exporter.yml

# 2. Add the agent to Kafka's JVM options
#    (set this in the systemd unit or kafka-server-start.sh so it persists)
export KAFKA_JMX_OPTS="-javaagent:/opt/jmx_exporter.jar=9090:/opt/jmx-exporter.yml"

# 3. Restart Kafka
systemctl restart kafka

2.2 Prometheus Configuration

# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'kafka-broker'
    static_configs:
      - targets: 
          - 'kafka-1:9090'
          - 'kafka-2:9090'
          - 'kafka-3:9090'
    metrics_path: '/metrics'
  
  - job_name: 'kafka-consumer'
    static_configs:
      - targets:
          - 'consumer-1:9090'
          - 'consumer-2:9090'
  
  - job_name: 'zookeeper'
    # Port 2181 is ZooKeeper's client port and serves no /metrics endpoint.
    # ZooKeeper 3.6+ can expose Prometheus metrics directly (enable
    # PrometheusMetricsProvider in zoo.cfg; default metrics port 7000).
    static_configs:
      - targets:
          - 'zk-1:7000'
          - 'zk-2:7000'
          - 'zk-3:7000'
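
With a 15-second scrape interval, Prometheus turns raw counters into per-second rates the way PromQL's `rate()` does: delta of value over delta of time. A simplified two-sample sketch (ignoring counter resets and range extrapolation, which the real function also handles):

```java
public class RateCalc {
    // Per-second rate between two counter samples, as rate() computes it
    // for adjacent points: (v2 - v1) / (t2 - t1).
    public static double rate(double v1, long t1Millis, double v2, long t2Millis) {
        return (v2 - v1) / ((t2Millis - t1Millis) / 1000.0);
    }

    public static void main(String[] args) {
        // 4500 messages between two scrapes 15 s apart -> 300 msg/s.
        System.out.println(rate(100_000, 0, 104_500, 15_000)); // 300.0
    }
}
```

Note that Kafka's `OneMinuteRate` attributes are already rates, so they should be graphed directly rather than wrapped in `rate()` again.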

3. Log Collection

3.1 Filebeat Configuration

# filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/kafka/*.log
  fields:
    service: kafka
  multiline.pattern: '^\['
  multiline.negate: true
  multiline.match: after

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "kafka-%{+yyyy.MM.dd}"

# A custom index name also requires a matching template name/pattern
setup.template.name: "kafka"
setup.template.pattern: "kafka-*"
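
The multiline settings above treat any line that does not start with `[` as a continuation of the previous event (`negate: true`, `match: after`), which is how a Java stack trace stays attached to the log line that produced it. A rough sketch of that grouping logic:

```java
import java.util.ArrayList;
import java.util.List;

public class MultilineDemo {
    // Lines not matching '^\[' are appended to the previous event,
    // mirroring Filebeat's multiline.negate=true / multiline.match=after.
    public static List<String> group(List<String> lines) {
        List<String> events = new ArrayList<>();
        for (String line : lines) {
            if (line.startsWith("[") || events.isEmpty()) {
                events.add(line);                       // starts a new event
            } else {
                events.set(events.size() - 1,           // continuation line
                    events.get(events.size() - 1) + "\n" + line);
            }
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
            "[2024-01-01 10:00:00] ERROR Failed to send",
            "java.io.IOException: broken pipe",
            "    at kafka.network.Processor.run",
            "[2024-01-01 10:00:01] INFO Reconnected");
        System.out.println(group(lines).size()); // 2 events
    }
}
```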

3.2 Log Analysis

Error log analysis

#!/bin/bash
# Error log analysis

LOG_FILE="/var/log/kafka/server.log"

echo "=== Error counts ==="
grep -i "error" "$LOG_FILE" | \
  awk -F'ERROR' '{print $2}' | \
  awk -F':' '{print $1}' | \
  sort | uniq -c | sort -rn

echo -e "\n=== Warning counts ==="
grep -i "warn" "$LOG_FILE" | \
  awk -F'WARN' '{print $2}' | \
  awk -F':' '{print $1}' | \
  sort | uniq -c | sort -rn

echo -e "\n=== Recent errors ==="
grep -i "error" "$LOG_FILE" | tail -20

Slow request analysis

#!/bin/bash
# Slow request analysis

LOG_FILE="/var/log/kafka/server.log"

echo "=== Slowest requests ==="
# Adjust the grep pattern to match whatever your request logger emits
grep "Slowest request" "$LOG_FILE" | \
  awk -F'took' '{print $2}' | \
  awk '{print $1}' | \
  sort -n | tail -20

4. Distributed Tracing

4.1 OpenTelemetry Integration

Configuration

// Java application configuration
Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1:9092");

// Note: kafka-clients has no built-in OpenTelemetry properties. Tracing is
// usually enabled by attaching the OpenTelemetry Java agent to the JVM, e.g.
//   java -javaagent:opentelemetry-javaagent.jar \
//        -Dotel.exporter.otlp.endpoint=http://jaeger:4317 ...
// (requires OTLP ingestion to be enabled on the Jaeger collector)

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

4.2 Tracing Implementation

// Manual instrumentation using the OpenTracing API (io.opentracing);
// HeadersAdapter is assumed to be a custom TextMap implementation that
// writes key/value pairs into the Kafka record headers.
public class TracedProducer {
    
    private final KafkaProducer<String, String> producer;
    private final Tracer tracer;
    
    public TracedProducer(Tracer tracer) {
        this.tracer = tracer;
        this.producer = createProducer();
    }
    
    public void send(String topic, String key, String value) {
        Span span = tracer.buildSpan("kafka-produce")
            .withTag("topic", topic)
            .withTag("key", key)
            .start();
        
        try {
            ProducerRecord<String, String> record = 
                new ProducerRecord<>(topic, key, value);
            
            // Inject the trace context into the record headers
            tracer.inject(span.context(), Format.Builtin.TEXT_MAP, 
                new HeadersAdapter(record.headers()));
            
            producer.send(record);
            
        } catch (Exception e) {
            span.setTag("error", true);
            throw e;
        } finally {
            span.finish();
        }
    }
}
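
On the consumer side the context is read back out of the record headers. The wire format most propagators use is the W3C `traceparent` header (`version-traceId-spanId-flags`); a minimal sketch of building and parsing that value with plain strings (hypothetical IDs, not an OpenTelemetry or OpenTracing API):

```java
public class TraceParent {
    // Builds a W3C traceparent value: version-traceid-spanid-flags.
    public static String format(String traceId, String spanId, boolean sampled) {
        return "00-" + traceId + "-" + spanId + (sampled ? "-01" : "-00");
    }

    // Extracts the trace id (second dash-separated field).
    public static String traceId(String traceparent) {
        return traceparent.split("-")[1];
    }

    public static void main(String[] args) {
        String tp = format("4bf92f3577b34da6a3ce929d0e0e4736",
                           "00f067aa0ba902b7", true);
        System.out.println(tp);
        System.out.println(traceId(tp)); // the 32-hex-char trace id
    }
}
```

Because the trace id travels inside the message itself, the producer span and the consumer span end up in the same Jaeger trace even though they run in different processes.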

4.3 Jaeger Configuration

# docker-compose.yml
version: '3'
services:
  jaeger:
    image: jaegertracing/all-in-one:1.37
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "16686:16686"   # web UI
      - "14268:14268"   # collector HTTP (jaeger.thrift)
      - "4317:4317"     # OTLP gRPC
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
      - COLLECTOR_OTLP_ENABLED=true

5. Alerting

5.1 Alert Rules

# alerting_rules.yml
groups:
  - name: kafka
    rules:
      # Broker down
      - alert: KafkaBrokerDown
        expr: up{job="kafka-broker"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Broker down: {{ $labels.instance }}"
      
      # Under-replicated partitions
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_replica_manager_under_replicated_partitions > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Under-replicated partitions: {{ $value }}"
      
      # Offline partitions
      - alert: KafkaOfflinePartitions
        expr: kafka_server_replica_manager_offline_partitions_count > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Offline partitions: {{ $value }}"
      
      # Consumer lag
      - alert: KafkaConsumerLag
        expr: kafka_consumer_records_lag_max > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer lag: {{ $labels.client_id }} - {{ $value }}"
      
      # Free disk space
      - alert: KafkaDiskHigh
        expr: node_filesystem_avail_bytes{mountpoint="/data"} / node_filesystem_size_bytes{mountpoint="/data"} < 0.2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Less than 20% disk space free: {{ $value }}"
      
      # Error rate (illustrative: the messages-in metric exported above carries
      # no status label; point this at an application-level error counter)
      - alert: KafkaErrorRate
        expr: rate(kafka_server_broker_topic_metrics_messages_in_per_sec{status="error"}[5m]) > 0.01
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "High error rate: {{ $value }}"
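
The disk alert's expression fires on the free-space ratio rather than usage. The same check in code form, with hypothetical byte counts:

```java
public class DiskAlertDemo {
    // True when free space falls below the threshold ratio, mirroring the
    // PromQL expression: avail_bytes / size_bytes < 0.2
    public static boolean lowDisk(long availBytes, long sizeBytes,
                                  double minFreeRatio) {
        return (double) availBytes / sizeBytes < minFreeRatio;
    }

    public static void main(String[] args) {
        long size = 1_000_000_000_000L;                              // 1 TB volume
        System.out.println(lowDisk(150_000_000_000L, size, 0.2));    // true:  15% free
        System.out.println(lowDisk(400_000_000_000L, size, 0.2));    // false: 40% free
    }
}
```

Expressing the rule as "free below 20%" instead of "used above 80%" keeps it valid even when filesystem overhead makes available + used add up to less than the total size.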

5.2 Alert Notifications

# alertmanager.yml
global:
  smtp_smarthost: 'smtp.example.com:587'
  smtp_from: 'alert@example.com'

route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'email'
  
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty'
    - match:
        severity: warning
      receiver: 'email'

receivers:
  - name: 'email'
    email_configs:
      - to: 'ops-team@example.com'
  
  - name: 'pagerduty'
    pagerduty_configs:
      - service_key: 'your-pagerduty-key'
  
  - name: 'dingtalk'
    webhook_configs:
      - url: 'https://oapi.dingtalk.com/robot/send?access_token=xxx'
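
In the route tree above, the first child route whose matcher fits the alert's labels wins; alerts matching no child fall back to the top-level receiver. A rough sketch of that selection logic (the `dingtalk` receiver is defined but only used if a route points at it):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RouteDemo {
    // First child route whose severity matches wins; otherwise the
    // top-level receiver applies - mirroring the Alertmanager config above.
    public static String receiverFor(String severity) {
        Map<String, String> childRoutes = new LinkedHashMap<>();
        childRoutes.put("critical", "pagerduty");
        childRoutes.put("warning", "email");
        return childRoutes.getOrDefault(severity, "email"); // top-level default
    }

    public static void main(String[] args) {
        System.out.println(receiverFor("critical")); // pagerduty
        System.out.println(receiverFor("info"));     // email (default)
    }
}
```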

6. Grafana Dashboards

6.1 Core Panels

{
  "dashboard": {
    "title": "Kafka Monitoring",
    "panels": [
      {
        "title": "Message Throughput",
        "targets": [{
          "expr": "sum(kafka_server_broker_topic_metrics_messages_in_per_sec)"
        }]
      },
      {
        "title": "Consumer Lag",
        "targets": [{
          "expr": "sum(kafka_consumer_records_lag_max) by (client_id)"
        }]
      },
      {
        "title": "Under-Replicated Partitions",
        "targets": [{
          "expr": "kafka_server_replica_manager_under_replicated_partitions"
        }]
      },
      {
        "title": "Disk Usage",
        "targets": [{
          "expr": "1 - (node_filesystem_avail_bytes{mountpoint=\"/data\"} / node_filesystem_size_bytes{mountpoint=\"/data\"})"
        }]
      }
    ]
  }
}

6.2 Recommended Dashboards

| Dashboard      | Description                    |
|----------------|--------------------------------|
| Kafka Overview | Cluster overview, TPS, latency |
| Broker Metrics | Detailed broker metrics        |
| Consumer Lag   | Consumer lag monitoring        |
| Topic Metrics  | Detailed topic metrics         |
| JVM Metrics    | JVM metrics, GC                |

7. Best Practices

7.1 Monitoring Recommendations

Monitoring recommendations:
1. Scrape metrics every 15 seconds
2. Retain data for 30 days
3. Configure multi-level alerting
4. Establish an on-call rotation
5. Run regular incident drills

7.2 Alert Thresholds

Suggested alert thresholds:
- Broker down: 1 minute
- Under-replicated partitions: 5 minutes
- Consumer lag > 10000: 5 minutes
- Disk usage > 80%: 5 minutes
- Error rate > 1%: 5 minutes

7.3 Checklist

Monitoring checklist:
- [ ] JMX Exporter running
- [ ] Prometheus scraping successfully
- [ ] Grafana dashboards rendering
- [ ] Alert rules configured
- [ ] Alert notifications delivered
- [ ] Log collection working
- [ ] Tracing working

Summary

Key points of a Kafka monitoring stack:

  1. Monitoring architecture: infrastructure, Kafka, and business layers
  2. Metric collection: JMX Exporter, Prometheus
  3. Log collection: Filebeat, ELK
  4. Tracing: OpenTelemetry, Jaeger
  5. Alerting: alert rules, notification routing
  6. Dashboards: Grafana core panels

