A solid monitoring stack is key to keeping a Kafka cluster running reliably. This article walks through observability practices for Kafka: metrics collection, log aggregation, distributed tracing, and alerting.
## 1. Monitoring Architecture

### 1.1 Monitoring Layers
```mermaid
graph TB
    subgraph Infra["Infrastructure Layer"]
        CPU[CPU]
        MEM[Memory]
        DISK[Disk]
        NET[Network]
    end
    subgraph KafkaLayer["Kafka Layer"]
        BROKER[Broker]
        TOPIC[Topic]
        PARTITION[Partition]
        CONSUMER[Consumer]
    end
    subgraph Business["Business Layer"]
        TPS[TPS]
        LAG[Consumer Lag]
        ERROR[Error Rate]
    end
    subgraph Display["Presentation Layer"]
        PROM[Prometheus]
        GRAF[Grafana]
        ALERT[Alerting]
    end
    CPU --> BROKER
    BROKER --> TPS
    TPS --> PROM
    PROM --> GRAF
    PROM --> ALERT
```
### 1.2 Metric Categories

| Category | Metrics | Notes |
|---|---|---|
| Infrastructure | CPU, memory, disk, network | Server resources |
| Broker | TPS, latency, connection count | Broker performance |
| Topic | Message volume, partition count, replica count | Topic state |
| Consumer | Lag, TPS, rebalances | Consumption state |
| Business | Success rate, error rate, latency | Business-level metrics |
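Most of these metrics are read directly from JMX, but Consumer Lag is a derived value: per partition, the log-end offset minus the group's committed offset. A minimal sketch of the computation (the topic name and offset values below are illustrative):

```python
def consumer_lag(end_offsets, committed_offsets):
    """Per-partition lag = log-end offset minus committed offset."""
    return {
        tp: end_offsets[tp] - committed_offsets.get(tp, 0)
        for tp in end_offsets
    }

# Illustrative offsets for two partitions of a hypothetical "orders" topic
end = {("orders", 0): 1_500, ("orders", 1): 2_000}
committed = {("orders", 0): 1_200, ("orders", 1): 2_000}

lag = consumer_lag(end, committed)
print(lag)  # {('orders', 0): 300, ('orders', 1): 0}
total = sum(lag.values())  # 300
```

Summing per-partition lag gives the group-level figure that lag alerts are usually defined on.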
## 2. Metrics Collection

### 2.1 JMX Exporter

Configuration (the explicit per-metric rules below emit snake_case names so they match the alert rules in section 5):
```yaml
# jmx-exporter.yml
lowercaseOutputName: true
rules:
  - pattern: kafka.server<type=BrokerTopicMetrics, name=MessagesInPerSec><>OneMinuteRate
    name: kafka_server_broker_topic_metrics_messages_in_per_sec
    type: GAUGE
  - pattern: kafka.server<type=BrokerTopicMetrics, name=BytesInPerSec><>OneMinuteRate
    name: kafka_server_broker_topic_metrics_bytes_in_per_sec
    type: GAUGE
  - pattern: kafka.server<type=ReplicaManager, name=UnderReplicatedPartitions><>Value
    name: kafka_server_replica_manager_under_replicated_partitions
    type: GAUGE
  - pattern: kafka.server<type=ReplicaManager, name=OfflinePartitionsCount><>Value
    name: kafka_server_replica_manager_offline_partitions_count
    type: GAUGE
  - pattern: kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.+), topic=(.+)><>records-lag-max
    name: kafka_consumer_records_lag_max
    type: GAUGE
    labels:
      client_id: $1
      topic: $2
```
Deployment:

```bash
#!/bin/bash
# Deploy the JMX Exporter as a Java agent on each broker

# 1. Download the JMX Exporter agent jar
wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.17.2/jmx_prometheus_javaagent-0.17.2.jar
mv jmx_prometheus_javaagent-0.17.2.jar /opt/jmx_exporter.jar

# 2. Add the agent to Kafka's JVM options. This must be set in the
#    environment the Kafka service actually starts with (e.g. the systemd
#    unit or kafka-server-start.sh), not just the current shell.
export KAFKA_OPTS="-javaagent:/opt/jmx_exporter.jar=9090:/opt/jmx-exporter.yml"

# 3. Restart Kafka
systemctl restart kafka
```
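Once the broker is back up, port 9090 serves metrics in the Prometheus text exposition format, so fetching `http://<broker>:9090/metrics` is a quick smoke test. A minimal sketch of parsing that format (the sample text below is illustrative; real output includes many more series):

```python
def parse_metrics(text):
    """Parse Prometheus text exposition format into {series: value}."""
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):  # skip HELP/TYPE comment lines
            continue
        # The value is the last space-separated token on the line
        name, _, value = line.rpartition(" ")
        samples[name] = float(value)
    return samples

sample = """\
# HELP kafka_server_replica_manager_under_replicated_partitions Partitions below min ISR
# TYPE kafka_server_replica_manager_under_replicated_partitions gauge
kafka_server_replica_manager_under_replicated_partitions 0.0
kafka_server_broker_topic_metrics_messages_in_per_sec 1234.5
"""
metrics = parse_metrics(sample)
print(metrics["kafka_server_replica_manager_under_replicated_partitions"])  # 0.0
```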
### 2.2 Prometheus Configuration

```yaml
# prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'kafka-broker'
    metrics_path: '/metrics'
    static_configs:
      - targets:
          - 'kafka-1:9090'
          - 'kafka-2:9090'
          - 'kafka-3:9090'
  - job_name: 'kafka-consumer'
    static_configs:
      - targets:
          - 'consumer-1:9090'
          - 'consumer-2:9090'
  # ZooKeeper's client port (2181) does not serve Prometheus metrics;
  # these targets assume ZooKeeper 3.6+'s built-in Prometheus metrics
  # provider, which listens on port 7000 by default.
  - job_name: 'zookeeper'
    static_configs:
      - targets:
          - 'zk-1:7000'
          - 'zk-2:7000'
          - 'zk-3:7000'
```
## 3. Log Collection

### 3.1 Filebeat Configuration

```yaml
# filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/kafka/*.log
    fields:
      service: kafka
    # Kafka log lines start with "[...]"; join continuation lines
    # (e.g. stack traces) onto the preceding event
    multiline.pattern: '^\['
    multiline.negate: true
    multiline.match: after

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

output.elasticsearch:
  hosts: ["elasticsearch:9200"]
  index: "kafka-%{+yyyy.MM.dd}"

# A custom index name requires matching template settings
setup.template.name: "kafka"
setup.template.pattern: "kafka-*"
```
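The multiline settings above join every line that does not start with `[` onto the previous event, which keeps a stack trace attached to its ERROR line. The grouping semantics (`negate: true`, `match: after`) can be sketched as:

```python
import re

def group_events(lines, pattern=r"^\["):
    """Join continuation lines (those NOT matching pattern) onto the
    previous event, mirroring multiline.negate: true / match: after."""
    starts = re.compile(pattern)
    events = []
    for line in lines:
        if starts.match(line) or not events:
            events.append(line)        # a new log event begins
        else:
            events[-1] += "\n" + line  # continuation of the previous event
    return events

lines = [
    "[2024-01-01 10:00:00] ERROR Failed to append record",
    "java.io.IOException: No space left on device",
    "    at kafka.log.Log.append(Log.scala)",
    "[2024-01-01 10:00:05] INFO Rolled new log segment",
]
events = group_events(lines)
print(len(events))  # 2
```

Here the three-line error (message plus two stack-trace lines) becomes one event, so Elasticsearch indexes it as a single document.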
### 3.2 Log Analysis

Error log analysis:

```bash
#!/bin/bash
# Summarize errors and warnings in the Kafka server log
LOG_FILE="/var/log/kafka/server.log"

echo "=== Error summary ==="
grep -i "error" "$LOG_FILE" | \
    awk -F'ERROR' '{print $2}' | \
    awk -F':' '{print $1}' | \
    sort | uniq -c | sort -rn

echo -e "\n=== Warning summary ==="
grep -i "warn" "$LOG_FILE" | \
    awk -F'WARN' '{print $2}' | \
    awk -F':' '{print $1}' | \
    sort | uniq -c | sort -rn

echo -e "\n=== Most recent errors ==="
grep -i "error" "$LOG_FILE" | tail -20
```
Slow request analysis:

```bash
#!/bin/bash
# Extract the slowest request durations from the log.
# Note: this assumes request-time logging is enabled and that slow
# requests are logged with a "took <millis>" phrase; by default Kafka
# writes request timings to the request log, not server.log.
LOG_FILE="/var/log/kafka/server.log"

echo "=== Slow request summary ==="
grep "Slowest request" "$LOG_FILE" | \
    awk -F'took' '{print $2}' | \
    awk '{print $1}' | \
    sort -n | tail -20
```
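The same extraction can be done in a few lines of Python, which makes it easier to go on to percentiles or histograms. The `took <millis>` phrasing mirrors the grep/awk pipeline above and remains an assumption about the log format:

```python
import re

def slowest(lines, top=3):
    """Return the largest 'took <n>' durations (ms) found in log lines."""
    pattern = re.compile(r"took\s+(\d+(?:\.\d+)?)")
    durations = [float(m.group(1))
                 for line in lines
                 for m in [pattern.search(line)] if m]
    return sorted(durations, reverse=True)[:top]

logs = [
    "[2024-01-01] Slowest request FETCH took 120 ms",
    "[2024-01-01] Slowest request PRODUCE took 45 ms",
    "[2024-01-01] Slowest request FETCH took 300 ms",
]
print(slowest(logs))  # [300.0, 120.0, 45.0]
```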
## 4. Distributed Tracing

### 4.1 OpenTelemetry Integration

Configuration: the Kafka client has no `opentelemetry.*` producer properties. The usual way to get automatic producer and consumer spans is to attach the OpenTelemetry Java agent when the application starts; the service name and OTLP endpoint below are examples:

```bash
# Attach the OpenTelemetry Java agent; it auto-instruments kafka-clients
java -javaagent:/opt/opentelemetry-javaagent.jar \
     -Dotel.service.name=kafka-producer-app \
     -Dotel.traces.exporter=otlp \
     -Dotel.exporter.otlp.endpoint=http://jaeger:4317 \
     -jar app.jar
```
### 4.2 Manual Trace Instrumentation

Spans can also be created by hand. The example below uses the OpenTracing API; `HeadersAdapter` is a small custom `TextMap` carrier (not shown) that writes the trace context into the record's Kafka headers:

```java
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TracedProducer {
    private final KafkaProducer<String, String> producer;
    private final Tracer tracer;

    public TracedProducer(Tracer tracer) {
        this.tracer = tracer;
        this.producer = createProducer();
    }

    public void send(String topic, String key, String value) {
        Span span = tracer.buildSpan("kafka-produce")
                .withTag("topic", topic)
                .withTag("key", key)
                .start();
        try {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topic, key, value);
            // Inject the trace context into the record headers so the
            // consumer side can continue the same trace
            tracer.inject(span.context(), Format.Builtin.TEXT_MAP,
                    new HeadersAdapter(record.headers()));
            producer.send(record);
            span.finish();
        } catch (Exception e) {
            span.setTag("error", true);
            span.finish();
            throw e;
        }
    }
}
```
### 4.3 Jaeger Configuration

```yaml
# docker-compose.yml
version: '3'
services:
  jaeger:
    image: jaegertracing/all-in-one:1.37
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "16686:16686"   # web UI
      - "14268:14268"   # collector HTTP
      - "4317:4317"     # OTLP gRPC
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
      - COLLECTOR_OTLP_ENABLED=true
```
## 5. Alerting

### 5.1 Alert Rules

```yaml
# alerting_rules.yml
groups:
  - name: kafka
    rules:
      # Broker down
      - alert: KafkaBrokerDown
        expr: up{job="kafka-broker"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Broker down: {{ $labels.instance }}"

      # Under-replicated partitions
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_replica_manager_under_replicated_partitions > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Under-replicated partitions: {{ $value }}"

      # Offline partitions
      - alert: KafkaOfflinePartitions
        expr: kafka_server_replica_manager_offline_partitions_count > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Offline partitions: {{ $value }}"

      # Consumer lag
      - alert: KafkaConsumerLag
        expr: kafka_consumer_records_lag_max > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer lag: {{ $labels.client_id }} - {{ $value }}"

      # Disk space
      - alert: KafkaDiskHigh
        expr: node_filesystem_avail_bytes{mountpoint="/data"} / node_filesystem_size_bytes{mountpoint="/data"} < 0.2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Data disk below 20% free: {{ $value }}"

      # Failed produce requests. Note: the exported OneMinuteRate is
      # already a per-second rate, so no rate() wrapper is needed; this
      # assumes a JMX rule exporting FailedProduceRequestsPerSec.
      - alert: KafkaFailedProduceRequests
        expr: kafka_server_broker_topic_metrics_failed_produce_requests_per_sec > 0.01
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Failed produce request rate too high: {{ $value }}"
```

Validate the file with `promtool check rules alerting_rules.yml` before loading it into Prometheus.
### 5.2 Alert Notification

```yaml
# alertmanager.yml
global:
  smtp_smarthost: 'smtp.example.com:587'
  smtp_from: 'alert@example.com'

route:
  group_by: ['alertname']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'email'
  routes:
    - match:
        severity: critical
      receiver: 'pagerduty'
    - match:
        severity: warning
      receiver: 'email'

receivers:
  - name: 'email'
    email_configs:
      - to: 'ops-team@example.com'
  - name: 'pagerduty'
    pagerduty_configs:
      - service_key: 'your-pagerduty-key'
  # Defined but not yet referenced by any route above
  - name: 'dingtalk'
    webhook_configs:
      - url: 'https://oapi.dingtalk.com/robot/send?access_token=xxx'
```
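Alertmanager walks the routing tree top-down and uses the first child route that matches, falling back to the tree's default receiver otherwise. Simplified to the `severity` label only, the first-match logic looks like this:

```python
def pick_receiver(severity, routes, default="email"):
    """First-match routing over the severity label, as in the config above."""
    for match_severity, receiver in routes:
        if severity == match_severity:
            return receiver
    return default

# Mirrors the two child routes in the config
routes = [("critical", "pagerduty"), ("warning", "email")]

print(pick_receiver("critical", routes))  # pagerduty
print(pick_receiver("info", routes))      # email (default receiver)
```

Note this is why the `dingtalk` receiver in the config never fires: no route points at it until one is added.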
## 6. Grafana Dashboards

### 6.1 Core Panels

The throughput query sums the gauge directly: the exported `OneMinuteRate` is already a per-second rate, so wrapping it in `rate()` would be wrong.

```json
{
  "dashboard": {
    "title": "Kafka Monitoring",
    "panels": [
      {
        "title": "Message Throughput",
        "targets": [{
          "expr": "sum(kafka_server_broker_topic_metrics_messages_in_per_sec)"
        }]
      },
      {
        "title": "Consumer Lag",
        "targets": [{
          "expr": "sum(kafka_consumer_records_lag_max) by (client_id)"
        }]
      },
      {
        "title": "Under-replicated Partitions",
        "targets": [{
          "expr": "kafka_server_replica_manager_under_replicated_partitions"
        }]
      },
      {
        "title": "Disk Usage",
        "targets": [{
          "expr": "1 - (node_filesystem_avail_bytes{mountpoint=\"/data\"} / node_filesystem_size_bytes{mountpoint=\"/data\"})"
        }]
      }
    ]
  }
}
```
### 6.2 Recommended Dashboards

| Dashboard | Description |
|---|---|
| Kafka Overview | Cluster overview, TPS, latency |
| Broker Metrics | Detailed broker metrics |
| Consumer Lag | Consumer lag monitoring |
| Topic Metrics | Detailed topic metrics |
| JVM Metrics | JVM metrics, GC |
## 7. Best Practices

### 7.1 Monitoring Recommendations

1. Scrape metrics every 15 seconds
2. Retain data for 30 days (e.g. Prometheus `--storage.tsdb.retention.time=30d`)
3. Configure multi-level alerting (warning vs. critical)
4. Establish an on-call rotation
5. Run failure drills regularly
### 7.2 Alert Thresholds

Suggested thresholds (fire after the condition has held for the given duration):

- Broker down: 1 minute
- Under-replicated partitions: 5 minutes
- Consumer lag > 10,000: 5 minutes
- Disk usage > 80%: 5 minutes
- Error rate > 1%: 5 minutes
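These thresholds map metric readings to the severities used in the alert rules of section 5.1. A minimal sketch of that mapping (the function and metric names are illustrative, not part of any Kafka or Prometheus API):

```python
def severity(metric, value):
    """Map a metric reading to an alert severity per the thresholds above."""
    if metric == "broker_up" and value == 0:
        return "critical"
    if metric == "offline_partitions" and value > 0:
        return "critical"
    if metric == "under_replicated_partitions" and value > 0:
        return "warning"
    if metric == "consumer_lag" and value > 10_000:
        return "warning"
    if metric == "disk_usage" and value > 0.8:
        return "warning"
    return "ok"

print(severity("consumer_lag", 25_000))  # warning
print(severity("broker_up", 1))          # ok
```

In production the "for" durations matter as much as the cutoffs: a lag spike that clears within 5 minutes should never page anyone.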
### 7.3 Checklist

Monitoring checklist:

- [ ] JMX Exporter running on every broker
- [ ] Prometheus scraping all targets
- [ ] Grafana dashboards rendering
- [ ] Alert rules loaded
- [ ] Alert notifications delivered
- [ ] Log collection working
- [ ] Tracing working
## Summary

Key elements of a Kafka monitoring stack:

- Monitoring architecture: infrastructure, Kafka, and business layers
- Metrics collection: JMX Exporter, Prometheus
- Log collection: Filebeat, ELK
- Distributed tracing: OpenTelemetry, Jaeger
- Alerting: alert rules and notification routing
- Dashboards: Grafana core panels

Takeaways:

- Build a complete monitoring stack across all layers
- Set alert thresholds that match your workload
- Instrument producers and consumers for tracing
- Establish an on-call rotation
- Drill and tune regularly