Skip to content
清晨的一缕阳光
返回

Spring Boot Kafka 事件驱动集成

前言

Kafka 是 LinkedIn 开源的分布式流处理平台,广泛用于日志收集、实时数据处理等场景。Spring Boot 通过 spring-kafka 可以方便地集成 Kafka。

快速开始

1. 添加依赖

<!-- Spring for Apache Kafka; the version is managed by the Spring Boot BOM -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. 基础配置

spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Wait for all in-sync replicas before acknowledging a send
      acks: all
      retries: 3
      properties:
        # Deduplicates broker-side on producer retries (requires acks=all)
        enable.idempotence: true
    consumer:
      group-id: demo-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Start from the beginning of the topic when no committed offset exists
      auto-offset-reset: earliest
      # Offsets are committed manually via Acknowledgment (see ack-mode below)
      enable-auto-commit: false
      properties:
        # Only read messages from committed transactions
        isolation.level: read_committed
    listener:
      # Listener must call Acknowledgment.acknowledge() to commit
      ack-mode: manual
      # Number of concurrent consumer threads per listener container
      concurrency: 3

3. 发送消息

@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaMessageService {

    /**
     * Template typed to Object so both plain String payloads and DTOs
     * (see {@link #sendObject}) compile against the same instance; the
     * original {@code KafkaTemplate<String, String>} could not send a
     * {@code UserDTO}.
     */
    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Sends a key-less message (fire-and-forget); the partition is chosen
     * by the producer's default partitioner.
     */
    public void sendSimpleMessage(String message) {
        kafkaTemplate.send("demo-topic", message);
    }

    /**
     * Sends a keyed message; records with the same key land on the same
     * partition, preserving per-key ordering.
     */
    public void sendWithKey(String key, String message) {
        kafkaTemplate.send("demo-topic", key, message);
    }

    /**
     * Sends to an explicit partition (record key is null).
     */
    public void sendToPartition(int partition, String message) {
        kafkaTemplate.send("demo-topic", partition, null, message);
    }

    /**
     * Sends synchronously and logs the broker metadata.
     *
     * <p>The original called {@code get()} without handling the checked
     * {@code InterruptedException}/{@code ExecutionException} and did not
     * compile.
     *
     * @throws IllegalStateException if the send fails or is interrupted
     */
    public void sendWithResult(String message) {
        try {
            SendResult<String, Object> result =
                kafkaTemplate.send("demo-topic", message).get();
            log.info("发送结果:{}", result.getRecordMetadata());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so upstream code can observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Kafka send interrupted", e);
        } catch (ExecutionException e) {
            // Surface the broker-side cause instead of swallowing it.
            throw new IllegalStateException("Kafka send failed", e.getCause());
        }
    }

    /**
     * Sends asynchronously; the outcome is logged on the callback thread.
     */
    public void sendAsync(String message) {
        kafkaTemplate.send("demo-topic", message)
            .whenComplete((result, ex) -> {
                if (ex == null) {
                    log.info("发送成功:{}", result.getRecordMetadata());
                } else {
                    log.error("发送失败", ex);
                }
            });
    }

    /**
     * Sends a DTO payload. NOTE(review): requires a JSON-capable value
     * serializer on this template — confirm the producer configuration.
     */
    public void sendObject(UserDTO user) {
        kafkaTemplate.send("user-topic", user);
    }
}

4. 消费消息

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {

    // Injected so consumeUser can delegate; the original referenced an
    // undeclared userService field and did not compile.
    private final UserService userService;

    /**
     * Basic single-record consumption from demo-topic.
     */
    @KafkaListener(topics = "demo-topic")
    public void consume(String message) {
        log.info("收到消息:{}", message);

        processMessage(message);
    }

    /**
     * Reads the record key alongside the payload.
     *
     * <p>NOTE(review): this listens on the same topic with the same
     * default group id as {@link #consume}, so the two listeners split
     * the partitions between them — confirm that is intended.
     */
    @KafkaListener(topics = "demo-topic")
    public void consumeWithKey(
        @Payload String message,
        @Header(KafkaHeaders.RECEIVED_KEY) String key
    ) {
        log.info("收到消息:key={}, value={}", key, message);
    }

    /**
     * Consumes a JSON-deserialized DTO from user-topic.
     */
    @KafkaListener(topics = "user-topic")
    public void consumeUser(UserDTO user) {
        log.info("收到用户消息:{}", user);

        userService.process(user);
    }

    /**
     * Batch consumption. {@code batch = "true"} is required for a
     * {@code List} payload; without it the container delivers one record
     * at a time and the parameter does not bind as a batch.
     */
    @KafkaListener(topics = "batch-topic", batch = "true")
    public void consumeBatch(List<String> messages) {
        log.info("批量收到 {} 条消息", messages.size());

        for (String message : messages) {
            processMessage(message);
        }
    }

    /** Placeholder for the actual business logic. */
    private void processMessage(String message) {
        // Business logic.
    }
}

