Skip to content
清晨的一缕阳光
返回

Kafka 生产者发送机制详解

Kafka Producer 是 Kafka 消息系统的入口,其发送机制直接影响了整个系统的吞吐量和延迟。本文将深入探讨 Kafka Producer 的发送流程、RecordAccumulator 缓冲区设计以及批量发送机制。

一、Producer 发送流程

1.1 整体流程

Kafka Producer 的发送流程可以分为以下几个步骤:

sequenceDiagram
    participant App as 应用程序
    participant P as Producer
    participant RA as RecordAccumulator
    participant S as Sender 线程
    participant B as Broker
    
    App->>P: send(record)
    P->>RA: 消息添加到缓冲区
    P-->>App: 返回 Future
    S->>RA: 批量获取消息
    S->>B: 发送请求
    B-->>S: 响应
    S->>RA: 更新发送结果

1.2 核心代码

// 1. 创建 Producer 实例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 2. 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.println("发送成功:" + metadata.offset());
    } else {
        System.out.println("发送失败:" + exception.getMessage());
    }
});

// 3. 关闭 Producer
producer.close();

二、RecordAccumulator 缓冲区

2.1 数据结构

RecordAccumulator 是 Producer 的核心组件,负责消息的缓冲和批量处理:

graph TB
    subgraph RecordAccumulator
        CH[ConcurrentMap<TopicPartition, Deque<RecordBatch>>]
        B1[Batch 1]
        B2[Batch 2]
        B3[Batch 3]
    end
    
    CH --> B1
    CH --> B2
    CH --> B3
    
    B1 --> R1[Record 1]
    B1 --> R2[Record 2]
    B1 --> R3[Record 3]

2.2 批量机制

参数默认值说明
batch.size16384 (16KB)每个批次的大小
linger.ms0等待批次的延迟时间
buffer.memory33554432 (32MB)总缓冲区大小
// 批量发送配置示例
props.put("batch.size", 32768);  // 32KB
props.put("linger.ms", 5);       // 等待 5ms

2.3 批次创建流程

graph TD
    A[消息到达] --> B{当前 Batch 是否可用?}
    B -->|是 | C[追加到 Batch]
    B -->|否 | D[创建新 Batch]
    D --> C
    C --> E{Batch 满或 linger 时间到?}
    E -->|是 | F[标记为可发送]
    E -->|否 | G[继续等待]

三、分区器与序列化

3.1 分区策略

Kafka Producer 默认使用轮询分区策略:

public class DefaultPartitioner implements Partitioner {
    private int counter = 0;
    
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        
        if (keyBytes == null) {
            // 无 key,使用轮询
            return counter++ % numPartitions;
        } else {
            // 有 key,使用 hash
            return Utils.toPositive(Utils murmur2(keyBytes)) % numPartitions;
        }
    }
}

3.2 自定义分区器

public class CustomPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        // 根据用户 ID 分区
        String userId = extractUserId(value);
        return Math.abs(userId.hashCode()) % cluster.partitionsForTopic(topic).size();
    }
    
    @Override
    public void close() {}
    
    @Override
    public void configure(Map<String, ?> configs) {}
}

四、可靠性保证

4.1 acks 配置

acks 值说明吞吐量可靠性
0不等待确认最高最低
1Leader 确认即可中等中等
all所有 ISR 确认最低最高
// 高可靠性配置
props.put("acks", "all");
props.put("min.insync.replicas", 2);
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);
props.put("enable.idempotence", true);  // 幂等性

4.2 重试机制

graph TD
    A[发送失败] --> B{可重试错误?}
    B -->|是 | C[等待 backoff]
    C --> D[重试发送]
    D --> E{成功?}
    E -->|否 | F{达到最大重试?}
    F -->|否 | C
    F -->|是 | G[返回错误]
    B -->|否 | G
    E -->|是 | H[发送成功]

4.3 幂等性 Producer

// 开启幂等性
props.put("enable.idempotence", true);
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);

五、性能优化

5.1 批量优化

// 批量发送配置
props.put("batch.size", 65536);     // 64KB
props.put("linger.ms", 10);         // 等待 10ms
props.put("compression.type", "lz4"); // 压缩

5.2 异步发送

// ✅ 推荐:异步发送
producer.send(record, (metadata, exception) -> {
    // 回调处理
});

// ❌ 不推荐:同步发送
SendResult result = producer.send(record).get();

5.3 连接池优化

// 网络配置
props.put("max.block.ms", 60000);
props.put("request.timeout.ms", 30000);
props.put("delivery.timeout.ms", 120000);

六、常见问题排查

6.1 发送超时

问题TimeoutException

原因

解决

props.put("request.timeout.ms", 60000);
props.put("delivery.timeout.ms", 180000);
props.put("buffer.memory", 67108864); // 64MB

6.2 缓冲区满

问题BufferExhaustedException

原因

解决

props.put("buffer.memory", 134217728); // 128MB
props.put("max.block.ms", 10000);      // 阻塞等待

6.3 消息丢失

问题:消息未到达 Broker

原因

解决

props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", true);
props.put("min.insync.replicas", 2);

七、最佳实践

7.1 配置建议

场景acksretries幂等性batch.size
日志收集00关闭32KB
一般业务13关闭16KB
金融交易allMAX开启8KB

7.2 监控指标

7.3 代码模板

public class KafkaProducerService {
    private KafkaProducer<String, String> producer;
    
    @PostConstruct
    public void init() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("enable.idempotence", true);
        
        producer = new KafkaProducer<>(props);
    }
    
    public void send(String topic, String key, String value) {
        ProducerRecord<String, String> record = 
            new ProducerRecord<>(topic, key, value);
        
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                log.error("发送失败", exception);
            }
        });
    }
    
    @PreDestroy
    public void close() {
        if (producer != null) {
            producer.flush();
            producer.close();
        }
    }
}

总结

Kafka Producer 的发送机制是其高性能的关键:

  1. RecordAccumulator:批量缓冲,减少网络请求
  2. 异步发送:非阻塞,提高吞吐量
  3. 分区策略:合理分区,均衡负载
  4. 可靠性配置:acks、重试、幂等性保证
  5. 性能优化:批量大小、linger 时间、压缩

核心要点

参考资料


分享这篇文章到:

上一篇文章
RocketMQ 核心源码分析与解读
下一篇文章
RocketMQ 安全加固与权限管理实战