Skip to content
清晨的一缕阳光
返回

Kafka 最佳实践与生产经验总结

本文总结了 Kafka 在生产环境中的最佳实践和经验教训,涵盖架构设计、配置优化、故障处理等方面。

一、架构设计

1.1 部署架构

推荐架构

graph TB
    subgraph 可用区 A
        K1[Kafka Broker 1]
        K2[Kafka Broker 2]
        ZK1[ZooKeeper 1]
    end
    
    subgraph 可用区 B
        K3[Kafka Broker 3]
        K4[Kafka Broker 4]
        ZK2[ZooKeeper 2]
    end
    
    subgraph 可用区 C
        K5[Kafka Broker 5]
        K6[Kafka Broker 6]
        ZK3[ZooKeeper 3]
    end
    
    ZK1 -.-> ZK2
    ZK2 -.-> ZK3
    
    K1 -.-> K4
    K2 -.-> K5
    K3 -.-> K6

部署建议

组件数量部署方式
Kafka Broker3-7跨可用区
ZooKeeper3 或 5奇数节点
Topic 副本3跨机架

1.2 Topic 设计

命名规范

# 格式
{业务线}.{子系统}.{事件类型}

# 示例
order.trade.created      # 订单 - 交易 - 创建
pay.transaction.completed # 支付 - 交易 - 完成
user.registered.success   # 用户 - 注册 - 成功

分区规划

// 分区数计算公式
int partitionNum = Math.max(
    expectedTPS / singlePartitionTPS,  // 基于吞吐量
    maxConsumerConcurrency             // 基于消费并发
);

// 推荐配置
| 场景 | TPS | 分区数 |
|------|-----|--------|
| 低并发 | < 1000 | 3-6 |
| 中并发 | 1000-5000 | 6-12 |
| 高并发 | 5000-20000 | 12-24 |
| 超高并发 | > 20000 | 24-48 |

1.3 消息设计

消息体大小

// 推荐:< 1KB
// 最大:1MB(默认)

// 大消息处理方案
public void sendLargeMessage(LargeObject obj) {
    // 方案 1:压缩
    byte[] compressed = compress(obj);
    
    // 方案 2:分片
    List<byte[]> shards = split(obj);
    for (byte[] shard : shards) {
        send(shard);
    }
    
    // 方案 3:存储外部,消息传引用
    String url = storage.save(obj);
    send(new ReferenceMessage(url));
}

Key 设计

// 保证顺序的消息需要 Key
// Key 用于分区哈希

// ✅ 好的 Key 设计
message.setKey(orderId.toString());  // 订单 ID
message.setKey(userId.toString());   // 用户 ID

// ❌ 不好的 Key 设计
message.setKey(UUID.randomUUID().toString());  // 随机 UUID,失去顺序性
message.setKey(null);  // 无 Key,轮询分区

二、配置优化

2.1 Producer 配置

public class ProducerConfig {
    
    public static Properties getHighThroughputProps() {
        Properties props = new Properties();
        
        // 基础配置
        props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        
        // 可靠性配置
        props.put("acks", "all");
        props.put("retries", Integer.MAX_VALUE);
        props.put("enable.idempotence", true);
        props.put("max.in.flight.requests.per.connection", 5);
        
        // 性能配置
        props.put("batch.size", 65536);        // 64KB
        props.put("linger.ms", 10);            // 10ms
        props.put("compression.type", "lz4");  // 压缩
        props.put("buffer.memory", 67108864);  // 64MB
        
        return props;
    }
    
    public static Properties getLowLatencyProps() {
        Properties props = new Properties();
        
        // 基础配置
        props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        
        // 低延迟配置
        props.put("acks", "1");
        props.put("batch.size", 16384);
        props.put("linger.ms", 0);
        props.put("compression.type", "none");
        
        return props;
    }
}

2.2 Consumer 配置

public class ConsumerConfig {
    
    public static Properties getProps(String groupId) {
        Properties props = new Properties();
        
        // 基础配置
        props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092");
        props.put("group.id", groupId);
        
        // 自动提交
        props.put("enable.auto.commit", "false");
        
        // 拉取配置
        props.put("fetch.min.bytes", 1024);
        props.put("fetch.max.wait.ms", 100);
        props.put("max.poll.records", 500);
        
        // 反序列化
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        
        return props;
    }
}

2.3 Broker 配置

# server.properties

# 基础配置
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/data/kafka-logs

# 性能配置
num.network.threads=8
num.io.threads=16
num.replica.fetchers=2

# 日志配置
log.segment.bytes=1073741824       # 1GB
log.retention.hours=168            # 7 天
log.retention.bytes=-1             # 不限制

# 副本配置
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

# 刷盘配置(异步)
log.flush.interval.messages=10000
log.flush.interval.ms=1000