5. 手动提交 Offset

@Slf4j
@Component
public class ManualCommitConsumer {

    /**
     * Consumes with manual offset commit (requires {@code ack-mode: manual}
     * and {@code enable-auto-commit: false}). The offset is committed only
     * after the business logic succeeds; on failure the exception is
     * rethrown so the record is redelivered.
     */
    @KafkaListener(topics = "manual-commit-topic")
    public void consume(
        String message,
        Acknowledgment acknowledgment
    ) {
        try {
            // Business processing first — commit only on success.
            processMessage(message);

            acknowledgment.acknowledge();

            log.info("消息处理成功并提交 offset");
        } catch (Exception e) {
            log.error("消息处理失败", e);
            // No ack: the container's error handler will retry the record.
            throw e;
        }
    }

    // Was missing in the original snippet (the class did not compile).
    private void processMessage(String message) {
        // Business logic.
    }
}

高级特性

1. 消息过滤

@Slf4j
@Component
public class FilterConsumer {

    /**
     * Deserializes records into UserDTO via the JsonDeserializer default
     * type. The attribute is {@code properties} (plural, an array of
     * key=value pairs); the original used a non-existent {@code property}
     * attribute and did not compile.
     */
    @KafkaListener(
        topics = "filter-topic",
        properties = "spring.json.value.default.type=com.example.demo.dto.UserDTO"
    )
    public void consume(UserDTO user) {
        // Only UserDTO-typed messages reach this handler.
        log.info("收到用户消息:{}", user);
    }
}
@Configuration
public class KafkaConfig {

    /**
     * Consumer factory: String keys, untyped JSON values (no type headers).
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "filter-consumer-group");

        return new DefaultKafkaConsumerFactory<>(
            props,
            new StringDeserializer(),
            new JsonDeserializer<>(Object.class, false)
        );
    }

    /**
     * Listener container factory with a record filter: only records whose
     * "message-type" header equals "important" are delivered (returning
     * {@code true} from the filter DISCARDS the record).
     *
     * <p>The original dereferenced {@code lastHeader(...)} with no null
     * check (NPE when the header is absent) and called {@code toString()}
     * on a {@code byte[]}, which yields the array identity string rather
     * than the header text.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        factory.setRecordFilterStrategy(record -> {
            Header typeHeader = record.headers().lastHeader("message-type");
            if (typeHeader == null || typeHeader.value() == null) {
                // No type header: treat as non-important and filter out.
                return true;
            }
            String messageType = new String(typeHeader.value(), StandardCharsets.UTF_8);
            return !"important".equals(messageType);
        });

        return factory;
    }
}

2. 错误处理

@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {

    // Needed by sendToDLQ; the original referenced an undeclared
    // kafkaTemplate field and did not compile.
    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Container factory with retry + dead-letter handling: each failed
     * record is retried 3 times at 1-second intervals, then handed to the
     * recoverer which forwards it to the DLQ topic.
     *
     * @param consumerFactory injected here so this snippet does not depend
     *                        on a method defined elsewhere
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        ConsumerFactory<String, Object> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);

        factory.setCommonErrorHandler(new DefaultErrorHandler(
            (record, exception) -> {
                log.error("消费失败:{}", record.value(), exception);

                // Record exhausted its retries — park it in the DLQ.
                sendToDLQ(record, exception);
            },
            new FixedBackOff(1000L, 3L) // retry 3 times, 1s apart
        ));

        return factory;
    }

    /** Forwards an unrecoverable record to the dead-letter topic. */
    private void sendToDLQ(ConsumerRecord<?, ?> record, Exception ex) {
        kafkaTemplate.send("dlq-topic", record.value());
    }
}

3. Kafka Streams

