Skip to content
清晨的一缕阳光
返回

Kafka 性能调优实战指南

Kafka 性能调优是保障消息系统稳定运行的关键。本文将从 Producer、Consumer、Broker 三个维度,深入探讨 Kafka 性能调优的实战技巧。

一、性能指标

1.1 核心指标

指标说明目标值
吞吐量每秒处理消息数> 10 万 TPS
延迟消息发送/消费延迟< 10ms
可用性系统可用时间比例> 99.99%
消息堆积未消费消息数量< 1000

1.2 监控指标

graph TB
    subgraph Producer 指标
        P1[record-send-rate]
        P2[request-latency]
        P3[compression-rate]
    end
    
    subgraph Broker 指标
        B1[bytes-in-rate]
        B2[bytes-out-rate]
        B3[request-handler-rate]
    end
    
    subgraph Consumer 指标
        C1[records-consumed-rate]
        C2[records-lag-max]
        C3[fetch-latency]
    end
    
    P1 --> B1
    B2 --> C1
    C2 --> P1

二、Producer 优化

2.1 批量发送优化

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

// 批量配置
props.put("batch.size", 65536);        // 64KB(默认 16KB)
props.put("linger.ms", 10);            // 等待 10ms(默认 0)
props.put("buffer.memory", 67108864);  // 64MB(默认 32MB)

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

优化效果

2.2 压缩优化

// 启用压缩
props.put("compression.type", "lz4");  // 或 gzip、snappy、zstd

// 压缩对比
| 压缩算法 | 压缩比 | CPU 开销 | 适用场景 |
|----------|--------|----------|----------|
| **none** | 1:1 | 最低 | 低延迟场景 |
| **gzip** | 1:5 | 中等 | 冷数据存储 |
| **snappy** | 1:3 || 一般场景 |
| **lz4** | 1:3 | 最低 | 推荐默认 |
| **zstd** | 1:5 || 高压缩需求 |

2.3 分区优化

// 增加分区数
// 分区数 = 最大消费者并发度
int partitionCount = maxConsumerThreads;

// 合理设置分区数
// 分区数过多会导致:
// - Broker 内存占用增加
// - 文件句柄消耗增加
// - Leader 选举时间变长

// 推荐:每个 Broker 10-100 个分区
int optimalPartitions = brokerCount * 50;

2.4 发送优化

// 异步发送(推荐)
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        successCount.incrementAndGet();
    } else {
        failCount.incrementAndGet();
    }
});

// 避免同步发送
// ❌ 不推荐
SendResult result = producer.send(record).get();

// 批量发送优化
List<ProducerRecord<String, String>> batch = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    batch.add(new ProducerRecord<>("topic", "key", "value"));
}

// 使用 Callback 聚合
for (ProducerRecord<String, String> record : batch) {
    producer.send(record, callback);
}

三、Consumer 优化

3.1 拉取配置优化

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");

// 拉取配置
props.put("fetch.min.bytes", 1024);        // 最小 1KB(默认 1)
props.put("fetch.max.wait.ms", 100);       // 最多等待 100ms(默认 500)
props.put("fetch.max.bytes", 52428800);    // 最大 50MB(默认 50MB)
props.put("max.poll.records", 500);        // 每次最多 500 条(默认 500)

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

3.2 并发消费优化

// 方案一:多线程消费
ExecutorService executor = Executors.newFixedThreadPool(10);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        executor.submit(() -> process(record));
    }
}

// 方案二:多消费者实例
// 每个消费者独立线程
List<KafkaConsumer<String, String>> consumers = new ArrayList<>();
for (int i = 0; i < 10; i++) {
    consumers.add(createConsumer());
}

// 方案三:使用 Kafka Streams
KStream<String, String> stream = builder.stream("topic");
stream.foreach((key, value) -> process(value));

3.3 位移提交优化

// 异步提交(推荐)
props.put("enable.auto.commit", "false");

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    // 处理消息
    for (ConsumerRecord<String, String> record : records) {
        process(record);
    }
    
    // 异步提交
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null) {
            log.error("提交失败", exception);
        }
    });
}

// 批量提交
int count = 0;
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    for (ConsumerRecord<String, String> record : records) {
        process(record);
        count++;
        
        // 每 100 条提交一次
        if (count % 100 == 0) {
            consumer.commitSync();
        }
    }
}

四、Broker 优化

4.1 JVM 优化

# Kafka 启动脚本配置
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"  # 堆内存
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC"

# G1 GC 配置
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
-XX:InitiatingHeapOccupancyPercent=35
-XX:+ExplicitGCInvokesConcurrent
-XX:MaxGCPauseMills=20

4.2 IO 优化

# server.properties 配置

# 日志目录(多磁盘挂载)
log.dirs=/mnt/disk1/kafka-logs,/mnt/disk2/kafka-logs,/mnt/disk3/kafka-logs

# 刷盘配置(异步刷盘)
log.flush.interval.messages=10000
log.flush.interval.ms=1000

# 网络配置
num.network.threads=8          # 网络线程数
num.io.threads=16              # IO 线程数
num.replica.fetchers=4         # 副本拉取线程

# Socket 配置
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

4.3 日志配置优化

# Segment 配置
log.segment.bytes=1073741824       # 1GB
log.segment.delete.delay.ms=60000  # 60 秒

