Exactly-once semantics (EOS) is the strongest delivery guarantee a messaging system can offer. This article examines how Kafka achieves EOS through the idempotent producer and the transaction mechanism.
1. Comparing Message Delivery Semantics
1.1 The Three Semantics
graph LR
subgraph At-Most-Once
A1[At most once] --> A2[May lose messages]
end
subgraph At-Least-Once
B1[At least once] --> B2[May duplicate messages]
end
subgraph Exactly-Once
C1[Exactly once] --> C2[No loss, no duplicates]
end
| Semantics | Meaning | How it is achieved | Typical use |
|---|---|---|---|
| At-Most-Once | at most once; messages may be lost | acks=0 | log collection |
| At-Least-Once | at least once; messages may be duplicated | acks=1/all + retries | general business traffic |
| Exactly-Once | exactly once; no loss, no duplicates | idempotence + transactions | financial transactions |
1.2 Where Loss and Duplication Come From
Message loss:
- acks=0 or acks=1
- the producer fails to send and never retries
- broker failure before replication
Message duplication:
- producer retries after a lost acknowledgement
- the consumer re-processes messages (e.g. a crash after processing but before committing the offset)
- rebalances
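Why retries create duplicates can be seen in a minimal, broker-free simulation (plain Java, hypothetical names; `send` stands in for a produce request whose acknowledgement may be lost on the way back):

```java
import java.util.ArrayList;
import java.util.List;

public class AtLeastOnceDemo {
    // Broker log: without idempotence there is no dedup, every append sticks.
    static final List<String> brokerLog = new ArrayList<>();

    // Returns true if the ack reached the producer, false if it was "lost".
    static boolean send(String msg, boolean ackLost) {
        brokerLog.add(msg);   // the broker appends the message either way
        return !ackLost;      // the ack may be lost on the way back
    }

    public static void main(String[] args) {
        boolean acked = send("order-1", true);   // first attempt: ack lost
        if (!acked) {
            send("order-1", false);              // producer retries -> duplicate
        }
        System.out.println(brokerLog.size());    // the same message is stored twice
    }
}
```

The broker did its job both times; the duplication comes purely from the producer being unable to tell "send failed" apart from "ack lost".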
2. The Idempotent Producer
2.1 How It Works
graph TB
subgraph Producer
P1[Message 1, Seq=0]
P2[Message 2, Seq=1]
P3[Message 3, Seq=2]
end
subgraph Broker
B1[Message 1, Seq=0 ✓]
B2[Message 2, Seq=1 ✓]
B3[Message 3, Seq=2 ✓]
end
P1 --> B1
P2 --> B2
P3 --> B3
P1 -.->|retry, duplicate Seq=0| BD[Discarded]
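The diagram boils down to a small piece of broker-side bookkeeping. Here is a sketch under simplifying assumptions (hypothetical class names, one sequence counter per PID; the real broker tracks sequence numbers per PID and partition, and also keeps a producer epoch):

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of broker-side idempotence: duplicates with an
// already-seen sequence number are dropped; gaps are rejected.
public class SeqDedup {
    private final Map<Long, Integer> lastSeq = new HashMap<>(); // PID -> last accepted seq

    /** Returns true if the message is appended, false if dropped as a duplicate. */
    public boolean tryAppend(long pid, int seq) {
        Integer last = lastSeq.get(pid);
        if (last != null && seq <= last) {
            return false;  // duplicate retry: drop silently, ack as success
        }
        if (last != null && seq != last + 1) {
            // a gap means a message was lost in between; the real broker
            // surfaces this as OutOfOrderSequenceException
            throw new IllegalStateException("out of order: expected " + (last + 1));
        }
        lastSeq.put(pid, seq);  // accept and remember the new high-water mark
        return true;
    }
}
```

Because a dropped duplicate is still acknowledged, the producer's retry loop stays simple: it can retry blindly and the broker guarantees at most one append per sequence number.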
2.2 Configuration
// Enable the idempotent producer
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", true); // the key setting
props.put("acks", "all"); // must be all
props.put("retries", Integer.MAX_VALUE); // retry indefinitely
props.put("max.in.flight.requests.per.connection", 5); // must be <= 5 with idempotence
props.put("retry.backoff.ms", 100);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
2.3 Configuration Notes
| Parameter | Recommended value | Notes |
|---|---|---|
| enable.idempotence | true | enables idempotence |
| acks | all | wait for all in-sync replicas to acknowledge |
| retries | Integer.MAX_VALUE | retry indefinitely |
| max.in.flight.requests.per.connection | 5 | at most 5 unacknowledged requests (the upper bound with idempotence) |
2.4 Usage Example
public class IdempotentProducerService {
private KafkaProducer<String, String> producer;
@PostConstruct
public void init() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", true);
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
producer = new KafkaProducer<>(props);
}
public void send(String topic, String key, String value) {
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, key, value);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
log.info("send succeeded: offset={}", metadata.offset());
} else {
log.error("send failed", exception);
}
});
}
@PreDestroy
public void close() {
if (producer != null) {
producer.flush();
producer.close();
}
}
}
2.5 Limitations
| Limitation | Notes |
|---|---|
| Single-partition only | ordering and dedup are guaranteed only within one partition |
| 5 in-flight requests | at most 5 unacknowledged requests per connection |
| Single session | the PID changes when the producer restarts, so dedup does not survive restarts |
3. The Transaction Mechanism
3.1 Transaction Architecture
sequenceDiagram
participant P as Producer
participant C as Coordinator
participant B1 as Broker 1
participant B2 as Broker 2
P->>C: 1. Initialize the transaction
C-->>P: returns the ProducerId (and epoch)
P->>B1: 2a. send messages (uncommitted)
P->>B2: 2b. send messages (uncommitted)
P->>C: 3. commit the transaction
C->>B1: 4a. write commit marker
C->>B2: 4b. write commit marker
B1->>B1: 5a. messages become visible
B2->>B2: 5b. messages become visible
3.2 Configuration
// Transactional producer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", true); // implied automatically by transactional.id
props.put("transactional.id", "my-tx-id"); // the transactional id
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
3.3 Usage Example
public class TransactionProducerService {
private KafkaProducer<String, String> producer;
@PostConstruct
public void init() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "order-tx-id");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
producer = new KafkaProducer<>(props);
// Initialize the transaction (registers with the coordinator)
producer.initTransactions();
}
/**
* Send a single message inside a transaction.
*/
public void sendInTransaction(String topic, String key, String value) {
try {
producer.beginTransaction();
ProducerRecord<String, String> record =
new ProducerRecord<>(topic, key, value);
producer.send(record);
producer.commitTransaction();
log.info("transaction committed");
} catch (ProducerFencedException e) {
// fenced by a newer producer with the same transactional.id;
// this instance is unusable and must be closed
log.error("producer fenced", e);
producer.close();
} catch (Exception e) {
log.error("transaction failed, aborting", e);
try {
producer.abortTransaction();
} catch (Exception ex) {
log.error("abort failed", ex);
}
}
}
/**
* Write to multiple topics in one atomic transaction.
*/
public void sendToMultipleTopics(Map<String, String> messages) {
try {
producer.beginTransaction();
for (Map.Entry<String, String> entry : messages.entrySet()) {
ProducerRecord<String, String> record =
new ProducerRecord<>("topic-" + entry.getKey(), entry.getValue());
producer.send(record);
}
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close(); // fenced: abortTransaction would also fail
throw e;
} catch (Exception e) {
producer.abortTransaction();
throw e;
}
}
@PreDestroy
public void close() {
if (producer != null) {
producer.close();
}
}
}
4. The Transactional Consumer
4.1 Configuration
// Transactional consumer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "transaction-consumer-group");
props.put("enable.auto.commit", false); // commit offsets manually after processing
props.put("isolation.level", "read_committed"); // read committed messages only
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
4.2 Isolation Levels
| isolation.level | Meaning |
|---|---|
| read_uncommitted | reads all messages, including those of open or aborted transactions |
| read_committed | reads only messages of committed transactions |
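A toy model of this filtering (hypothetical names; the real consumer additionally holds back records of still-open transactions below the last stable offset):

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of isolation.level: under read_committed, records
// belonging to aborted transactions are invisible to the consumer.
public class IsolationDemo {
    static class TxRecord {
        final String value;
        final boolean committed; // did this record's transaction commit?
        TxRecord(String value, boolean committed) {
            this.value = value;
            this.committed = committed;
        }
    }

    static List<String> poll(List<TxRecord> partition, boolean readCommitted) {
        List<String> visible = new ArrayList<>();
        for (TxRecord r : partition) {
            if (!readCommitted || r.committed) { // read_uncommitted sees everything
                visible.add(r.value);
            }
        }
        return visible;
    }
}
```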
4.3 Usage Example
public class TransactionConsumerService {
private KafkaConsumer<String, String> consumer;
@PostConstruct
public void init() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "transaction-consumer-group");
props.put("enable.auto.commit", false); // commit manually after processing
props.put("isolation.level", "read_committed");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic-1", "topic-2"));
}
public void consume() {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
process(record);
}
consumer.commitSync();
}
}
@PreDestroy
public void close() {
if (consumer != null) {
consumer.close();
}
}
}
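In a consume-process-produce pipeline, the producer can also commit the consumed offsets inside its own transaction via producer.sendOffsetsToTransaction(...), so the output records and the consumer's progress become visible atomically. A broker-free sketch of that atomicity (hypothetical names; in-memory stand-ins for the output topic and the offset store):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of consume-transform-produce: the transformed output and the
// consumed offset commit together or not at all.
public class ReadProcessWrite {
    final List<String> outputTopic = new ArrayList<>();
    long committedOffset = 0;

    void processBatch(List<String> batch, boolean fail) {
        List<String> staged = new ArrayList<>();
        for (String msg : batch) {
            staged.add(msg.toUpperCase());  // transform each record
        }
        if (fail) {
            return;                         // abort: nothing becomes visible
        }
        outputTopic.addAll(staged);         // commit the output...
        committedOffset += batch.size();    // ...and the offset atomically
    }
}
```

If processing fails, neither the output nor the offset advances, so a restart re-reads the same batch without having leaked partial results downstream.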
5. End-to-End Exactly Once
5.1 Kafka Streams
// Kafka Streams EOS configuration
Properties props = new Properties();
props.put("application.id", "stream-app");
props.put("bootstrap.servers", "localhost:9092");
props.put("processing.guarantee", "exactly_once_v2"); // the key setting (requires brokers 2.5+)
StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
.mapValues(value -> process(value))
.to("output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
5.2 Kafka Connect
Exactly-once support for source connectors (KIP-618, Kafka 3.3+) is switched on in the Connect worker configuration, not in the connector itself:
# Connect worker configuration
exactly.once.source.support=enabled
The connector config then declares the level of support it needs (JDBC source shown as an illustrative example):
{
"name": "my-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://localhost:3306/db",
"topic.prefix": "mysql-",
"exactly.once.support": "required"
}
}
5.3 End-to-End Architecture
graph TB
subgraph Source
DB[(Database)]
end
subgraph Kafka
KC[Kafka Connect<br/>EOS]
KS[Kafka Streams<br/>EOS]
end
subgraph Sink
ES[Elasticsearch]
S3[S3]
end
DB -->|transaction| KC
KC -->|transaction| KS
KS -->|transaction| ES
KS -->|transaction| S3
6. Performance Comparison
6.1 Throughput Comparison (illustrative figures; benchmark your own environment)
| Configuration | Throughput | Latency | Reliability |
|---|---|---|---|
| acks=1 | 100% (baseline) | low | medium |
| acks=all | ~80% | medium | high |
| idempotence | ~75% | medium | high |
| transactions | ~50% | high | highest |
6.2 Performance Test
// Throughput test for the different configurations.
// createProps, createIdempotentProps and createTransactionalProps are
// helpers (omitted) that build the Properties shown in earlier sections.
public class PerformanceTest {
public static void main(String[] args) {
testThroughput("acks=1", createProps("1"));
testThroughput("acks=all", createProps("all"));
testThroughput("idempotent", createIdempotentProps());
testThroughput("transaction", createTransactionalProps());
}
private static void testThroughput(String name, Properties props) {
long start = System.currentTimeMillis();
int count = 100000;
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < count; i++) {
producer.send(new ProducerRecord<>("test", "key", "value"));
}
producer.flush();
producer.close();
long duration = System.currentTimeMillis() - start;
System.out.printf("%s: %d msg/s%n", name, count * 1000 / duration);
}
}
7. Troubleshooting
7.1 ProducerFencedException
Causes:
- another producer instance was started with the same transactional.id
- the transaction timed out, so the coordinator bumped the producer epoch
Fix:
try {
producer.beginTransaction();
// send messages
producer.commitTransaction();
} catch (ProducerFencedException e) {
log.error("producer fenced", e);
producer.close(); // must be closed
// create a new producer instance
}
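Fencing can be illustrated without a broker (hypothetical names, mirroring the real behavior): each initTransactions call for a transactional.id bumps an epoch on the coordinator, and any request carrying an older epoch is rejected, which the client sees as ProducerFencedException.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of producer fencing: the latest registration for a
// transactional.id holds the highest epoch; older epochs are rejected.
public class FencingDemo {
    private final Map<String, Integer> epochs = new HashMap<>();

    /** A new producer instance registers and receives the current epoch. */
    public int initTransactions(String txId) {
        return epochs.merge(txId, 1, Integer::sum); // bump the epoch, fencing older holders
    }

    /** Returns true only if the caller still holds the latest epoch. */
    public boolean commit(String txId, int epoch) {
        return epochs.get(txId) == epoch; // a stale epoch means the producer is fenced
    }
}
```

This is why a fenced producer must be closed and recreated: recreating it runs initTransactions again and acquires the new, valid epoch.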
7.2 Transaction Timeouts
Causes:
- the transaction takes too long to complete
- network problems
Fix:
# Broker configuration
transaction.max.timeout.ms=900000 # maximum the broker accepts (15 minutes)
# Producer configuration
transaction.timeout.ms=60000 # must not exceed the broker's transaction.max.timeout.ms
7.3 Performance Degradation
Causes:
- per-transaction overhead (commit markers, coordinator round trips)
- too few messages per transaction
Fix:
// Batch messages so that one transaction amortizes the overhead
List<ProducerRecord<String, String>> records = new ArrayList<>();
for (int i = 0; i < 100; i++) {
records.add(new ProducerRecord<>("topic", "key", "value"));
}
producer.beginTransaction();
for (ProducerRecord<String, String> record : records) {
producer.send(record);
}
producer.commitTransaction();
8. Best Practices
8.1 Recommended Configurations
| Scenario | Idempotence | Transactions | Notes |
|---|---|---|---|
| Log collection | off | off | performance first |
| General business traffic | on | off | balances performance and reliability |
| Financial transactions | on | on | reliability first |
| Data synchronization | on | on | guarantees consistency |
8.2 Code Template
public class ExactlyOnceProducer {
private KafkaProducer<String, String> producer;
@PostConstruct
public void init() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-tx-id");
props.put("enable.idempotence", true);
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
producer = new KafkaProducer<>(props);
producer.initTransactions();
}
public void sendWithExactlyOnce(String topic, String key, String value) {
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>(topic, key, value));
producer.commitTransaction();
} catch (ProducerFencedException e) {
producer.close(); // fenced: the producer cannot abort or be reused
throw e;
} catch (Exception e) {
try {
producer.abortTransaction();
} catch (Exception ex) {
log.error("abort failed", ex);
}
throw e;
}
}
}
Summary
The core mechanisms behind Kafka's exactly-once semantics:
- Idempotent producer: exactly once within a single partition, deduplicating via PID + sequence number
- Transactions: exactly once across partitions, with atomic multi-partition writes
- Transactional consumer: isolation.level=read_committed
- End-to-end EOS: supported by Kafka Streams and Kafka Connect
Key takeaways:
- understand the difference between idempotence and transactions
- choose the configuration that fits the scenario
- budget for the performance overhead of transactions
- handle ProducerFencedException correctly
References
- Apache Kafka documentation: idempotent producer and transactions
- KIP-98: Exactly Once Delivery and Transactional Messaging
- Kafka: The Definitive Guide, Chapter 7