Kafka 运维监控是保障消息系统稳定运行的关键。本文将深入探讨 Kafka 监控指标、告警配置、故障排查等实战技巧。
一、监控架构
1.1 整体架构
graph TB
subgraph Kafka 集群
B1[Broker 1]
B2[Broker 2]
B3[Broker 3]
ZK[ZooKeeper]
end
subgraph 监控采集
JMX[JMX Exporter]
PROM[Prometheus]
end
subgraph 展示告警
GRAF[Grafana]
ALERT[AlertManager]
end
B1 --> JMX
B2 --> JMX
B3 --> JMX
JMX --> PROM
PROM --> GRAF
PROM --> ALERT
1.2 监控层次
| 层次 | 监控内容 | 工具 |
|---|---|---|
| 基础设施 | CPU、内存、磁盘、网络 | Node Exporter |
| JVM | GC、堆内存、线程 | JMX Exporter |
| Kafka | TPS、延迟、堆积 | Kafka Exporter |
| 业务 | 消息量、消费进度 | 自定义指标 |
二、关键指标
2.1 Broker 指标
| 指标名称 | 说明 | 告警阈值 |
|---|---|---|
kafka_server_BrokerTopicMetrics_MessagesInPerSec | 消息写入速率 | - |
kafka_server_BrokerTopicMetrics_BytesInPerSec | 流入字节速率 | - |
kafka_server_BrokerTopicMetrics_BytesOutPerSec | 流出字节速率 | - |
kafka_network_RequestMetrics_RequestQueueTimeMs | 请求排队时间 | > 500ms |
kafka_network_RequestMetrics_LocalTimeMs | 本地处理时间 | > 100ms |
2.2 Consumer 指标
| 指标名称 | 说明 | 告警阈值 |
|---|---|---|
kafka_consumer_group_lag | 消费滞后量 | > 10000 |
kafka_consumer_group_lag_sum | 总滞后量 | > 100000 |
kafka_consumer_records_consumed_rate | 消费速率 | - |
kafka_consumer_fetch_latency | 拉取延迟 | > 100ms |
2.3 Producer 指标
| 指标名称 | 说明 | 告警阈值 |
|---|---|---|
kafka_producer_record_send_rate | 发送速率 | - |
kafka_producer_request_latency | 请求延迟 | > 100ms |
kafka_producer_record_error_rate | 错误率 | > 1% |
kafka_producer_compression_rate | 压缩率 | - |
2.4 Topic/Partition 指标
| 指标名称 | 说明 | 告警阈值 |
|---|---|---|
kafka_topic_partition_count | 分区数 | - |
kafka_topic_partition_in_sync_replica | 同步副本数 | < replication.factor |
kafka_topic_partition_under_replicated | 未同步副本数 | > 0 |
kafka_topic_partition_offline | 离线分区数 | > 0 |
三、Prometheus 配置
3.1 JMX Exporter
# kafka-jmx.yml
lowercaseOutputName: true
rules:
- pattern: kafka.server<type=BrokerTopicMetrics, name=(MessagesInPerSec|BytesInPerSec|BytesOutPerSec)><>(OneMinuteRate)
name: kafka_server_BrokerTopicMetrics_$1
value: $3
type: GAUGE
- pattern: kafka.server<type=ReplicaManager, name=(UnderReplicatedPartitions|OfflinePartitionsCount)><>Value
name: kafka_server_ReplicaManager_$1
value: $3
type: GAUGE
- pattern: kafka.consumer<type=consumer-fetch-manager-metrics, client-id=(.+), topic=(.+), partition=(.+)><>(records-lag-max)
name: kafka_consumer_partition_lag
value: $5
type: GAUGE
labels:
client-id: $2
topic: $3
partition: $4
3.2 Prometheus 配置
# prometheus.yml
global:
scrape_interval: 15s
evaluation_interval: 15s
scrape_configs:
- job_name: 'kafka-broker'
static_configs:
- targets:
- 'kafka-1:9090'
- 'kafka-2:9090'
- 'kafka-3:9090'
metrics_path: '/metrics'
- job_name: 'kafka-consumer'
static_configs:
- targets:
- 'consumer-1:9090'
- 'consumer-2:9090'
- job_name: 'zookeeper'
static_configs:
- targets:
- 'zk-1:2181'
- 'zk-2:2181'
- 'zk-3:2181'
3.3 告警规则
# alerting_rules.yml
groups:
- name: kafka
rules:
# Broker 告警
- alert: KafkaBrokerDown
expr: up{job="kafka-broker"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Kafka Broker 宕机:{{ $labels.instance }}"
- alert: KafkaUnderReplicatedPartitions
expr: kafka_server_ReplicaManager_UnderReplicatedPartitions > 0
for: 5m
labels:
severity: warning
annotations:
summary: "存在未同步副本:{{ $value }}"
- alert: KafkaOfflinePartitions
expr: kafka_server_ReplicaManager_OfflinePartitionsCount > 0
for: 1m
labels:
severity: critical
annotations:
summary: "存在离线分区:{{ $value }}"
# Consumer 告警
- alert: KafkaConsumerLag
expr: kafka_consumer_group_lag > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "Consumer Lag 过高:{{ $labels.group }} - {{ $value }}"
- alert: KafkaConsumerLagCritical
expr: kafka_consumer_group_lag > 100000
for: 5m
labels:
severity: critical
annotations:
summary: "Consumer Lag 严重:{{ $labels.group }} - {{ $value }}"
# Producer 告警
- alert: KafkaProducerErrorRate
expr: rate(kafka_producer_record_error_rate[5m]) > 0.01
for: 5m
labels:
severity: warning
annotations:
summary: "Producer 错误率过高:{{ $value }}"
# 性能告警
- alert: KafkaRequestQueueTime
expr: kafka_network_RequestMetrics_RequestQueueTimeMs > 500
for: 5m
labels:
severity: warning
annotations:
summary: "请求排队时间过长:{{ $value }}ms"
四、Grafana 仪表盘
4.1 核心面板
{
"dashboard": {
"title": "Kafka 监控大盘",
"panels": [
{
"title": "消息吞吐量",
"targets": [
{
"expr": "sum(rate(kafka_server_BrokerTopicMetrics_MessagesInPerSec[1m]))",
"legendFormat": "Messages In"
}
]
},
{
"title": "Consumer Lag",
"targets": [
{
"expr": "sum(kafka_consumer_group_lag) by (group)",
"legendFormat": "{{ group }}"
}
]
},
{
"title": "未同步副本",
"targets": [
{
"expr": "kafka_server_ReplicaManager_UnderReplicatedPartitions",
"legendFormat": "Under Replicated"
}
]
}
]
}
}
4.2 推荐面板
| 面板名称 | 指标 | 图表类型 |
|---|---|---|
| 消息吞吐量 | MessagesInPerSec | 时间序列 |
| 字节吞吐量 | BytesIn/OutPerSec | 时间序列 |
| Consumer Lag | consumer_group_lag | 时间序列 |
| 未同步副本 | UnderReplicatedPartitions | 状态图 |
| 请求延迟 | RequestQueueTimeMs | 热力图 |
| GC 时间 | GCTime | 时间序列 |
五、故障排查
5.1 Broker 故障
症状:Broker 无法连接
排查步骤:
# 1. 检查进程状态
ps -ef | grep kafka
# 2. 检查日志
tail -f /var/log/kafka/server.log
# 3. 检查端口
netstat -tlnp | grep 9092
# 4. 检查 ZooKeeper 连接
echo stat | nc localhost 2181
# 5. 检查磁盘空间
df -h /var/kafka-logs
# 6. 检查 GC 日志
tail -f /var/log/kafka/gc.log
常见原因:
- 磁盘空间不足
- ZooKeeper 连接断开
- OOM 错误
- 网络问题
5.2 Consumer Lag 过高
症状:消费滞后持续增长
排查步骤:
# 1. 查看消费组状态
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group my-group
# 2. 查看滞后详情
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group my-group --members --verbose
# 3. 检查消费者日志
tail -f /var/log/consumer/app.log
# 4. 检查消费者线程
jstack <pid> | grep -A 10 "consumer"
# 5. 检查消息处理时间
# 添加监控指标记录处理时间
解决方案:
// 1. 增加消费者数量
// 2. 增加消费线程
consumer.setConsumeThreadMax(64);
// 3. 优化处理逻辑
// 异步处理、批量处理
// 4. 增加分区数
kafka-topics.sh --alter --topic my-topic --partitions 32
5.3 消息丢失
症状:消息未到达消费者
排查步骤:
# 1. 检查 Producer 日志
tail -f /var/log/producer/app.log | grep "error"
# 2. 检查 Broker 日志
tail -f /var/log/kafka/server.log | grep "error"
# 3. 查看消息偏移量
kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list localhost:9092 \
--topic my-topic
# 4. 检查副本状态
kafka-topics.sh --describe --topic my-topic
解决方案:
// Producer 配置
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 5);
5.4 性能下降
症状:TPS 下降、延迟升高
排查步骤:
# 1. 检查 CPU 使用率
top -p $(ps -ef | grep kafka | awk '{print $2}')
# 2. 检查 IO 等待
iostat -x 1 5
# 3. 检查 GC 情况
jstat -gcutil <pid> 1000 10
# 4. 检查网络流量
iftop -P -n -i eth0
# 5. 检查磁盘 IO
iotop -o -P
解决方案:
# Broker 优化
num.io.threads=16
num.network.threads=8
log.flush.interval.messages=10000
# JVM 优化
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
六、运维工具
6.1 官方工具
# Topic 管理
kafka-topics.sh --create --topic my-topic --partitions 3 --replication-factor 2
kafka-topics.sh --describe --topic my-topic
kafka-topics.sh --delete --topic my-topic
# 消费组管理
kafka-consumer-groups.sh --describe --group my-group
kafka-consumer-groups.sh --reset-offsets --group my-group --to-earliest --execute
# 偏移量查询
kafka-run-class.sh kafka.tools.GetOffsetShell --topic my-topic
# 日志清理
kafka-delete-records.sh --offset-json-file offset.json --bootstrap-server localhost:9092
# 副本重分配
kafka-reassign-partitions.sh --execute --reassignment-json-file reassign.json
6.2 第三方工具
| 工具 | 说明 | 链接 |
|---|---|---|
| Kafka Manager | Web 管理界面 | GitHub |
| Kafka Eagle | 监控管理平台 | GitHub |
| Offset Explorer | 桌面客户端 | 官网 |
| kcat | 命令行工具 | GitHub |
6.3 自定义脚本
#!/bin/bash
# Kafka 健康检查脚本
BROKERS="kafka-1:9092,kafka-2:9092,kafka-3:9092"
# 检查 Broker 连接
for broker in $(echo $BROKERS | tr ',' ' '); do
if ! nc -z ${broker%:*} ${broker#*:} &>/dev/null; then
echo "CRITICAL: Broker $broker 无法连接"
exit 2
fi
done
# 检查未同步副本
under_replicated=$(kafka-topics.sh --bootstrap-server $BROKERS --describe | \
grep -c "Isr:" || echo "0")
if [ "$under_replicated" -gt 0 ]; then
echo "WARNING: 存在 $under_replicated 个未同步副本"
exit 1
fi
# 检查离线分区
offline=$(kafka-topics.sh --bootstrap-server $BROKERS --describe | \
grep -c "Offline" || echo "0")
if [ "$offline" -gt 0 ]; then
echo "CRITICAL: 存在 $offline 个离线分区"
exit 2
fi
echo "OK: Kafka 集群健康"
exit 0
七、最佳实践
7.1 监控配置
| 场景 | 采集间隔 | 保留时间 | 告警阈值 |
|---|---|---|---|
| 开发环境 | 60s | 7 天 | 宽松 |
| 测试环境 | 30s | 14 天 | 中等 |
| 生产环境 | 15s | 30 天 | 严格 |
7.2 日志管理
# log4j 配置
log4j.rootLogger=INFO, stdout, kafkaAppender
log4j.appender.kafkaAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.kafkaAppender.File=/var/log/kafka/server.log
log4j.appender.kafkaAppender.MaxFileSize=1GB
log4j.appender.kafkaAppender.MaxBackupIndex=10
7.3 备份策略
#!/bin/bash
# 元数据备份脚本
BACKUP_DIR="/backup/kafka"
DATE=$(date +%Y%m%d_%H%M%S)
# 备份 Topic 配置
kafka-topics.sh --bootstrap-server localhost:9092 --describe > \
$BACKUP_DIR/topics_$DATE.txt
# 备份消费组偏移量
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --all-groups > \
$BACKUP_DIR/consumer-groups_$DATE.txt
# 保留 30 天
find $BACKUP_DIR -name "*.txt" -mtime +30 -delete
总结
Kafka 运维监控的核心要点:
- 监控架构:JMX Exporter + Prometheus + Grafana
- 关键指标:Broker、Consumer、Producer、Topic/Partition
- 告警配置:分级告警、合理阈值
- 故障排查:日志分析、工具使用
- 最佳实践:监控配置、日志管理、备份策略
核心要点:
- 建立完整的监控体系
- 配置合理的告警阈值
- 掌握常见故障排查方法
- 定期备份元数据
参考资料
- Kafka Monitoring 官方文档
- Prometheus Kafka Exporter
- 《Kafka 权威指南》第 9 章