# 保留策略
log.retention.hours=168            # 7 天
log.retention.bytes=-1             # 不限制

# 清理配置
log.cleanup.policy=delete
log.cleaner.enable=true
log.cleaner.threads=2
log.cleaner.io.max.bytes.per.second=104857600

4.4 副本优化

# 副本配置
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false

# 副本同步
replica.lag.time.max.ms=30000
replica.fetch.max.bytes=10485760
replica.fetch.wait.max.ms=500
num.replica.fetchers=2

五、性能测试

5.1 基准测试工具

# Producer 性能测试
kafka-producer-perf-test.sh \
  --topic test-topic \
  --num-records 1000000 \
  --record-size 1024 \
  --throughput 10000 \
  --producer-props bootstrap.servers=localhost:9092 \
    batch.size=65536 \
    linger.ms=10 \
    compression.type=lz4

# Consumer 性能测试
kafka-consumer-perf-test.sh \
  --topic test-topic \
  --messages 1000000 \
  --bootstrap-server localhost:9092 \
  --group test-group \
  --fetch-size 65536

5.2 测试结果分析

# Producer 测试结果
Records sent:       1000000
Bytes sent:         1048576000
Throughput:         95000 records/sec
                  99.53 MB/sec
Avg latency:        5.2 ms
Max latency:        45.8 ms

# Consumer 测试结果
Records consumed:   1000000
Bytes consumed:     1048576000
Throughput:         88000 records/sec
                  92.17 MB/sec
Avg latency:        3.1 ms
Max latency:        28.5 ms

六、调优实战

6.1 高吞吐场景

// Producer 配置(高吞吐)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");                    // 降低可靠性要求
props.put("batch.size", 131072);           // 128KB
props.put("linger.ms", 20);                // 等待 20ms
props.put("compression.type", "lz4");      // 压缩
props.put("buffer.memory", 134217728);     // 128MB

// Consumer 配置(高吞吐)
props.put("fetch.min.bytes", 65536);       // 64KB
props.put("fetch.max.wait.ms", 200);       // 200ms
props.put("max.poll.records", 1000);       // 1000 条

6.2 低延迟场景

// Producer 配置(低延迟)
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");                  // 保证可靠性
props.put("batch.size", 16384);            // 16KB(默认)
props.put("linger.ms", 0);                 // 不等待
props.put("compression.type", "none");     // 不压缩
props.put("max.in.flight.requests.per.connection", 1);

// Consumer 配置(低延迟)
props.put("fetch.min.bytes", 1);           // 有消息就返回
props.put("fetch.max.wait.ms", 100);       // 100ms
props.put("max.poll.records", 100);        // 少量多次

6.3 大消息场景

// Broker 配置
message.max.bytes=10485760           # 10MB
replica.fetch.max.bytes=10485760     # 10MB
max.message.bytes=10485760           # 10MB

// Producer 配置
props.put("max.request.size", 10485760);     # 10MB
props.put("buffer.memory", 209715200);       # 200MB

// Consumer 配置
props.put("fetch.max.bytes", 10485760);      # 10MB
props.put("max.poll.records", 50);           # 减少每次拉取数量

七、常见问题排查

7.1 吞吐量下降

原因

解决

// 调整批量配置
props.put("batch.size", 65536);
props.put("linger.ms", 10);

// 更换压缩算法
props.put("compression.type", "snappy");  // 或 lz4

// 增加网络线程
num.network.threads=8

7.2 延迟过高

原因

解决

// 降低可靠性要求
props.put("acks", "1");

// 减少等待时间
props.put("linger.ms", 5);

// 优化 GC
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20

7.3 消息堆积

原因

解决

// 增加消费者数量
// 增加消费线程
executor = Executors.newFixedThreadPool(20);

// 增加分区数
kafka-topics.sh --alter --topic my-topic --partitions 32

// 优化处理逻辑
// 异步处理、批量处理

八、监控告警

8.1 Prometheus 监控

# prometheus.yml
scrape_configs:
  - job_name: 'kafka'
    static_configs:
      - targets: ['kafka-1:9090', 'kafka-2:9090']
    
  - job_name: 'kafka-consumer'
    static_configs:
      - targets: ['consumer-1:9090']

# alerting_rules.yml
groups:
  - name: kafka
    rules:
      - alert: KafkaConsumerLag
        expr: kafka_consumer_group_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer Lag 过高"
      
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_ReplicaManager_UnderReplicatedPartitions > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "存在未同步副本"

8.2 关键指标

指标名称告警阈值说明
kafka_consumer_group_lag> 10000消费滞后
kafka_producer_request_latency> 100ms发送延迟
kafka_server_BrokerTopicMetrics_FailedFetchRate> 1%拉取失败率
kafka_network_RequestMetrics_RequestQueueTimeMs> 500ms请求排队时间

总结

Kafka 性能调优的核心要点:

  1. Producer 优化:批量发送、压缩、分区策略
  2. Consumer 优化:拉取配置、并发消费、位移提交
  3. Broker 优化:JVM、IO 配置、日志管理
  4. 性能测试:基准测试、结果分析
  5. 监控告警:关键指标、告警规则

核心要点

参考资料


分享这篇文章到:

上一篇文章
Kafka 运维监控与故障排查指南
下一篇文章
Kafka Connect 数据集成实战指南