
Kafka Exactly-Once Semantics (EOS) Explained

Exactly-once semantics (EOS) is the strongest delivery guarantee a messaging system can offer. This article examines how Kafka achieves EOS through the idempotent producer and the transaction mechanism.

1. Message Delivery Semantics

1.1 The Three Semantics

graph LR
    subgraph At-Most-Once
        A1[At most once] --> A2[May lose messages]
    end
    
    subgraph At-Least-Once
        B1[At least once] --> B2[May duplicate]
    end
    
    subgraph Exactly-Once
        C1[Exactly once] --> C2[No loss, no duplicates]
    end
| Semantics | Meaning | Implementation | Typical use |
| --- | --- | --- | --- |
| At-most-once | Delivered at most once; may be lost | acks=0 | Log collection |
| At-least-once | Delivered at least once; may be duplicated | acks=1/all + retries | General business logic |
| Exactly-once | Delivered exactly once; no loss, no duplication | Idempotence + transactions | Financial transactions |
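The implementation column of the table maps directly onto producer settings. A minimal sketch using plain java.util.Properties — the keys are the real producer config names, but the helper class itself is illustrative, not part of the Kafka API:

```java
import java.util.Properties;

// Illustrative mapping from delivery semantics to producer settings.
public class DeliverySemantics {

    // At-most-once: fire and forget — no acknowledgement, no retries.
    public static Properties atMostOnce() {
        Properties p = new Properties();
        p.put("acks", "0");
        p.put("retries", 0);
        return p;
    }

    // At-least-once: wait for acknowledgement and retry on failure.
    public static Properties atLeastOnce() {
        Properties p = new Properties();
        p.put("acks", "all");
        p.put("retries", Integer.MAX_VALUE);
        return p;
    }

    // Exactly-once: at-least-once plus idempotence (and transactions
    // when atomicity across partitions is needed).
    public static Properties exactlyOnce() {
        Properties p = atLeastOnce();
        p.put("enable.idempotence", true);
        return p;
    }

    public static void main(String[] args) {
        System.out.println(exactlyOnce().get("acks")); // all
    }
}
```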

1.2 Why Messages Are Lost or Duplicated

Message loss

  - the producer uses acks=0/1 and a broker fails before the message is fully replicated
  - the consumer commits its offset before processing and then crashes

Message duplication

  - the producer retries after a timeout although the first write actually succeeded
  - the consumer crashes after processing but before committing its offset, then reprocesses

2. The Idempotent Producer

2.1 How It Works

The broker assigns each producer a unique ProducerId (PID), and every batch carries a per-partition sequence number; when a retry re-delivers a batch whose sequence number the broker has already appended, the broker discards it instead of writing it again.

graph TB
    subgraph Producer
        P1[Message 1, Seq=0]
        P2[Message 2, Seq=1]
        P3[Message 3, Seq=2]
    end
    
    subgraph Broker
        B1[Message 1, Seq=0 ✓]
        B2[Message 2, Seq=1 ✓]
        B3[Message 3, Seq=2 ✓]
    end
    
    P1 -->|retry| B1
    P2 --> B2
    P3 --> B3
    
    P1 -.->|duplicate, Seq=0| BD[discarded]
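The dedup step in the diagram can be sketched in a few lines of plain Java. This is a toy simulation of the broker-side bookkeeping, not the actual broker code: per (ProducerId, partition) the broker remembers the last appended sequence number and drops anything it has already seen.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified simulation of broker-side idempotent deduplication:
// per (producerId, partition), only new sequence numbers are appended.
public class IdempotentDedup {
    private final Map<String, Integer> lastSeq = new HashMap<>();

    /** Returns true if the batch is appended, false if dropped as a duplicate. */
    public boolean append(long producerId, int partition, int seq) {
        String key = producerId + "-" + partition;
        int last = lastSeq.getOrDefault(key, -1);
        if (seq <= last) {
            return false;      // retried duplicate: acknowledged but not re-appended
        }
        // In the real broker, seq must be exactly last+1; a gap raises
        // OutOfOrderSequenceException. Omitted here for brevity.
        lastSeq.put(key, seq);
        return true;
    }

    public static void main(String[] args) {
        IdempotentDedup broker = new IdempotentDedup();
        System.out.println(broker.append(1L, 0, 0)); // true: first write
        System.out.println(broker.append(1L, 0, 0)); // false: retried duplicate dropped
        System.out.println(broker.append(1L, 0, 1)); // true: next sequence accepted
    }
}
```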

2.2 Configuration

// Enable idempotence
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", true);  // the key setting
props.put("acks", "all");  // must be all
props.put("retries", Integer.MAX_VALUE);  // retry indefinitely
props.put("max.in.flight.requests.per.connection", 5);  // must be <= 5
props.put("retry.backoff.ms", 100);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

2.3 Parameter Notes

| Parameter | Recommended value | Description |
| --- | --- | --- |
| enable.idempotence | true | Enables the idempotent producer |
| acks | all | Wait for all in-sync replicas |
| retries | Integer.MAX_VALUE | Retry indefinitely |
| max.in.flight.requests.per.connection | 5 | At most 5 unacknowledged requests |

2.4 Usage Example

@Slf4j  // Lombok annotation assumed for the log field
public class IdempotentProducerService {
    private KafkaProducer<String, String> producer;
    
    @PostConstruct
    public void init() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("enable.idempotence", true);
        props.put("acks", "all");
        props.put("retries", Integer.MAX_VALUE);
        props.put("max.in.flight.requests.per.connection", 5);
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        
        producer = new KafkaProducer<>(props);
    }
    
    public void send(String topic, String key, String value) {
        ProducerRecord<String, String> record = 
            new ProducerRecord<>(topic, key, value);
        
        producer.send(record, (metadata, exception) -> {
            if (exception == null) {
                log.info("send succeeded: offset={}", metadata.offset());
            } else {
                log.error("send failed", exception);
            }
        });
    }
    
    @PreDestroy
    public void close() {
        if (producer != null) {
            producer.flush();
            producer.close();
        }
    }
}

2.5 Limitations

| Limitation | Description |
| --- | --- |
| Single partition | Ordering and deduplication are guaranteed only within one partition |
| 5 in-flight requests | At most 5 unacknowledged requests per connection |
| Single session | The PID changes when the producer restarts, so deduplication does not span sessions |

3. The Transaction Mechanism

3.1 Transaction Architecture

sequenceDiagram
    participant P as Producer
    participant C as Coordinator
    participant B1 as Broker 1
    participant B2 as Broker 2
    
    P->>C: 1. Initialize transaction
    C-->>P: Return ProducerId
    
    P->>B1: 2a. Send messages (pre-commit)
    P->>B2: 2b. Send messages (pre-commit)
    
    P->>C: 3. Commit transaction
    C->>B1: 4a. Write commit marker
    C->>B2: 4b. Write commit marker
    
    B1->>B1: 5a. Messages become visible
    B2->>B2: 5b. Messages become visible

3.2 Configuration

// Transactional producer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", true);  // enabled automatically when transactional.id is set
props.put("transactional.id", "my-tx-id");  // unique transactional id
props.put("acks", "all");
props.put("retries", Integer.MAX_VALUE);
props.put("max.in.flight.requests.per.connection", 5);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

3.3 Usage Example

@Slf4j  // Lombok annotation assumed for the log field
public class TransactionProducerService {
    private KafkaProducer<String, String> producer;
    
    @PostConstruct
    public void init() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "order-tx-id");
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        
        producer = new KafkaProducer<>(props);
        // Initialize the transactional state (also fences older instances)
        producer.initTransactions();
    }
    
    /**
     * Send a single message within a transaction
     */
    public void sendInTransaction(String topic, String key, String value) {
        try {
            producer.beginTransaction();
            
            ProducerRecord<String, String> record = 
                new ProducerRecord<>(topic, key, value);
            
            producer.send(record);
            
            producer.commitTransaction();
            log.info("transaction committed");
        } catch (ProducerFencedException e) {
            log.error("producer fenced", e);
            producer.close();  // do not abort; recreate the producer instead
        } catch (Exception e) {
            log.error("transaction failed, aborting", e);
            try {
                producer.abortTransaction();
            } catch (Exception ex) {
                log.error("abort failed", ex);
            }
        }
    }
    
    /**
     * Cross-topic transaction
     */
    public void sendToMultipleTopics(Map<String, String> messages) {
        try {
            producer.beginTransaction();
            
            for (Map.Entry<String, String> entry : messages.entrySet()) {
                ProducerRecord<String, String> record = 
                    new ProducerRecord<>("topic-" + entry.getKey(), entry.getValue());
                producer.send(record);
            }
            
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            producer.close();  // fenced producers cannot abort; recreate instead
            throw e;
        } catch (Exception e) {
            producer.abortTransaction();
            throw e;
        }
    }
    
    @PreDestroy
    public void close() {
        if (producer != null) {
            producer.close();
        }
    }
}

4. Transactional Consumers

4.1 Configuration

// Transactional consumer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "transaction-consumer-group");
props.put("isolation.level", "read_committed");  // read only committed messages
props.put("enable.auto.commit", false);  // commit offsets manually after processing
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

4.2 Isolation Levels

| isolation.level | Behavior |
| --- | --- |
| read_uncommitted | Returns all messages, including those from open or aborted transactions |
| read_committed | Returns only messages from committed transactions |
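The effect of the two levels can be simulated with a toy model in which each record carries the state of its transaction. The real broker implements this with commit/abort markers in the log and the last stable offset (LSO), not per-record flags, so this is only a sketch of the visible behavior:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of isolation.level: each record carries its transaction's state.
public class IsolationFilter {

    public enum TxState { COMMITTED, ABORTED, ONGOING }

    /** Returns the values a consumer would see under the given isolation level. */
    public static List<String> read(List<String> values, List<TxState> states,
                                    boolean readCommitted) {
        List<String> visible = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            // read_committed hides records from aborted and still-open transactions
            if (!readCommitted || states.get(i) == TxState.COMMITTED) {
                visible.add(values.get(i));
            }
        }
        return visible;
    }

    public static void main(String[] args) {
        List<String> values = List.of("paid", "refund", "pending");
        List<TxState> states =
            List.of(TxState.COMMITTED, TxState.ABORTED, TxState.ONGOING);
        System.out.println(read(values, states, true));  // [paid]
        System.out.println(read(values, states, false)); // [paid, refund, pending]
    }
}
```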

4.3 Usage Example

public class TransactionConsumerService {
    private KafkaConsumer<String, String> consumer;
    
    @PostConstruct
    public void init() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "transaction-consumer-group");
        props.put("isolation.level", "read_committed");
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);
        props.put("enable.auto.commit", false);  // offsets are committed manually below
        
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("topic-1", "topic-2"));
    }
    
    public void consume() {
        while (true) {
            ConsumerRecords<String, String> records = 
                consumer.poll(Duration.ofMillis(100));
            
            for (ConsumerRecord<String, String> record : records) {
                process(record);  // application-specific handling (not shown)
            }
            
            consumer.commitSync();
        }
    }
    
    @PreDestroy
    public void close() {
        if (consumer != null) {
            consumer.close();
        }
    }
}
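A read_committed consumer pairs naturally with a transactional producer in the classic consume-transform-produce loop: in the real API the consumed offsets are added to the producer transaction with producer.sendOffsetsToTransaction(...) between beginTransaction() and commitTransaction(), so output records and offsets commit atomically. A self-contained sketch of the atomicity this buys (a toy model, not the Kafka API):

```java
import java.util.ArrayList;
import java.util.List;

// Toy consume-transform-produce loop: the output write and the offset
// commit are applied together or not at all, mirroring what
// sendOffsetsToTransaction achieves inside a producer transaction.
public class ConsumeTransformProduce {
    final List<String> output = new ArrayList<>();
    int committedOffset = 0;

    /** Processes input[committedOffset]; returns true if the "transaction" committed. */
    public boolean processNext(List<String> input, boolean abort) {
        if (committedOffset >= input.size()) return false;
        String transformed = input.get(committedOffset).toUpperCase(); // transform step
        if (abort) {
            return false;        // aborted: neither the output nor the offset changes
        }
        output.add(transformed); // committed: the output record ...
        committedOffset++;       // ... and the consumed offset advance together
        return true;
    }

    public static void main(String[] args) {
        ConsumeTransformProduce app = new ConsumeTransformProduce();
        List<String> input = List.of("a", "b");
        app.processNext(input, true);            // aborted: "a" will be reprocessed
        app.processNext(input, false);           // committed
        System.out.println(app.output);          // [A]
        System.out.println(app.committedOffset); // 1
    }
}
```

Because the offset only advances with a commit, an aborted attempt leaves the record to be consumed again — duplicates are impossible downstream, and nothing is skipped.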

5. End-to-End Exactly-Once

5.1 Kafka Streams

// Kafka Streams EOS configuration
Properties props = new Properties();
props.put("application.id", "stream-app");
props.put("bootstrap.servers", "localhost:9092");
props.put("processing.guarantee", "exactly_once_v2");  // the key setting

StreamsBuilder builder = new StreamsBuilder();
builder.stream("input-topic")
    .mapValues(value -> process(value))  // process(): application logic (not shown)
    .to("output-topic");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

5.2 Kafka Connect

Since KIP-618, exactly-once for source connectors is enabled on the Connect worker (exactly.once.source.support=enabled in the worker configuration); an individual source connector then requests it via exactly.once.support:

{
  "name": "my-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://localhost:3306/db",
    "topic.prefix": "mysql-",
    "exactly.once.support": "required"
  }
}

5.3 End-to-End Architecture

graph TB
    subgraph Source
        DB[(Database)]
    end
    
    subgraph Kafka
        KC[Kafka Connect<br/>EOS]
        KS[Kafka Streams<br/>EOS]
    end
    
    subgraph Sink
        ES[Elasticsearch]
        S3[S3]
    end
    
    DB -->|transactional| KC
    KC -->|transactional| KS
    KS -->|transactional| ES
    KS -->|transactional| S3

6. Performance Comparison

6.1 Throughput Comparison

The throughput figures are relative and illustrative; latency and reliability follow from the guarantees each mode adds:

| Configuration | Relative throughput | Latency | Reliability |
| --- | --- | --- | --- |
| acks=1 | 100% | lowest | low |
| acks=all | 80% | higher | high |
| Idempotence | 75% | higher | high |
| Transactions | 50% | highest | highest |

6.2 Performance Testing

// Benchmark skeleton: the createXxxProps() helpers (not shown) build the
// Properties for each mode; the transactional run must additionally wrap
// its sends in beginTransaction()/commitTransaction().
public class PerformanceTest {
    public static void main(String[] args) {
        // Measure throughput under each configuration
        testThroughput("acks=1", createProps("1"));
        testThroughput("acks=all", createProps("all"));
        testThroughput("idempotent", createIdempotentProps());
        testThroughput("transaction", createTransactionalProps());
    }
    
    private static void testThroughput(String name, Properties props) {
        long start = System.currentTimeMillis();
        int count = 100000;
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        
        for (int i = 0; i < count; i++) {
            producer.send(new ProducerRecord<>("test", "key", "value"));
        }
        
        producer.flush();
        producer.close();
        
        long duration = System.currentTimeMillis() - start;
        System.out.printf("%s: %d msg/s%n", name, count * 1000 / duration);
    }
}

7. Troubleshooting

7.1 ProducerFencedException

Cause: another producer instance started with the same transactional.id; the transaction coordinator bumped the producer epoch and fenced this instance.

Solution: close the fenced producer and create a new one:

try {
    producer.beginTransaction();
    // send messages
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    log.error("producer fenced", e);
    producer.close();  // must be closed
    // recreate the producer and call initTransactions() again
}

7.2 Transaction Timeouts

Cause: the transaction stays open longer than its timeout (for example, slow processing between beginTransaction() and commitTransaction()), so the coordinator proactively aborts it.

Solution: shorten the work done inside the transaction, or raise the timeouts:

# Broker configuration (upper bound for all producers)
transaction.max.timeout.ms=900000  # 15 minutes

# Producer configuration (must not exceed the broker maximum)
transaction.timeout.ms=60000

7.3 Degraded Performance

Cause: committing a separate transaction for every message adds coordinator round trips and marker writes per record.

Solution: batch many records into one transaction:

// Send a batch within a single transaction
List<ProducerRecord<String, String>> records = new ArrayList<>();
for (int i = 0; i < 100; i++) {
    records.add(new ProducerRecord<>("topic", "key", "value"));
}

producer.beginTransaction();
for (ProducerRecord<String, String> record : records) {
    producer.send(record);
}
producer.commitTransaction();

8. Best Practices

8.1 Recommended Settings

| Scenario | Idempotence | Transactions | Rationale |
| --- | --- | --- | --- |
| Log collection | off | off | Throughput first |
| General business | on | off | Balance performance and reliability |
| Financial transactions | on | on | Reliability first |
| Data synchronization | on | on | Consistency guaranteed |

8.2 Code Template

@Slf4j  // Lombok annotation assumed for the log field
public class ExactlyOnceProducer {
    private KafkaProducer<String, String> producer;
    
    @PostConstruct
    public void init() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "my-tx-id");
        props.put("enable.idempotence", true);  // implied by transactional.id; set for clarity
        props.put("acks", "all");
        props.put("retries", Integer.MAX_VALUE);
        props.put("key.serializer", StringSerializer.class);
        props.put("value.serializer", StringSerializer.class);
        
        producer = new KafkaProducer<>(props);
        producer.initTransactions();
    }
    
    public void sendWithExactlyOnce(String topic, String key, String value) {
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>(topic, key, value));
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            producer.close();  // fenced: recreate the producer, do not abort
            throw e;
        } catch (Exception e) {
            try {
                producer.abortTransaction();
            } catch (Exception ex) {
                log.error("abort failed", ex);
            }
            throw e;
        }
    }
}

Summary

The core mechanisms behind Kafka's exactly-once semantics:

  1. Idempotent producer: exactly-once within a single partition, deduplicating via PID + sequence numbers
  2. Transactions: exactly-once across partitions, with atomic multi-partition writes
  3. Transactional consumers: isolation.level=read_committed
  4. End-to-end EOS: supported by Kafka Streams and Kafka Connect