三、消息可靠性

3.1 发送可靠性

public class ReliableProducer {
    
    /**
     * 可靠发送(同步 + 重试)
     */
    public boolean sendReliable(Message msg) {
        int maxRetries = 3;
        for (int i = 0; i < maxRetries; i++) {
            try {
                SendResult result = producer.send(msg);
                if (result != null) {
                    return true;
                }
            } catch (Exception e) {
                log.warn("发送失败,第 {} 次重试", i + 1, e);
            }
        }
        return false;
    }
    
    /**
     * 异步发送带回调
     */
    public void sendAsync(Message msg, SendCallback callback) {
        producer.send(msg, (metadata, exception) -> {
            if (exception == null) {
                callback.onSuccess(metadata);
            } else {
                callback.onFailure(exception);
            }
        });
    }
}

interface SendCallback {
    void onSuccess(RecordMetadata metadata);
    void onFailure(Exception exception);
}

3.2 消费可靠性

public class ReliableConsumer {
    
    /**
     * 可靠消费(幂等 + 手动提交)
     */
    public ConsumeResult consumeMessage(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> record : records) {
            String msgId = record.key();
            
            // 1. 幂等检查
            if (isProcessed(msgId)) {
                continue;
            }
            
            try {
                // 2. 业务处理
                processBusiness(record);
                
                // 3. 标记已处理
                markAsProcessed(msgId);
                
            } catch (BusinessException e) {
                // 业务异常,记录日志
                log.error("业务异常:msgId={}", msgId, e);
                saveFailedMessage(record);
                
            } catch (Exception e) {
                // 系统异常,抛出重试
                throw e;
            }
        }
        
        // 4. 提交位移
        consumer.commitSync();
        
        return ConsumeResult.SUCCESS;
    }
}

3.3 消息追踪

public class MessageTrace {
    
    /**
     * 发送追踪
     */
    public void traceSend(String msgId, String topic, SendResult result) {
        TraceMessage trace = new TraceMessage();
        trace.setMsgId(msgId);
        trace.setTopic(topic);
        trace.setOffset(result.offset());
        trace.setSendTime(System.currentTimeMillis());
        trace.setStatus("SENT");
        
        traceService.save(trace);
    }
    
    /**
     * 消费追踪
     */
    public void traceConsume(String msgId, String groupId, boolean success) {
        TraceMessage trace = traceService.findByMsgId(msgId);
        if (trace != null) {
            trace.setConsumeTime(System.currentTimeMillis());
            trace.setConsumerGroup(groupId);
            trace.setConsumeSuccess(success);
            traceService.update(trace);
        }
    }
    
    /**
     * 查询消息轨迹
     */
    public MessageTrace getTrace(String msgId) {
        return traceService.findByMsgId(msgId);
    }
}

四、故障处理

4.1 常见问题

问题原因解决方案
消息丢失acks=0/1acks=all + 幂等性
消息重复网络重试实现幂等性
消息堆积消费慢增加消费者、优化逻辑
Rebalance 频繁会话超时调整超时配置
发送失败Broker 故障重试、切换 Broker

4.2 消息堆积处理

#!/bin/bash
# 消息堆积应急处理脚本

TOPIC="order-topic"
GROUP="order-consumer-group"

# 1. 查看堆积情况
echo "=== 消费组状态 ==="
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group $GROUP

# 2. 查看消费者状态
echo "=== 消费者状态 ==="
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group $GROUP --members --verbose

# 3. 临时增加消费者
echo "=== 启动临时消费者 ==="
nohup java -jar consumer.jar --group=$GROUP-temp &

# 4. 如需要,跳过部分消息(慎用)
echo "=== 重置偏移量 ==="
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group $GROUP --topic $TOPIC --reset-offsets --to-latest --execute

4.3 Broker 故障处理

#!/bin/bash
# Broker 故障切换脚本

FAILEDBROKER="kafka-broker-1"
NEWBROKER="kafka-broker-4"

# 1. 停止故障 Broker
echo "停止故障 Broker..."
ssh $FAILEDBROKER "systemctl stop kafka"

# 2. 启动备用 Broker
echo "启动备用 Broker..."
ssh $NEWBROKER "systemctl start kafka"

# 3. 检查状态
echo "检查 Broker 状态..."
kafka-broker-api-versions.sh --bootstrap-server $NEWBROKER:9092

# 4. 发送告警
sendAlert "Broker 故障切换完成:$FAILEDBROKER -> $NEWBROKER"

五、性能优化

5.1 吞吐量优化

// 高吞吐配置
public class HighThroughputConfig {
    