<!-- Kafka Streams DSL; required in addition to spring-kafka for @EnableKafkaStreams -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
@Slf4j
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    /**
     * Order pipeline: drop invalid orders, count orders per user into a
     * named state store, and publish the running counts.
     */
    @Bean
    public KStream<String, Order> orderStream(StreamsBuilder streamsBuilder) {
        KStream<String, Order> orderStream = streamsBuilder.stream("order-topic");

        // Keep only orders with a positive amount.
        KStream<String, Order> validOrders = orderStream
            .filter((key, order) -> order.getAmount() > 0);

        // Re-key each record by its user id.
        KGroupedStream<String, Order> groupedByUser = validOrders
            .groupBy((key, order) -> String.valueOf(order.getUserId()));

        // Running per-user order count, materialized in a queryable store.
        KTable<String, Long> userOrderCount = groupedByUser
            .count(Materialized.as("user-order-count-store"));

        // peek() is used for observability only; it does not transform.
        userOrderCount.toStream()
            .peek((userId, count) -> log.info("用户 {} 订单数:{}", userId, count))
            .to("user-order-count-topic");

        return validOrders;
    }

    /**
     * Simple transform: upper-case values, drop empty results.
     *
     * <p>{@code to(...)} is a terminal operation returning {@code void};
     * the original returned the result of a chain ending in {@code to()}
     * and did not compile. Build the stream first, sink it, then return it.
     */
    @Bean
    public KStream<String, String> processStream(StreamsBuilder streamsBuilder) {
        KStream<String, String> stream = streamsBuilder.stream("input-topic");

        KStream<String, String> transformed = stream
            .mapValues(value -> value.toUpperCase())
            .filter((key, value) -> !value.isEmpty());

        transformed.to("output-topic");
        return transformed;
    }
}

4. 事务支持

spring:
  kafka:
    producer:
      # Enables the transactional producer; each producer instance gets a
      # transactional.id derived from this prefix
      transaction-id-prefix: tx-
    consumer:
      # Consumers only see records from committed transactions
      isolation-level: read_committed
@Service
@RequiredArgsConstructor
public class TransactionService {

    // Object-valued template so the Order payload compiles; the original
    // KafkaTemplate<String, String> could not send an Order.
    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Performs a database write and a Kafka send in one transactional
     * method.
     *
     * <p>NOTE(review): with {@code transaction-id-prefix} configured,
     * Spring synchronizes the Kafka transaction with the surrounding one
     * (best-effort, commit-after-commit) — this is NOT a true XA/atomic
     * guarantee. Confirm the transaction-manager setup before relying on
     * "DB rollback also rolls back the message" semantics.
     */
    @Transactional
    public void sendTransactionalMessages() {
        // Database operation.
        Order order = createOrder();

        // Kafka send participates in the synchronized transaction.
        kafkaTemplate.send("order-topic", order.getId().toString(), order);
    }

    // Was undefined in the original snippet; stub so the class compiles.
    private Order createOrder() {
        // TODO: persist and return the real order (elided in the article).
        return new Order();
    }
}

5. 分区管理

@Configuration
public class KafkaConfig {

    /**
     * demo-topic: 6 partitions, replication factor 3, 7-day retention,
     * 1 GiB log segments.
     */
    @Bean
    public NewTopic demoTopic() {
        Map<String, String> topicConfigs = Map.of(
            "retention.ms", "604800000", // 7 days
            "segment.bytes", "1073741824" // 1GB
        );

        return TopicBuilder.name("demo-topic")
            .partitions(6)
            .replicas(3)
            .configs(topicConfigs)
            .build();
    }

    /**
     * user-topic: 3 partitions, replication factor 3, broker-default
     * retention.
     */
    @Bean
    public NewTopic userTopic() {
        return TopicBuilder.name("user-topic")
            .replicas(3)
            .partitions(3)
            .build();
    }
}

最佳实践

1. 消息序列化

@Configuration
public class KafkaConfig {

    /**
     * Producer factory: String keys, JSON-serialized values.
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> producerProps = Map.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class
        );

        // DefaultKafkaProducerFactory copies the map, so the immutable
        // Map.of view is safe to hand over.
        return new DefaultKafkaProducerFactory<>(producerProps);
    }

    /**
     * Template bound to the JSON-producing factory above.
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

2. 消费幂等性

@Slf4j
@Component
@RequiredArgsConstructor
public class IdempotentConsumer {

    // Constructor injection instead of field @Autowired: final, testable.
    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * Idempotent consumption: a Redis SETNX keyed by (partition, offset)
     * marks the record as processed; redeliveries (e.g. after a rebalance)
     * find the marker and are skipped. The marker expires after 24 hours.
     */
    @KafkaListener(topics = "idempotent-topic")
    public void consume(
        String message,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.OFFSET) long offset
    ) {
        String key = String.format("kafka:processed:%d:%d", partition, offset);

        // setIfAbsent returns false when the key already exists,
        // i.e. the record was processed before.
        Boolean isNew = redisTemplate.opsForValue()
            .setIfAbsent(key, "processed", 24, TimeUnit.HOURS);

        if (Boolean.FALSE.equals(isNew)) {
            log.warn("消息已处理,跳过");
            return;
        }

        processMessage(message);
    }

    // Was missing in the original snippet (the class did not compile).
    private void processMessage(String message) {
        // Business logic.
    }
}

