Skip to content
清晨的一缕阳光
返回

Kafka 生产问题排查案例集

本文汇总了 Kafka 生产环境中的典型问题排查案例,包括消息丢失、重复、堆积、性能下降等问题的分析和解决方案。

一、消息丢失案例

案例 1:acks=1 导致消息丢失

现象

排查过程

# 1. 检查 Producer 配置
cat producer.properties | grep acks
# 发现:acks=1

# 2. 检查 Broker 日志
tail -f /var/log/kafka/server.log | grep "Leader changed"
# 发现:频繁的 Leader 切换

# 3. 模拟故障
# 在消息发送时 kill 掉 Leader Broker
# 发现:部分消息丢失

原因分析

Producer -> Broker1(Leader) -> 确认

              Broker2(Follower)  // 未同步完成,Broker1 宕机
              
结果:消息丢失

解决方案

// 修改配置
props.put("acks", "all");
props.put("min.insync.replicas", "2");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", true);

效果:消息丢失问题彻底解决


案例 2:未处理发送异常

现象

排查过程

// 原代码
producer.send(record);  // 异步发送,未处理回调

// 修改为
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        log.error("发送失败", exception);
        // 记录失败消息,人工处理
        saveFailedMessage(record);
    }
});

解决方案

public class ReliableProducer {
    public void sendWithCallback(ProducerRecord record) {
        producer.send(record, this::handleCallback);
    }
    
    private void handleCallback(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // 重试逻辑
            if (isRetriable(exception)) {
                retrySend(metadata, exception);
            } else {
                // 记录失败
                saveFailedMessage(metadata, exception);
                sendAlert("消息发送失败");
            }
        }
    }
}

二、消息重复案例

案例 3:网络重试导致重复

现象

排查过程

# 1. 检查 Producer 日志
grep "Retrying" /var/log/producer/app.log
# 发现:大量重试记录

# 2. 检查网络
ping kafka-broker-1
# 发现:网络偶尔超时

# 3. 分析时间线
T1: Producer 发送消息
T2: Broker 接收并写入
T3: 响应超时(网络问题)
T4: Producer 重试发送
T5: Broker 再次写入
结果:消息重复

解决方案

// 1. 开启幂等性
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 5);

// 2. 业务层去重
@RedissonLock(key = "#msgId")
public void processMessage(String msgId, Message msg) {
    // 检查是否已处理
    if (isProcessed(msgId)) {
        return;
    }
    
    // 处理业务
    processBusiness(msg);
    
    // 标记已处理
    markAsProcessed(msgId);
}

案例 4:Rebalance 导致重复

现象

排查过程

// 原代码
props.put("enable.auto.commit", "true");  // 自动提交

// 问题场景
T1: Consumer A 拉取消息(offset=100-150
T2: 处理消息(offset=100-120
T3: Rebalance 触发
T4: Consumer B 接管(从 offset=100 开始)
T5: Consumer B 再次消费 offset=100-120
结果:消息重复

解决方案

// 1. 手动提交
props.put("enable.auto.commit", "false");

// 2. 处理完成后提交
while (true) {
    ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord record : records) {
        process(record);  // 处理业务
    }
    
    consumer.commitSync();  // 同步提交
}

// 3. 或使用异步提交 + 回调
consumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // 记录需要重试的偏移量
    }
});

三、消息堆积案例

案例 5:消费逻辑慢导致堆积

现象

排查过程

# 1. 查看消费组状态
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group order-consumer-group

# 输出:
# GROUP                  TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# order-consumer-group   order-topic    0          10000           50000           40000
# order-consumer-group   order-topic    1          10000           50000           40000

# 2. 检查消费逻辑耗时
# 添加监控
processTime.record(System.currentTimeMillis() - startTime);
# 发现:平均处理时间 500ms

# 3. 分析代码
// 发现:每条消息都调用外部 HTTP 接口
for (Message msg : messages) {
    httpClient.post(...);  // 同步调用,耗时 500ms
}

解决方案

// 1. 批量处理外部调用
List<Order> orders = new ArrayList<>();
for (Message msg : messages) {
    orders.add(parse(msg));
}
// 批量调用
orderService.batchProcess(orders);

// 2. 异步处理
ExecutorService executor = Executors.newFixedThreadPool(10);
for (Message msg : messages) {
    executor.submit(() -> process(msg));
}

// 3. 增加消费者数量
// 从 2 个增加到 8 个

效果


案例 6:分区数不足

现象

排查过程

# 1. 查看 Topic 配置
kafka-topics.sh --describe --topic order-topic
# 输出:Partitions: 4

# 2. 查看消费者数量
# 当前:8 个消费者

# 3. 分析
# 4 个分区最多支持 4 个消费者并发消费
# 多余的 4 个消费者空闲

解决方案

# 增加分区数
kafka-topics.sh --alter --topic order-topic --partitions 16

# 验证
kafka-topics.sh --describe --topic order-topic
# Partitions: 16

注意

四、性能下降案例

案例 7:磁盘 IO 瓶颈

现象

排查过程

# 1. 检查磁盘 IO
iostat -x 1 5
# 输出:%util 接近 100%

# 2. 检查磁盘空间
df -h /data/kafka-logs
# 输出:使用率 95%

# 3. 检查 GC
jstat -gcutil <pid> 1000
# 输出:Full GC 频繁

# 4. 分析日志
tail -f /var/log/kafka/server.log
# 发现:大量 "Slowest request" 警告

