本文汇总了 Kafka 生产环境中的典型问题排查案例,包括消息丢失、重复、堆积、性能下降等问题的分析和解决方案。
一、消息丢失案例
案例 1:acks=1 导致消息丢失
现象:
- 偶尔出现消息丢失
- 消费者未收到部分消息
- 日志无异常
排查过程:
# 1. 检查 Producer 配置
cat producer.properties | grep acks
# 发现:acks=1
# 2. 检查 Broker 日志
tail -f /var/log/kafka/server.log | grep "Leader changed"
# 发现:频繁的 Leader 切换
# 3. 模拟故障
# 在消息发送时 kill 掉 Leader Broker
# 发现:部分消息丢失
原因分析:
Producer -> Broker1(Leader) -> 确认
↓
Broker2(Follower) // 未同步完成,Broker1 宕机
结果:消息丢失
解决方案:
// 修改配置
props.put("acks", "all");
props.put("min.insync.replicas", "2");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", true);
效果:消息丢失问题彻底解决
案例 2:未处理发送异常
现象:
- 消息发送失败但无告警
- 业务数据不一致
排查过程:
// 原代码
producer.send(record); // 异步发送,未处理回调
// 修改为
producer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("发送失败", exception);
// 记录失败消息,人工处理
saveFailedMessage(record);
}
});
解决方案:
public class ReliableProducer {
public void sendWithCallback(ProducerRecord record) {
producer.send(record, this::handleCallback);
}
private void handleCallback(RecordMetadata metadata, Exception exception) {
if (exception != null) {
// 重试逻辑
if (isRetriable(exception)) {
retrySend(metadata, exception);
} else {
// 记录失败
saveFailedMessage(metadata, exception);
sendAlert("消息发送失败");
}
}
}
}
二、消息重复案例
案例 3:网络重试导致重复
现象:
- 消费者收到重复消息
- 业务数据重复
排查过程:
# 1. 检查 Producer 日志
grep "Retrying" /var/log/producer/app.log
# 发现:大量重试记录
# 2. 检查网络
ping kafka-broker-1
# 发现:网络偶尔超时
# 3. 分析时间线
T1: Producer 发送消息
T2: Broker 接收并写入
T3: 响应超时(网络问题)
T4: Producer 重试发送
T5: Broker 再次写入
结果:消息重复
解决方案:
// 1. 开启幂等性
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 5);
// 2. 业务层去重
@RedissonLock(key = "#msgId")
public void processMessage(String msgId, Message msg) {
// 检查是否已处理
if (isProcessed(msgId)) {
return;
}
// 处理业务
processBusiness(msg);
// 标记已处理
markAsProcessed(msgId);
}
案例 4:Rebalance 导致重复
现象:
- 消费组 Rebalance 后出现重复消息
- 自动提交位移
排查过程:
// 原代码
props.put("enable.auto.commit", "true"); // 自动提交
// 问题场景
T1: Consumer A 拉取消息(offset=100-150)
T2: 处理消息(offset=100-120)
T3: Rebalance 触发
T4: Consumer B 接管(从 offset=100 开始)
T5: Consumer B 再次消费 offset=100-120
结果:消息重复
解决方案:
// 1. 手动提交
props.put("enable.auto.commit", "false");
// 2. 处理完成后提交
while (true) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord record : records) {
process(record); // 处理业务
}
consumer.commitSync(); // 同步提交
}
// 3. 或使用异步提交 + 回调
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
// 记录需要重试的偏移量
}
});
三、消息堆积案例
案例 5:消费逻辑慢导致堆积
现象:
- Consumer Lag 持续增长
- 消费 TPS 远低于生产 TPS
排查过程:
# 1. 查看消费组状态
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group order-consumer-group
# 输出:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# order-consumer-group order-topic 0 10000 50000 40000
# order-consumer-group order-topic 1 10000 50000 40000
# 2. 检查消费逻辑耗时
# 添加监控
processTime.record(System.currentTimeMillis() - startTime);
# 发现:平均处理时间 500ms
# 3. 分析代码
// 发现:每条消息都调用外部 HTTP 接口
for (Message msg : messages) {
httpClient.post(...); // 同步调用,耗时 500ms
}
解决方案:
// 1. 批量处理外部调用
List<Order> orders = new ArrayList<>();
for (Message msg : messages) {
orders.add(parse(msg));
}
// 批量调用
orderService.batchProcess(orders);
// 2. 异步处理
ExecutorService executor = Executors.newFixedThreadPool(10);
for (Message msg : messages) {
executor.submit(() -> process(msg));
}
// 3. 增加消费者数量
// 从 2 个增加到 8 个
效果:
- 消费 TPS:200 → 2000
- Lag:80000 → 0(2 小时内)
案例 6:分区数不足
现象:
- 消费者数量增加后,消费速度未提升
- 部分消费者空闲
排查过程:
# 1. 查看 Topic 配置
kafka-topics.sh --describe --topic order-topic
# 输出:Partitions: 4
# 2. 查看消费者数量
# 当前:8 个消费者
# 3. 分析
# 4 个分区最多支持 4 个消费者并发消费
# 多余的 4 个消费者空闲
解决方案:
# 增加分区数
kafka-topics.sh --alter --topic order-topic --partitions 16
# 验证
kafka-topics.sh --describe --topic order-topic
# Partitions: 16
注意:
- 分区数只能增加,不能减少
- 可能影响消息顺序性
- 需要提前规划
四、性能下降案例
案例 7:磁盘 IO 瓶颈
现象:
- Broker TPS 突然下降
- 请求延迟升高
排查过程:
# 1. 检查磁盘 IO
iostat -x 1 5
# 输出:%util 接近 100%
# 2. 检查磁盘空间
df -h /data/kafka-logs
# 输出:使用率 95%
# 3. 检查 GC
jstat -gcutil <pid> 1000
# 输出:Full GC 频繁
# 4. 分析日志
tail -f /var/log/kafka/server.log
# 发现:大量 "Slowest request" 警告
解决方案:
# 1. 清理过期日志
# 修改保留策略
log.retention.hours=72 # 从 168 改为 72
# 2. 迁移数据
# 将旧数据迁移到其他磁盘
# 3. 增加磁盘
# 挂载新磁盘,扩展 log.dirs
# 4. 优化刷盘
log.flush.interval.messages=10000 # 减少刷盘频率
案例 8:网络带宽不足
现象:
- 晚高峰期间性能下降
- 网络流量接近带宽上限
排查过程:
# 1. 检查网络流量
iftop -P -n -i eth0
# 输出:900Mbps / 1Gbps
# 2. 检查消息大小
# 平均消息大小:10KB
# 峰值 TPS:10 万
# 带宽需求:10KB * 10 万 = 1GB/s
# 3. 分析
# 带宽不足,导致请求排队
解决方案:
// 1. 消息压缩
props.put("compression.type", "lz4");
// 压缩后:10KB → 3KB
// 2. 增加带宽
# 升级网络:1Gbps → 10Gbps
// 3. 优化批量
props.put("batch.size", 65536); // 增大批量
props.put("linger.ms", 10); // 增加等待时间
五、Rebalance 案例
案例 9:频繁 Rebalance
现象:
- 消费组频繁 Rebalance
- 消费中断频繁
排查过程:
# 1. 查看 Rebalance 日志
grep "Rebalance" /var/log/consumer/app.log
# 发现:每 5 分钟 Rebalance 一次
# 2. 检查配置
cat consumer.properties | grep session.timeout
# 输出:session.timeout.ms=10000
# 3. 检查处理时间
# 发现:单次 poll 处理时间超过 10 秒
解决方案:
// 1. 调整超时配置
props.put("session.timeout.ms", "30000");
props.put("max.poll.interval.ms", "300000");
props.put("heartbeat.interval.ms", "10000");
// 2. 减少单次拉取数量
props.put("max.poll.records", "100");
// 3. 优化处理逻辑
// 异步处理,快速返回
六、故障切换案例
案例 10:Broker 宕机切换
现象:
- Broker-1 突然宕机
- 生产消费短暂中断
排查过程:
# 1. 检查 Broker 状态
ps -ef | grep kafka
# 发现:Broker-1 进程不存在
# 2. 检查日志
tail -f /var/log/kafka/server.log
# 发现:OOM 错误
# 3. 检查堆配置
cat kafka-server-start.sh | grep HEAP
# 输出:-Xmx2G -Xms2G
解决方案:
# 1. 调整堆内存
export KAFKA_HEAP_OPTS="-Xmx8G -Xms8G"
# 2. 优化 GC
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20"
# 3. 重启 Broker
systemctl restart kafka
# 4. 验证
kafka-broker-api-versions.sh --bootstrap-server broker-1:9092
恢复过程:
T0: Broker-1 宕机
T0+30s: Controller 检测到,触发 Leader 选举
T0+60s: Leader 选举完成,生产恢复
T0+120s: Broker-1 重启完成
T0+180s: 副本同步完成,集群恢复正常
七、排查工具
常用命令
# Topic 管理
kafka-topics.sh --describe --topic my-topic
kafka-topics.sh --alter --topic my-topic --partitions 16
# 消费组管理
kafka-consumer-groups.sh --describe --group my-group
kafka-consumer-groups.sh --reset-offsets --group my-group --to-latest --execute
# 偏移量查询
kafka-run-class.sh kafka.tools.GetOffsetShell --topic my-topic
# 日志查询
kafka-log-dirs.sh --describe --broker-list 1 --log-dirs /data/kafka-logs
# 副本重分配
kafka-reassign-partitions.sh --execute --reassignment-json-file reassign.json
# 集群信息
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
kafka-metadata.sh --snapshot /tmp/kafka-logs/__cluster_metadata-0/00000000000000000000.log
监控脚本
#!/bin/bash
# Kafka 健康检查脚本
check_kafka_health() {
local brokers=$1
# 检查 Broker 连接
for broker in $(echo $brokers | tr ',' ' '); do
if ! nc -z ${broker%:*} ${broker#*:} &>/dev/null; then
echo "CRITICAL: Broker $broker 无法连接"
return 2
fi
done
# 检查未同步副本
under_replicated=$(kafka-topics.sh --bootstrap-server $brokers --describe | \
grep -c "Isr:" || echo "0")
if [ "$under_replicated" -gt 0 ]; then
echo "WARNING: 存在 $under_replicated 个未同步副本"
return 1
fi
# 检查离线分区
offline=$(kafka-topics.sh --bootstrap-server $brokers --describe | \
grep -c "Offline" || echo "0")
if [ "$offline" -gt 0 ]; then
echo "CRITICAL: 存在 $offline 个离线分区"
return 2
fi
echo "OK: Kafka 集群健康"
return 0
}
# 使用
check_kafka_health "kafka-1:9092,kafka-2:9092,kafka-3:9092"
总结
Kafka 生产问题排查的核心要点:
- 消息丢失:acks=all、幂等性、重试机制
- 消息重复:幂等性、手动提交、去重逻辑
- 消息堆积:增加消费者、优化逻辑、增加分区
- 性能下降:磁盘 IO、网络带宽、GC 优化
- Rebalance:超时配置、处理时间、心跳间隔
核心要点:
- 建立完善的监控告警体系
- 掌握常用排查命令和工具
- 积累典型问题案例库
- 定期演练故障处理流程
参考资料
- Kafka Troubleshooting 官方文档
- Kafka 运维最佳实践
- 《Kafka 权威指南》第 12 章