3. 监控指标

@Component
public class KafkaMetrics {

    private final MeterRegistry meterRegistry;

    /**
     * @param meterRegistry registry the counters/summaries are created in
     *
     * <p>The original constructor called
     * {@code new KafkaMetrics().bindTo(meterRegistry)}: no no-arg
     * constructor exists, and if one did the call would recurse. Binding
     * Kafka client metrics should instead be done with Micrometer's
     * {@code KafkaClientMetrics} binder registered as a bean —
     * TODO confirm the intended binder.
     */
    public KafkaMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Records one sent message and its payload size, tagged by topic.
     */
    public void recordSend(String topic, long bytes) {
        meterRegistry.counter("kafka.message.send",
            "topic", topic
        ).increment();

        meterRegistry.summary("kafka.message.bytes",
            "topic", topic
        ).record(bytes);
    }
}

4. 配置优化

spring:
  kafka:
    producer:
      # Batching: up to 16 KiB per batch, 32 MiB total send buffer
      batch-size: 16384
      buffer-memory: 33554432
      # Ack from all in-sync replicas before success
      acks: all
      retries: 3
      properties:
        # Compression: lz4 trades a little CPU for smaller payloads
        compression.type: lz4
        # retry-backoff-ms is not a Spring Boot key; use the raw
        # Kafka client property instead
        retry.backoff.ms: 1000

    consumer:
      # Batch pull: up to 500 records per poll()
      max-poll-records: 500
      fetch-min-size: 1
      # Spring Boot key is fetch-max-wait (a Duration), not fetch-max-wait-ms
      fetch-max-wait: 500ms
      properties:
        # Session/heartbeat have no Boot shortcut; use raw client properties
        session.timeout.ms: 30000
        heartbeat.interval.ms: 10000

5. 死信队列

@Configuration
public class KafkaConfig {

    /**
     * Routes records that exhausted their retries to a dead-letter topic
     * named "&lt;original topic&gt;.DLT", keeping the original partition
     * number.
     */
    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
        KafkaTemplate<String, Object> template
    ) {
        return new DeadLetterPublishingRecoverer(
            template,
            (record, exception) ->
                new TopicPartition(record.topic() + ".DLT", record.partition())
        );
    }

    /**
     * Error handler: retry 3 times, 1 second apart, then hand the record
     * to the dead-letter recoverer above.
     */
    @Bean
    public DefaultErrorHandler errorHandler(
        DeadLetterPublishingRecoverer recoverer
    ) {
        FixedBackOff threeRetriesOneSecondApart = new FixedBackOff(1000L, 3L);
        return new DefaultErrorHandler(recoverer, threeRetriesOneSecondApart);
    }
}
@Slf4j
@Component
@RequiredArgsConstructor
public class DLQConsumer {

    // Was referenced but never declared in the original (compile error).
    private final AlertService alertService;

    /**
     * Monitors the dead-letter topic: logs the failed payload together
     * with the exception message the DLT publisher attached as a header,
     * then raises an alert for manual follow-up. The message is not
     * re-processed here.
     */
    @KafkaListener(topics = "demo-topic.DLT")
    public void consumeDLQ(
        @Payload String message,
        @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage
    ) {
        log.error("死信消息:{}, 异常:{}", message, exceptionMessage);

        // Record the failure and notify operators.
        alertService.sendAlert("死信消息:" + message);
    }
}

总结

Kafka 集成要点:

- 生产端:开启幂等(enable.idempotence)、acks=all 并配置重试,保证消息不丢;
- 消费端:关闭自动提交,业务处理成功后手动提交 offset,并以(分区, 偏移量)等唯一键实现消费幂等;
- 可靠性:通过重试 + 死信队列(.DLT)兜底处理消费失败的消息;
- 事务:配置 transaction-id-prefix 开启事务生产者,消费端使用 read_committed 隔离级别。

Kafka 是事件驱动架构的核心组件。


分享这篇文章到:

上一篇文章
Java 并发容器详解
下一篇文章
微服务日志规范