本文总结了 Kafka 在生产环境中的最佳实践和经验教训,涵盖架构设计、配置优化、故障处理等方面。
一、架构设计
1.1 部署架构
推荐架构:
graph TB
subgraph 可用区 A
K1[Kafka Broker 1]
K2[Kafka Broker 2]
ZK1[ZooKeeper 1]
end
subgraph 可用区 B
K3[Kafka Broker 3]
K4[Kafka Broker 4]
ZK2[ZooKeeper 2]
end
subgraph 可用区 C
K5[Kafka Broker 5]
K6[Kafka Broker 6]
ZK3[ZooKeeper 3]
end
ZK1 -.-> ZK2
ZK2 -.-> ZK3
K1 -.-> K4
K2 -.-> K5
K3 -.-> K6
部署建议:
| 组件 | 数量 | 部署方式 |
|---|---|---|
| Kafka Broker | 3-7 | 跨可用区 |
| ZooKeeper | 3 或 5 | 奇数节点 |
| Topic 副本 | 3 | 跨机架 |
1.2 Topic 设计
命名规范:
# 格式
{业务线}.{子系统}.{事件类型}
# 示例
order.trade.created # 订单 - 交易 - 创建
pay.transaction.completed # 支付 - 交易 - 完成
user.registered.success # 用户 - 注册 - 成功
分区规划:
// 分区数计算公式
int partitionNum = Math.max(
expectedTPS / singlePartitionTPS, // 基于吞吐量
maxConsumerConcurrency // 基于消费并发
);
// 推荐配置
| 场景 | TPS | 分区数 |
|------|-----|--------|
| 低并发 | < 1000 | 3-6 |
| 中并发 | 1000-5000 | 6-12 |
| 高并发 | 5000-20000 | 12-24 |
| 超高并发 | > 20000 | 24-48 |
1.3 消息设计
消息体大小:
// 推荐:< 1KB
// 最大:1MB(默认)
// 大消息处理方案
public void sendLargeMessage(LargeObject obj) {
// 方案 1:压缩
byte[] compressed = compress(obj);
// 方案 2:分片
List<byte[]> shards = split(obj);
for (byte[] shard : shards) {
send(shard);
}
// 方案 3:存储外部,消息传引用
String url = storage.save(obj);
send(new ReferenceMessage(url));
}
Key 设计:
// 保证顺序的消息需要 Key
// Key 用于分区哈希
// ✅ 好的 Key 设计
message.setKey(orderId.toString()); // 订单 ID
message.setKey(userId.toString()); // 用户 ID
// ❌ 不好的 Key 设计
message.setKey(UUID.randomUUID().toString()); // 随机 UUID,失去顺序性
message.setKey(null); // 无 Key,轮询分区
二、配置优化
2.1 Producer 配置
public class ProducerConfig {
public static Properties getHighThroughputProps() {
Properties props = new Properties();
// 基础配置
props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092");
// 可靠性配置
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", true);
props.put("max.in.flight.requests.per.connection", 5);
// 性能配置
props.put("batch.size", 65536); // 64KB
props.put("linger.ms", 10); // 10ms
props.put("compression.type", "lz4"); // 压缩
props.put("buffer.memory", 67108864); // 64MB
return props;
}
public static Properties getLowLatencyProps() {
Properties props = new Properties();
// 基础配置
props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092");
// 低延迟配置
props.put("acks", "1");
props.put("batch.size", 16384);
props.put("linger.ms", 0);
props.put("compression.type", "none");
return props;
}
}
2.2 Consumer 配置
public class ConsumerConfig {
public static Properties getProps(String groupId) {
Properties props = new Properties();
// 基础配置
props.put("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put("group.id", groupId);
// 自动提交
props.put("enable.auto.commit", "false");
// 拉取配置
props.put("fetch.min.bytes", 1024);
props.put("fetch.max.wait.ms", 100);
props.put("max.poll.records", 500);
// 反序列化
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
return props;
}
}
2.3 Broker 配置
# server.properties
# 基础配置
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/data/kafka-logs
# 性能配置
num.network.threads=8
num.io.threads=16
num.replica.fetchers=2
# 日志配置
log.segment.bytes=1073741824 # 1GB
log.retention.hours=168 # 7 天
log.retention.bytes=-1 # 不限制
# 副本配置
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false
# 刷盘配置(异步)
log.flush.interval.messages=10000
log.flush.interval.ms=1000
三、消息可靠性
3.1 发送可靠性
public class ReliableProducer {
/**
* 可靠发送(同步 + 重试)
*/
public boolean sendReliable(Message msg) {
int maxRetries = 3;
for (int i = 0; i < maxRetries; i++) {
try {
SendResult result = producer.send(msg);
if (result != null) {
return true;
}
} catch (Exception e) {
log.warn("发送失败,第 {} 次重试", i + 1, e);
}
}
return false;
}
/**
* 异步发送带回调
*/
public void sendAsync(Message msg, SendCallback callback) {
producer.send(msg, (metadata, exception) -> {
if (exception == null) {
callback.onSuccess(metadata);
} else {
callback.onFailure(exception);
}
});
}
}
interface SendCallback {
void onSuccess(RecordMetadata metadata);
void onFailure(Exception exception);
}
3.2 消费可靠性
public class ReliableConsumer {
/**
* 可靠消费(幂等 + 手动提交)
*/
public ConsumeResult consumeMessage(ConsumerRecords<String, String> records) {
for (ConsumerRecord<String, String> record : records) {
String msgId = record.key();
// 1. 幂等检查
if (isProcessed(msgId)) {
continue;
}
try {
// 2. 业务处理
processBusiness(record);
// 3. 标记已处理
markAsProcessed(msgId);
} catch (BusinessException e) {
// 业务异常,记录日志
log.error("业务异常:msgId={}", msgId, e);
saveFailedMessage(record);
} catch (Exception e) {
// 系统异常,抛出重试
throw e;
}
}
// 4. 提交位移
consumer.commitSync();
return ConsumeResult.SUCCESS;
}
}
3.3 消息追踪
public class MessageTrace {
/**
* 发送追踪
*/
public void traceSend(String msgId, String topic, SendResult result) {
TraceMessage trace = new TraceMessage();
trace.setMsgId(msgId);
trace.setTopic(topic);
trace.setOffset(result.offset());
trace.setSendTime(System.currentTimeMillis());
trace.setStatus("SENT");
traceService.save(trace);
}
/**
* 消费追踪
*/
public void traceConsume(String msgId, String groupId, boolean success) {
TraceMessage trace = traceService.findByMsgId(msgId);
if (trace != null) {
trace.setConsumeTime(System.currentTimeMillis());
trace.setConsumerGroup(groupId);
trace.setConsumeSuccess(success);
traceService.update(trace);
}
}
/**
* 查询消息轨迹
*/
public MessageTrace getTrace(String msgId) {
return traceService.findByMsgId(msgId);
}
}
四、故障处理
4.1 常见问题
| 问题 | 原因 | 解决方案 |
|---|---|---|
| 消息丢失 | acks=0/1 | acks=all + 幂等性 |
| 消息重复 | 网络重试 | 实现幂等性 |
| 消息堆积 | 消费慢 | 增加消费者、优化逻辑 |
| Rebalance 频繁 | 会话超时 | 调整超时配置 |
| 发送失败 | Broker 故障 | 重试、切换 Broker |
4.2 消息堆积处理
#!/bin/bash
# 消息堆积应急处理脚本
TOPIC="order-topic"
GROUP="order-consumer-group"
# 1. 查看堆积情况
echo "=== 消费组状态 ==="
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group $GROUP
# 2. 查看消费者状态
echo "=== 消费者状态 ==="
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group $GROUP --members --verbose
# 3. 临时增加消费者
echo "=== 启动临时消费者 ==="
nohup java -jar consumer.jar --group=$GROUP-temp &
# 4. 如需要,跳过部分消息(慎用)
echo "=== 重置偏移量 ==="
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group $GROUP --topic $TOPIC --reset-offsets --to-latest --execute
4.3 Broker 故障处理
#!/bin/bash
# Broker 故障切换脚本
FAILEDBROKER="kafka-broker-1"
NEWBROKER="kafka-broker-4"
# 1. 停止故障 Broker
echo "停止故障 Broker..."
ssh $FAILEDBROKER "systemctl stop kafka"
# 2. 启动备用 Broker
echo "启动备用 Broker..."
ssh $NEWBROKER "systemctl start kafka"
# 3. 检查状态
echo "检查 Broker 状态..."
kafka-broker-api-versions.sh --bootstrap-server $NEWBROKER:9092
# 4. 发送告警
sendAlert "Broker 故障切换完成:$FAILEDBROKER -> $NEWBROKER"
五、性能优化
5.1 吞吐量优化
// 高吞吐配置
public class HighThroughputConfig {
public static Properties getProducerProps() {
Properties props = new Properties();
// 批量发送
props.put("batch.size", 131072); // 128KB
props.put("linger.ms", 20); // 20ms
// 压缩
props.put("compression.type", "lz4");
// 可靠性(适当降低)
props.put("acks", "1");
return props;
}
public static Properties getConsumerProps() {
Properties props = new Properties();
// 批量拉取
props.put("fetch.min.bytes", 65536); // 64KB
props.put("fetch.max.wait.ms", 200); // 200ms
props.put("max.poll.records", 1000); // 1000 条
// 增加并发
props.put("max.poll.interval.ms", 600000);
return props;
}
}
5.2 延迟优化
// 低延迟配置
public class LowLatencyConfig {
public static Properties getProducerProps() {
Properties props = new Properties();
// 不等待
props.put("batch.size", 16384); // 16KB
props.put("linger.ms", 0); // 不等待
// 不压缩(减少 CPU 开销)
props.put("compression.type", "none");
// 可靠性
props.put("acks", "all");
return props;
}
}
六、监控告警
6.1 关键指标
| 指标 | 告警阈值 | 说明 |
|---|---|---|
| 生产 TPS | < 1000 | 写入速率过低 |
| 消费 TPS | < 1000 | 消费速率过低 |
| Consumer Lag | > 10000 | 消息堆积 |
| 发送失败率 | > 1% | 发送异常 |
| 消费失败率 | > 1% | 消费异常 |
| UnderReplicated | > 0 | 未同步副本 |
6.2 Prometheus 告警
groups:
- name: kafka
rules:
- alert: KafkaBrokerDown
expr: up{job="kafka-broker"} == 0
for: 1m
labels:
severity: critical
annotations:
summary: "Broker 宕机:{{ $labels.instance }}"
- alert: KafkaConsumerLag
expr: kafka_consumer_group_lag > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "消费滞后:{{ $labels.group }} - {{ $value }}"
- alert: KafkaUnderReplicatedPartitions
expr: kafka_server_ReplicaManager_UnderReplicatedPartitions > 0
for: 5m
labels:
severity: warning
annotations:
summary: "存在未同步副本"
七、运维管理
7.1 日常巡检
#!/bin/bash
# 日常巡检脚本
echo "=== Kafka 日常巡检 ==="
echo "时间:$(date)"
# 1. 检查 Broker
echo -e "\n=== Broker 状态 ==="
for broker in kafka-1 kafka-2 kafka-3; do
echo "检查 $broker..."
kafka-broker-api-versions.sh --bootstrap-server $broker:9092 &>/dev/null
if [ $? -eq 0 ]; then
echo " ✓ $broker 正常"
else
echo " ✗ $broker 异常"
fi
done
# 2. 检查 Topic
echo -e "\n=== Topic 统计 ==="
kafka-topics.sh --bootstrap-server localhost:9092 --describe | wc -l
# 3. 检查消费组
echo -e "\n=== 消费组进度 ==="
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 4. 检查磁盘
echo -e "\n=== 磁盘使用 ==="
df -h /data/kafka-logs
# 5. 检查未同步副本
echo -e "\n=== 未同步副本 ==="
kafka-topics.sh --bootstrap-server localhost:9092 --describe | grep -c "Isr:"
7.2 备份策略
#!/bin/bash
# 元数据备份脚本
BACKUP_DIR="/backup/kafka"
DATE=$(date +%Y%m%d)
# 备份 Topic 配置
kafka-topics.sh --bootstrap-server localhost:9092 --describe > \
$BACKUP_DIR/topics_$DATE.txt
# 备份消费组偏移量
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --all-groups > \
$BACKUP_DIR/consumer-groups_$DATE.txt
# 备份 ACL
kafka-acls.sh --bootstrap-server localhost:9092 --list > \
$BACKUP_DIR/acls_$DATE.txt
# 保留 30 天
find $BACKUP_DIR -name "*.txt" -mtime +30 -delete
八、经验总结
8.1 设计原则
- 消息要轻量:消息体尽量小于 1KB
- Key 要合理:用于分区和顺序保证
- 消费要幂等:防止重复消费
- 监控要完善:关键指标告警
- 配置要合理:根据场景选择
8.2 避坑指南
| 坑 | 表现 | 避免方法 |
|---|---|---|
| 消息重复 | 数据重复 | 实现幂等性 |
| 消息丢失 | 数据不一致 | acks=all + 幂等性 |
| 消息堆积 | 延迟高 | 监控 + 扩容 |
| 顺序错乱 | 业务异常 | 合理设置 Key |
| Rebalance 频繁 | 消费中断 | 调整超时配置 |
8.3 检查清单
上线前检查:
- Topic 和 Group 已创建
- 分区数合理
- 监控告警已配置
- 幂等性已实现
- 异常处理完善
- 日志记录完整
运维检查:
- 每日巡检
- 每周备份
- 每月容量评估
- 每季度演练
总结
Kafka 最佳实践的核心要点:
- 架构设计:合理部署、Topic 规划、消息设计
- 配置优化:Producer、Consumer、Broker 配置
- 消息可靠:发送可靠、消费可靠、消息追踪
- 故障处理:常见问题、应急处理
- 运维管理:日常巡检、备份策略
核心要点:
- 遵循命名规范和设计原则
- 实现消息幂等性和可靠性
- 建立完善的监控告警体系
- 定期巡检和备份
参考资料
- Kafka 最佳实践官方文档
- Kafka 运维指南
- 《Kafka 权威指南》第 11 章