    public static Properties getProducerProps() {
        Properties props = new Properties();
        
        // 批量发送
        props.put("batch.size", 131072);  // 128KB
        props.put("linger.ms", 20);       // 20ms
        
        // 压缩
        props.put("compression.type", "lz4");
        
        // 可靠性(适当降低)
        props.put("acks", "1");
        
        return props;
    }
    
    public static Properties getConsumerProps() {
        Properties props = new Properties();
        
        // 批量拉取
        props.put("fetch.min.bytes", 65536);  // 64KB
        props.put("fetch.max.wait.ms", 200);  // 200ms
        props.put("max.poll.records", 1000);  // 1000 条
        
        // 增加并发
        props.put("max.poll.interval.ms", 600000);
        
        return props;
    }
}

5.2 延迟优化

// 低延迟配置
public class LowLatencyConfig {
    
    public static Properties getProducerProps() {
        Properties props = new Properties();
        
        // 不等待
        props.put("batch.size", 16384);  // 16KB
        props.put("linger.ms", 0);       // 不等待
        
        // 不压缩(减少 CPU 开销)
        props.put("compression.type", "none");
        
        // 可靠性
        props.put("acks", "all");
        
        return props;
    }
}

六、监控告警

6.1 关键指标

指标告警阈值说明
生产 TPS< 1000写入速率过低
消费 TPS< 1000消费速率过低
Consumer Lag> 10000消息堆积
发送失败率> 1%发送异常
消费失败率> 1%消费异常
UnderReplicated> 0未同步副本

6.2 Prometheus 告警

groups:
  - name: kafka
    rules:
      - alert: KafkaBrokerDown
        expr: up{job="kafka-broker"} == 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Broker 宕机:{{ $labels.instance }}"
      
      - alert: KafkaConsumerLag
        expr: kafka_consumer_group_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "消费滞后:{{ $labels.group }} - {{ $value }}"
      
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_ReplicaManager_UnderReplicatedPartitions > 0
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "存在未同步副本"

七、运维管理

7.1 日常巡检

#!/bin/bash
# 日常巡检脚本

echo "=== Kafka 日常巡检 ==="
echo "时间:$(date)"

# 1. 检查 Broker
echo -e "\n=== Broker 状态 ==="
for broker in kafka-1 kafka-2 kafka-3; do
    echo "检查 $broker..."
    kafka-broker-api-versions.sh --bootstrap-server $broker:9092 &>/dev/null
    if [ $? -eq 0 ]; then
        echo "$broker 正常"
    else
        echo "$broker 异常"
    fi
done

# 2. 检查 Topic
echo -e "\n=== Topic 统计 ==="
kafka-topics.sh --bootstrap-server localhost:9092 --describe | wc -l

# 3. 检查消费组
echo -e "\n=== 消费组进度 ==="
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 4. 检查磁盘
echo -e "\n=== 磁盘使用 ==="
df -h /data/kafka-logs

# 5. 检查未同步副本
echo -e "\n=== 未同步副本 ==="
kafka-topics.sh --bootstrap-server localhost:9092 --describe | grep -c "Isr:"

7.2 备份策略

#!/bin/bash
# 元数据备份脚本

BACKUP_DIR="/backup/kafka"
DATE=$(date +%Y%m%d)

# 备份 Topic 配置
kafka-topics.sh --bootstrap-server localhost:9092 --describe > \
  $BACKUP_DIR/topics_$DATE.txt

# 备份消费组偏移量
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --all-groups > \
  $BACKUP_DIR/consumer-groups_$DATE.txt

# 备份 ACL
kafka-acls.sh --bootstrap-server localhost:9092 --list > \
  $BACKUP_DIR/acls_$DATE.txt

# 保留 30 天
find $BACKUP_DIR -name "*.txt" -mtime +30 -delete

八、经验总结

8.1 设计原则

  1. 消息要轻量:消息体尽量小于 1KB
  2. Key 要合理:用于分区和顺序保证
  3. 消费要幂等:防止重复消费
  4. 监控要完善:关键指标告警
  5. 配置要合理:根据场景选择

8.2 避坑指南

表现避免方法
消息重复数据重复实现幂等性
消息丢失数据不一致acks=all + 幂等性
消息堆积延迟高监控 + 扩容
顺序错乱业务异常合理设置 Key
Rebalance 频繁消费中断调整超时配置

8.3 检查清单

上线前检查

运维检查

总结

Kafka 最佳实践的核心要点:

  1. 架构设计:合理部署、Topic 规划、消息设计
  2. 配置优化:Producer、Consumer、Broker 配置
  3. 消息可靠:发送可靠、消费可靠、消息追踪
  4. 故障处理:常见问题、应急处理
  5. 运维管理:日常巡检、备份策略

核心要点

参考资料


分享这篇文章到:

上一篇文章
Kafka 性能调优进阶实战
下一篇文章
Spring Cloud 微服务系列完整学习指南