Kafka Producer 是 Kafka 消息系统的入口,其发送机制直接影响了整个系统的吞吐量和延迟。本文将深入探讨 Kafka Producer 的发送流程、RecordAccumulator 缓冲区设计以及批量发送机制。
一、Producer 发送流程
1.1 整体流程
Kafka Producer 的发送流程可以分为以下几个步骤:
sequenceDiagram
participant App as 应用程序
participant P as Producer
participant RA as RecordAccumulator
participant S as Sender 线程
participant B as Broker
App->>P: send(record)
P->>RA: 消息添加到缓冲区
P-->>App: 返回 Future
S->>RA: 批量获取消息
S->>B: 发送请求
B-->>S: 响应
S->>RA: 更新发送结果
1.2 核心代码
// 1. 创建 Producer 实例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 2. 发送消息
ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("发送成功:" + metadata.offset());
} else {
System.out.println("发送失败:" + exception.getMessage());
}
});
// 3. 关闭 Producer
producer.close();
二、RecordAccumulator 缓冲区
2.1 数据结构
RecordAccumulator 是 Producer 的核心组件,负责消息的缓冲和批量处理:
graph TB
subgraph RecordAccumulator
CH[ConcurrentMap<TopicPartition, Deque<RecordBatch>>]
B1[Batch 1]
B2[Batch 2]
B3[Batch 3]
end
CH --> B1
CH --> B2
CH --> B3
B1 --> R1[Record 1]
B1 --> R2[Record 2]
B1 --> R3[Record 3]
2.2 批量机制
| 参数 | 默认值 | 说明 |
|---|---|---|
batch.size | 16384 (16KB) | 每个批次的大小 |
linger.ms | 0 | 等待批次的延迟时间 |
buffer.memory | 33554432 (32MB) | 总缓冲区大小 |
// 批量发送配置示例
props.put("batch.size", 32768); // 32KB
props.put("linger.ms", 5); // 等待 5ms
2.3 批次创建流程
graph TD
A[消息到达] --> B{当前 Batch 是否可用?}
B -->|是 | C[追加到 Batch]
B -->|否 | D[创建新 Batch]
D --> C
C --> E{Batch 满或 linger 时间到?}
E -->|是 | F[标记为可发送]
E -->|否 | G[继续等待]
三、分区器与序列化
3.1 分区策略
Kafka Producer 默认使用轮询分区策略:
public class DefaultPartitioner implements Partitioner {
private int counter = 0;
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
// 无 key,使用轮询
return counter++ % numPartitions;
} else {
// 有 key,使用 hash
return Utils.toPositive(Utils murmur2(keyBytes)) % numPartitions;
}
}
}
3.2 自定义分区器
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 根据用户 ID 分区
String userId = extractUserId(value);
return Math.abs(userId.hashCode()) % cluster.partitionsForTopic(topic).size();
}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) {}
}
四、可靠性保证
4.1 acks 配置
| acks 值 | 说明 | 吞吐量 | 可靠性 |
|---|---|---|---|
0 | 不等待确认 | 最高 | 最低 |
1 | Leader 确认即可 | 中等 | 中等 |
all | 所有 ISR 确认 | 最低 | 最高 |
// 高可靠性配置
props.put("acks", "all");
props.put("min.insync.replicas", 2);
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);
props.put("enable.idempotence", true); // 幂等性
4.2 重试机制
graph TD
A[发送失败] --> B{可重试错误?}
B -->|是 | C[等待 backoff]
C --> D[重试发送]
D --> E{成功?}
E -->|否 | F{达到最大重试?}
F -->|否 | C
F -->|是 | G[返回错误]
B -->|否 | G
E -->|是 | H[发送成功]
4.3 幂等性 Producer
// 开启幂等性
props.put("enable.idempotence", true);
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);
五、性能优化
5.1 批量优化
// 批量发送配置
props.put("batch.size", 65536); // 64KB
props.put("linger.ms", 10); // 等待 10ms
props.put("compression.type", "lz4"); // 压缩
5.2 异步发送
// ✅ 推荐:异步发送
producer.send(record, (metadata, exception) -> {
// 回调处理
});
// ❌ 不推荐:同步发送
SendResult result = producer.send(record).get();
5.3 连接池优化
// 网络配置
props.put("max.block.ms", 60000);
props.put("request.timeout.ms", 30000);
props.put("delivery.timeout.ms", 120000);
六、常见问题排查
6.1 发送超时
问题:TimeoutException
原因:
- Broker 不可用
- 网络问题
- 缓冲区满
解决:
props.put("request.timeout.ms", 60000);
props.put("delivery.timeout.ms", 180000);
props.put("buffer.memory", 67108864); // 64MB
6.2 缓冲区满
问题:BufferExhaustedException
原因:
- 发送速度过快
- Broker 处理慢
- 缓冲区太小
解决:
props.put("buffer.memory", 134217728); // 128MB
props.put("max.block.ms", 10000); // 阻塞等待
6.3 消息丢失
问题:消息未到达 Broker
原因:
- acks=0 或 1
- 重试次数不足
- 未开启幂等性
解决:
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("enable.idempotence", true);
props.put("min.insync.replicas", 2);
七、最佳实践
7.1 配置建议
| 场景 | acks | retries | 幂等性 | batch.size |
|---|---|---|---|---|
| 日志收集 | 0 | 0 | 关闭 | 32KB |
| 一般业务 | 1 | 3 | 关闭 | 16KB |
| 金融交易 | all | MAX | 开启 | 8KB |
7.2 监控指标
record-send-rate:每秒发送记录数records-per-request:每个请求的平均记录数record-error-rate:错误率request-latency-avg:平均延迟
7.3 代码模板
public class KafkaProducerService {
private KafkaProducer<String, String> producer;
@PostConstruct
public void init() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 3);
props.put("enable.idempotence", true);
producer = new KafkaProducer<>(props);
}
public void send(String topic, String key, String value) {
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, key, value);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("发送失败", exception);
}
});
}
@PreDestroy
public void close() {
if (producer != null) {
producer.flush();
producer.close();
}
}
}
总结
Kafka Producer 的发送机制是其高性能的关键:
- RecordAccumulator:批量缓冲,减少网络请求
- 异步发送:非阻塞,提高吞吐量
- 分区策略:合理分区,均衡负载
- 可靠性配置:acks、重试、幂等性保证
- 性能优化:批量大小、linger 时间、压缩
核心要点:
- 理解批量发送机制,合理配置 batch.size 和 linger.ms
- 根据业务需求选择合适的 acks 级别
- 开启幂等性保证 exactly-once 语义
- 监控关键指标,及时发现性能问题
参考资料
- Kafka Producer 官方文档
- Kafka Producer 源码
- 《Kafka 权威指南》第 4 章