解决方案

# 1. 清理过期日志
# 修改保留策略
log.retention.hours=72  # 从 168 改为 72

# 2. 迁移数据
# 将旧数据迁移到其他磁盘

# 3. 增加磁盘
# 挂载新磁盘,扩展 log.dirs

# 4. 优化刷盘
log.flush.interval.messages=10000  # 减少刷盘频率

案例 8:网络带宽不足

现象

排查过程

# 1. 检查网络流量
iftop -P -n -i eth0
# 输出:900Mbps / 1Gbps

# 2. 检查消息大小
# 平均消息大小:10KB
# 峰值 TPS:10 万
# 带宽需求:10KB * 10 万 = 1GB/s

# 3. 分析
# 带宽不足,导致请求排队

解决方案

// 1. 消息压缩
props.put("compression.type", "lz4");
// 压缩后:10KB → 3KB

// 2. 增加带宽
# 升级网络:1Gbps → 10Gbps

// 3. 优化批量
props.put("batch.size", 65536);  // 增大批量
props.put("linger.ms", 10);      // 增加等待时间

五、Rebalance 案例

案例 9:频繁 Rebalance

现象

排查过程

# 1. 查看 Rebalance 日志
grep "Rebalance" /var/log/consumer/app.log
# 发现:每 5 分钟 Rebalance 一次

# 2. 检查配置
cat consumer.properties | grep session.timeout
# 输出:session.timeout.ms=10000

# 3. 检查处理时间
# 发现:单次 poll 处理时间超过 10 秒

解决方案

// 1. 调整超时配置
props.put("session.timeout.ms", "30000");
props.put("max.poll.interval.ms", "300000");
props.put("heartbeat.interval.ms", "10000");

// 2. 减少单次拉取数量
props.put("max.poll.records", "100");

// 3. 优化处理逻辑
// 异步处理,快速返回

六、故障切换案例

案例 10:Broker 宕机切换

现象

排查过程

# 1. 检查 Broker 状态
ps -ef | grep kafka
# 发现:Broker-1 进程不存在

# 2. 检查日志
tail -f /var/log/kafka/server.log
# 发现:OOM 错误

# 3. 检查堆配置
cat kafka-server-start.sh | grep HEAP
# 输出:-Xmx2G -Xms2G

解决方案

# 1. 调整堆内存
export KAFKA_HEAP_OPTS="-Xmx8G -Xms8G"

# 2. 优化 GC
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20"

# 3. 重启 Broker
systemctl restart kafka

# 4. 验证
kafka-broker-api-versions.sh --bootstrap-server broker-1:9092

恢复过程

T0: Broker-1 宕机
T0+30s: Controller 检测到,触发 Leader 选举
T0+60s: Leader 选举完成,生产恢复
T0+120s: Broker-1 重启完成
T0+180s: 副本同步完成,集群恢复正常

七、排查工具

常用命令

# Topic 管理
kafka-topics.sh --describe --topic my-topic
kafka-topics.sh --alter --topic my-topic --partitions 16

# 消费组管理
kafka-consumer-groups.sh --describe --group my-group
kafka-consumer-groups.sh --reset-offsets --group my-group --to-latest --execute

# 偏移量查询
kafka-run-class.sh kafka.tools.GetOffsetShell --topic my-topic

# 日志查询
kafka-log-dirs.sh --describe --broker-list 1 --log-dirs /data/kafka-logs

# 副本重分配
kafka-reassign-partitions.sh --execute --reassignment-json-file reassign.json

# 集群信息
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
kafka-metadata.sh --snapshot /tmp/kafka-logs/__cluster_metadata-0/00000000000000000000.log

监控脚本

#!/bin/bash
# Kafka 健康检查脚本

check_kafka_health() {
    local brokers=$1
    
    # 检查 Broker 连接
    for broker in $(echo $brokers | tr ',' ' '); do
        if ! nc -z ${broker%:*} ${broker#*:} &>/dev/null; then
            echo "CRITICAL: Broker $broker 无法连接"
            return 2
        fi
    done
    
    # 检查未同步副本
    under_replicated=$(kafka-topics.sh --bootstrap-server $brokers --describe | \
        grep -c "Isr:" || echo "0")
    
    if [ "$under_replicated" -gt 0 ]; then
        echo "WARNING: 存在 $under_replicated 个未同步副本"
        return 1
    fi
    
    # 检查离线分区
    offline=$(kafka-topics.sh --bootstrap-server $brokers --describe | \
        grep -c "Offline" || echo "0")
    
    if [ "$offline" -gt 0 ]; then
        echo "CRITICAL: 存在 $offline 个离线分区"
        return 2
    fi
    
    echo "OK: Kafka 集群健康"
    return 0
}

# 使用
check_kafka_health "kafka-1:9092,kafka-2:9092,kafka-3:9092"

总结

Kafka 生产问题排查的核心要点:

  1. 消息丢失:acks=all、幂等性、重试机制
  2. 消息重复:幂等性、手动提交、去重逻辑
  3. 消息堆积:增加消费者、优化逻辑、增加分区
  4. 性能下降:磁盘 IO、网络带宽、GC 优化
  5. Rebalance:超时配置、处理时间、心跳间隔

核心要点

参考资料


分享这篇文章到:

上一篇文章
Kafka 消息设计最佳实践与案例
下一篇文章
Redis 云原生部署方案