Kafka 性能调优是保障消息系统稳定运行的关键。本文将从 Producer、Consumer、Broker 三个维度,深入探讨 Kafka 性能调优的实战技巧。
一、性能指标
1.1 核心指标
| 指标 | 说明 | 目标值 |
|---|---|---|
| 吞吐量 | 每秒处理消息数 | > 10 万 TPS |
| 延迟 | 消息发送/消费延迟 | < 10ms |
| 可用性 | 系统可用时间比例 | > 99.99% |
| 消息堆积 | 未消费消息数量 | < 1000 |
1.2 监控指标
graph TB
subgraph Producer 指标
P1[record-send-rate]
P2[request-latency]
P3[compression-rate]
end
subgraph Broker 指标
B1[bytes-in-rate]
B2[bytes-out-rate]
B3[request-handler-rate]
end
subgraph Consumer 指标
C1[records-consumed-rate]
C2[records-lag-max]
C3[fetch-latency]
end
P1 --> B1
B2 --> C1
C2 --> P1
二、Producer 优化
2.1 批量发送优化
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// 批量配置
props.put("batch.size", 65536); // 64KB(默认 16KB)
props.put("linger.ms", 10); // 等待 10ms(默认 0)
props.put("buffer.memory", 67108864); // 64MB(默认 32MB)
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
优化效果:
- batch.size 从 16KB → 64KB:吞吐量提升 2-3 倍
- linger.ms 从 0 → 10ms:批量率提升 5-10 倍
2.2 压缩优化
// 启用压缩
props.put("compression.type", "lz4"); // 或 gzip、snappy、zstd
// 压缩对比
| 压缩算法 | 压缩比 | CPU 开销 | 适用场景 |
|----------|--------|----------|----------|
| **none** | 1:1 | 最低 | 低延迟场景 |
| **gzip** | 1:5 | 中等 | 冷数据存储 |
| **snappy** | 1:3 | 低 | 一般场景 |
| **lz4** | 1:3 | 最低 | 推荐默认 |
| **zstd** | 1:5 | 低 | 高压缩需求 |
2.3 分区优化
// 增加分区数
// 分区数 = 最大消费者并发度
int partitionCount = maxConsumerThreads;
// 合理设置分区数
// 分区数过多会导致:
// - Broker 内存占用增加
// - 文件句柄消耗增加
// - Leader 选举时间变长
// 推荐:每个 Broker 10-100 个分区
int optimalPartitions = brokerCount * 50;
2.4 发送优化
// 异步发送(推荐)
producer.send(record, (metadata, exception) -> {
if (exception == null) {
successCount.incrementAndGet();
} else {
failCount.incrementAndGet();
}
});
// 避免同步发送
// ❌ 不推荐
SendResult result = producer.send(record).get();
// 批量发送优化
List<ProducerRecord<String, String>> batch = new ArrayList<>();
for (int i = 0; i < 100; i++) {
batch.add(new ProducerRecord<>("topic", "key", "value"));
}
// 使用 Callback 聚合
for (ProducerRecord<String, String> record : batch) {
producer.send(record, callback);
}
三、Consumer 优化
3.1 拉取配置优化
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
// 拉取配置
props.put("fetch.min.bytes", 1024); // 最小 1KB(默认 1)
props.put("fetch.max.wait.ms", 100); // 最多等待 100ms(默认 500)
props.put("fetch.max.bytes", 52428800); // 最大 50MB(默认 50MB)
props.put("max.poll.records", 500); // 每次最多 500 条(默认 500)
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
3.2 并发消费优化
// 方案一:多线程消费
ExecutorService executor = Executors.newFixedThreadPool(10);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executor.submit(() -> process(record));
}
}
// 方案二:多消费者实例
// 每个消费者独立线程
List<KafkaConsumer<String, String>> consumers = new ArrayList<>();
for (int i = 0; i < 10; i++) {
consumers.add(createConsumer());
}
// 方案三:使用 Kafka Streams
KStream<String, String> stream = builder.stream("topic");
stream.foreach((key, value) -> process(value));
3.3 位移提交优化
// 异步提交(推荐)
props.put("enable.auto.commit", "false");
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 处理消息
for (ConsumerRecord<String, String> record : records) {
process(record);
}
// 异步提交
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.error("提交失败", exception);
}
});
}
// 批量提交
int count = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
count++;
// 每 100 条提交一次
if (count % 100 == 0) {
consumer.commitSync();
}
}
}
四、Broker 优化
4.1 JVM 优化
# Kafka 启动脚本配置
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G" # 堆内存
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC"
# G1 GC 配置
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-XX:+ExplicitGCInvokesConcurrent
-XX:MaxGCPauseMills=20
4.2 IO 优化
# server.properties 配置
# 日志目录(多磁盘挂载)
log.dirs=/mnt/disk1/kafka-logs,/mnt/disk2/kafka-logs,/mnt/disk3/kafka-logs
# 刷盘配置(异步刷盘)
log.flush.interval.messages=10000
log.flush.interval.ms=1000
# 网络配置
num.network.threads=8 # 网络线程数
num.io.threads=16 # IO 线程数
num.replica.fetchers=4 # 副本拉取线程
# Socket 配置
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
4.3 日志配置优化
# Segment 配置
log.segment.bytes=1073741824 # 1GB
log.segment.delete.delay.ms=60000 # 60 秒
# 保留策略
log.retention.hours=168 # 7 天
log.retention.bytes=-1 # 不限制
# 清理配置
log.cleanup.policy=delete
log.cleaner.enable=true
log.cleaner.threads=2
log.cleaner.io.max.bytes.per.second=104857600
4.4 副本优化
# 副本配置
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false
# 副本同步
replica.lag.time.max.ms=30000
replica.fetch.max.bytes=10485760
replica.fetch.wait.max.ms=500
num.replica.fetchers=2
五、性能测试
5.1 基准测试工具
# Producer 性能测试
kafka-producer-perf-test.sh \
--topic test-topic \
--num-records 1000000 \
--record-size 1024 \
--throughput 10000 \
--producer-props bootstrap.servers=localhost:9092 \
batch.size=65536 \
linger.ms=10 \
compression.type=lz4
# Consumer 性能测试
kafka-consumer-perf-test.sh \
--topic test-topic \
--messages 1000000 \
--bootstrap-server localhost:9092 \
--group test-group \
--fetch-size 65536
5.2 测试结果分析
# Producer 测试结果
Records sent: 1000000
Bytes sent: 1048576000
Throughput: 95000 records/sec
99.53 MB/sec
Avg latency: 5.2 ms
Max latency: 45.8 ms
# Consumer 测试结果
Records consumed: 1000000
Bytes consumed: 1048576000
Throughput: 88000 records/sec
92.17 MB/sec
Avg latency: 3.1 ms
Max latency: 28.5 ms
六、调优实战
6.1 高吞吐场景
// Producer 配置(高吞吐)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1"); // 降低可靠性要求
props.put("batch.size", 131072); // 128KB
props.put("linger.ms", 20); // 等待 20ms
props.put("compression.type", "lz4"); // 压缩
props.put("buffer.memory", 134217728); // 128MB
// Consumer 配置(高吞吐)
props.put("fetch.min.bytes", 65536); // 64KB
props.put("fetch.max.wait.ms", 200); // 200ms
props.put("max.poll.records", 1000); // 1000 条
6.2 低延迟场景
// Producer 配置(低延迟)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // 保证可靠性
props.put("batch.size", 16384); // 16KB(默认)
props.put("linger.ms", 0); // 不等待
props.put("compression.type", "none"); // 不压缩
props.put("max.in.flight.requests.per.connection", 1);
// Consumer 配置(低延迟)
props.put("fetch.min.bytes", 1); // 有消息就返回
props.put("fetch.max.wait.ms", 100); // 100ms
props.put("max.poll.records", 100); // 少量多次
6.3 大消息场景
// Broker 配置
message.max.bytes=10485760 # 10MB
replica.fetch.max.bytes=10485760 # 10MB
max.message.bytes=10485760 # 10MB
// Producer 配置
props.put("max.request.size", 10485760); # 10MB
props.put("buffer.memory", 209715200); # 200MB
// Consumer 配置
props.put("fetch.max.bytes", 10485760); # 10MB
props.put("max.poll.records", 50); # 减少每次拉取数量
七、常见问题排查
7.1 吞吐量下降
原因:
- 批量配置不当
- 压缩开销大
- 网络带宽不足
解决:
// 调整批量配置
props.put("batch.size", 65536);
props.put("linger.ms", 10);
// 更换压缩算法
props.put("compression.type", "snappy"); // 或 lz4
// 增加网络线程
num.network.threads=8
7.2 延迟过高
原因:
- acks=all 等待时间长
- 批量等待时间过长
- GC 停顿
解决:
// 降低可靠性要求
props.put("acks", "1");
// 减少等待时间
props.put("linger.ms", 5);
// 优化 GC
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
7.3 消息堆积
原因:
- 消费者处理慢
- 分区数不足
- 消费者故障
解决:
// 增加消费者数量
// 增加消费线程
executor = Executors.newFixedThreadPool(20);
// 增加分区数
kafka-topics.sh --alter --topic my-topic --partitions 32
// 优化处理逻辑
// 异步处理、批量处理
八、监控告警
8.1 Prometheus 监控
# prometheus.yml
scrape_configs:
- job_name: 'kafka'
static_configs:
- targets: ['kafka-1:9090', 'kafka-2:9090']
- job_name: 'kafka-consumer'
static_configs:
- targets: ['consumer-1:9090']
# alerting_rules.yml
groups:
- name: kafka
rules:
- alert: KafkaConsumerLag
expr: kafka_consumer_group_lag > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "Consumer Lag 过高"
- alert: KafkaUnderReplicatedPartitions
expr: kafka_server_ReplicaManager_UnderReplicatedPartitions > 0
for: 5m
labels:
severity: critical
annotations:
summary: "存在未同步副本"
8.2 关键指标
| 指标名称 | 告警阈值 | 说明 |
|---|---|---|
kafka_consumer_group_lag | > 10000 | 消费滞后 |
kafka_producer_request_latency | > 100ms | 发送延迟 |
kafka_server_BrokerTopicMetrics_FailedFetchRate | > 1% | 拉取失败率 |
kafka_network_RequestMetrics_RequestQueueTimeMs | > 500ms | 请求排队时间 |
总结
Kafka 性能调优的核心要点:
- Producer 优化:批量发送、压缩、分区策略
- Consumer 优化:拉取配置、并发消费、位移提交
- Broker 优化:JVM、IO 配置、日志管理
- 性能测试:基准测试、结果分析
- 监控告警:关键指标、告警规则
核心要点:
- 根据场景选择配置(高吞吐 vs 低延迟)
- 批量配置是性能关键
- 合理设置分区数和副本数
- 持续监控和调优
参考资料
- Kafka Performance Tuning 官方文档
- Kafka IP: 性能优化建议
- 《Kafka 权威指南》第 8 章