Skip to content
清晨的一缕阳光
返回

Spring Kafka 集成开发实战指南

Spring Kafka 提供了 Kafka 的 Spring 风格抽象,简化了 Kafka 应用的开发。本文将深入探讨 Spring Kafka 的配置、使用和最佳实践。

一、Spring Kafka 基础

1.1 依赖配置

Maven 依赖

<dependencies>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>3.0.0</version>
    </dependency>
    
    <!-- Kafka Clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.4.0</version>
    </dependency>
</dependencies>

1.2 基础配置

application.yml

spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    
    # Producer 配置
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
    
    # Consumer 配置
    consumer:
      group-id: my-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false
      auto-commit-interval: 1000
    
    # 监听器配置
    listener:
      ack-mode: manual
      concurrency: 3

二、Producer 开发

2.1 基础 Producer

@Service
public class KafkaProducerService {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    /**
     * 发送消息
     */
    public void sendMessage(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value);
    }
    
    /**
     * 发送消息带回调
     */
    public void sendMessageWithCallback(String topic, String key, String value) {
        ListenableFuture<SendResult<String, String>> future = 
            kafkaTemplate.send(topic, key, value);
        
        future.addCallback(
            result -> {
                log.info("发送成功:topic={}, partition={}, offset={}", 
                    result.getRecordMetadata().topic(),
                    result.getRecordMetadata().partition(),
                    result.getRecordMetadata().offset());
            },
            ex -> {
                log.error("发送失败:{}", ex.getMessage());
            }
        );
    }
    
    /**
     * 发送消息并等待结果
     */
    public SendResult<String, String> sendMessageSync(String topic, String key, String value) 
            throws ExecutionException, InterruptedException {
        return kafkaTemplate.send(topic, key, value).get();
    }
}

2.2 自定义 Producer

@Configuration
public class KafkaProducerConfig {
    
    @Bean
    public ProducerFactory<String, Order> orderProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(ProducerConfig.ACKS_CONFIG, "all");
        config.put(ProducerConfig.RETRIES_CONFIG, 3);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        
        return new DefaultKafkaProducerFactory<>(config);
    }
    
    @Bean
    public KafkaTemplate<String, Order> orderKafkaTemplate() {
        return new KafkaTemplate<>(orderProducerFactory());
    }
}

2.3 消息转换器

@Configuration
public class KafkaConverterConfig {
    
    @Bean
    public MessageConverter jsonMessageConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        
        // 设置类型映射
        Map<String, Class<?>> typeMappings = new HashMap<>();
        typeMappings.put("order", Order.class);
        typeMappings.put("payment", Payment.class);
        
        converter.setTypePrecedence(MessageConverterTypePrecedence.TYPE_ID);
        converter.setTypeIdMappings(typeMappings);
        
        return converter;
    }
}

// 使用
@Service
public class OrderProducer {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    public void sendOrder(Order order) {
        kafkaTemplate.send("order-topic", "order", order);
    }
}

三、Consumer 开发

3.1 基础 Consumer

@Component
public class KafkaConsumerService {
    
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consume(String message) {
        log.info("收到消息:{}", message);
        // 处理消息
    }
    
    @KafkaListener(topics = "order-topic", groupId = "order-group")
    public void consumeOrder(Order order) {
        log.info("收到订单:{}", order);
        // 处理订单
    }
}

3.2 手动 Ack

@Component
public class ManualAckConsumer {
    
    @KafkaListener(topics = "manual-ack-topic", groupId = "manual-ack-group")
    public void consume(ConsumerRecord<String, String> record, 
                       Acknowledgment acknowledgment) {
        try {
            log.info("收到消息:key={}, value={}", record.key(), record.value());
            
            // 处理消息
            processMessage(record.value());
            
            // 手动 Ack
            acknowledgment.acknowledge();
            
        } catch (Exception e) {
            log.error("处理失败", e);
            // 不 Ack,触发重试
            throw e;
        }
    }
}

3.3 批量消费

@Component
public class BatchConsumer {
    
    @KafkaListener(topics = "batch-topic", groupId = "batch-group")
    public void consumeBatch(List<ConsumerRecord<String, String>> records) {
        log.info("批量收到 {} 条消息", records.size());
        
        for (ConsumerRecord<String, String> record : records) {
            processMessage(record.value());
        }
    }
}

3.4 分区监听

@Component
public class PartitionConsumer {
    
    @KafkaListener(
        topicPartitions = @TopicPartition(
            topic = "partition-topic",
            partitions = {"0", "1", "2"}
        ),
        groupId = "partition-group"
    )
    public void consumePartition(ConsumerRecord<String, String> record) {
        log.info("收到分区消息:partition={}, offset={}", 
            record.partition(), record.offset());
    }
}

四、事务支持

4.1 事务配置

@Configuration
public class KafkaTransactionConfig {
    
    @Bean
    public ProducerFactory<String, Object> transactionalProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        config.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-1");
        
        return new DefaultKafkaProducerFactory<>(config);
    }
    
    @Bean
    public KafkaTemplate<String, Object> transactionalKafkaTemplate() {
        KafkaTemplate<String, Object> template = new KafkaTemplate<>(transactionalProducerFactory());
        template.setTransactionIdPrefix("tx-");
        return template;
    }
    
    @Bean
    public PlatformTransactionManager transactionManager(KafkaTemplate<String, Object> kafkaTemplate) {
        return new KafkaTransactionManager<>(kafkaTemplate.getProducerFactory());
    }
}

4.2 事务使用

@Service
public class TransactionalService {
    
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    
    @Autowired
    private OrderRepository orderRepository;
    
    /**
     * 事务发送消息
     */
    @Transactional
    public void processOrderWithTransaction(Order order) {
        // 1. 保存订单到数据库
        orderRepository.save(order);
        
        // 2. 发送 Kafka 消息(自动参与事务)
        kafkaTemplate.send("order-topic", "order", order);
        
        // 3. 如果任何一步失败,整个事务回滚
    }
}

五、错误处理

5.1 错误处理器

@Configuration
public class KafkaErrorHandlerConfig {
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        
        ConcurrentKafkaListenerContainerFactory<String, String> factory = 
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        
        // 配置错误处理器
        factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler(
            (record, ex) -> {
                // 处理失败记录
                log.error("消息处理失败:{}", record.value(), ex);
                saveFailedMessage(record, ex);
            },
            new FixedBackOff(1000L, 3L)  // 重试 3 次,间隔 1 秒
        ));
        
        return factory;
    }
}

5.2 死信队列

@Configuration
public class DeadLetterQueueConfig {
    
    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {
        return new DeadLetterPublishingRecoverer(kafkaTemplate,
            (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));
    }
    
    @Bean
    public SeekToCurrentErrorHandler errorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
        DeadLetterPublishingRecoverer recoverer = deadLetterPublishingRecoverer(kafkaTemplate);
        
        return new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
    }
}

5.3 重试配置

@Configuration
public class KafkaRetryConfig {
    
    @Bean
    public RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();
        
        // 重试策略
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        retryTemplate.setRetryPolicy(retryPolicy);
        
        // 退避策略
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        
        return retryTemplate;
    }
}

六、实战案例

6.1 订单处理

@Service
public class OrderService {
    
    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;
    
    /**
     * 创建订单
     */
    @Transactional
    public Order createOrder(OrderDTO dto) {
        // 1. 创建订单
        Order order = new Order();
        order.setId(UUID.randomUUID().toString());
        order.setUserId(dto.getUserId());
        order.setAmount(dto.getAmount());
        order.setStatus(OrderStatus.CREATED);
        
        orderRepository.save(order);
        
        // 2. 发送订单创建事件
        kafkaTemplate.send("order-events", order.getId(), 
            OrderEvent.builder()
                .type("ORDER_CREATED")
                .orderId(order.getId())
                .build());
        
        return order;
    }
    
    /**
     * 监听订单事件
     */
    @KafkaListener(topics = "order-events", groupId = "order-processor")
    public void processOrderEvent(OrderEvent event) {
        switch (event.getType()) {
            case "ORDER_CREATED":
                handleOrderCreated(event.getOrderId());
                break;
            case "ORDER_PAID":
                handleOrderPaid(event.getOrderId());
                break;
            case "ORDER_SHIPPED":
                handleOrderShipped(event.getOrderId());
                break;
        }
    }
}

6.2 日志收集

@Service
public class LogCollector {
    
    @Autowired
    private KafkaTemplate<String, LogEntry> kafkaTemplate;
    
    /**
     * 收集日志
     */
    public void collectLog(LogEntry log) {
        kafkaTemplate.send("logs-topic", log.getService(), log);
    }
}

// 异步日志拦截器
@Component
public class LogInterceptor implements HandlerInterceptor {
    
    @Autowired
    private LogCollector logCollector;
    
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
                               Object handler, Exception ex) {
        LogEntry log = LogEntry.builder()
            .service("api-gateway")
            .method(request.getMethod())
            .url(request.getRequestURI())
            .status(response.getStatus())
            .timestamp(System.currentTimeMillis())
            .build();
        
        // 异步发送
        CompletableFuture.runAsync(() -> logCollector.collectLog(log));
    }
}

6.3 配置中心

@Component
public class ConfigCenter {
    
    private final Map<String, Object> configs = new ConcurrentHashMap<>();
    
    @KafkaListener(topics = "config-center", groupId = "config-subscriber")
    public void listenConfig(ConfigUpdateEvent event) {
        log.info("配置更新:key={}, value={}", event.getKey(), event.getValue());
        configs.put(event.getKey(), event.getValue());
    }
    
    public Object getConfig(String key) {
        return configs.get(key);
    }
}

七、监控运维

7.1 监控指标

@Configuration
public class KafkaMetricsConfig {
    
    @Bean
    public MeterBinder kafkaMetrics(ConsumerFactory<String, String> consumerFactory,
                                    ProducerFactory<String, String> producerFactory) {
        return (registry) -> {
            // Consumer 指标
            Gauge.builder("kafka.consumer.records.consumed", consumerFactory, f -> getRecordsConsumed())
                .register(registry);
            
            // Producer 指标
            Gauge.builder("kafka.producer.records.sent", producerFactory, f -> getRecordsSent())
                .register(registry);
        };
    }
}

7.2 健康检查

@Component
public class KafkaHealthIndicator implements HealthIndicator {
    
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    @Override
    public Health health() {
        try {
            // 发送测试消息
            kafkaTemplate.send("health-check", "test").get(5, TimeUnit.SECONDS);
            return Health.up().build();
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }
}

八、最佳实践

8.1 配置建议

# 生产环境配置
spring:
  kafka:
    producer:
      acks: all
      retries: 3
      enable-idempotence: true
      properties:
        max.in.flight.requests.per.connection: 5
    
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      max-poll-records: 500
    
    listener:
      ack-mode: manual
      concurrency: 3
      missing-topics-fatal: false

8.2 代码规范

// ✅ 推荐
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void consume(@Payload Order order, 
                    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    process(order);
}

// ❌ 不推荐
@KafkaListener(topics = "my-topic")
public void consume(String message) {
    // 手动解析 JSON
}

8.3 检查清单

开发检查:
- [ ] 配置序列化器
- [ ] 配置错误处理
- [ ] 配置重试机制
- [ ] 配置死信队列

运维检查:
- [ ] 监控消费滞后
- [ ] 监控错误率
- [ ] 配置告警
- [ ] 定期备份配置

总结

Spring Kafka 的核心要点:

  1. 基础配置:依赖、YAML 配置、Bean 配置
  2. Producer 开发:KafkaTemplate、自定义配置、消息转换
  3. Consumer 开发:@KafkaListener、手动 Ack、批量消费
  4. 事务支持:事务配置、事务使用
  5. 错误处理:错误处理器、死信队列、重试机制
  6. 监控运维:指标、健康检查

核心要点

参考资料


分享这篇文章到:

上一篇文章
AI 应用生产环境问题排查指南
下一篇文章
Kafka Schema Registry 数据格式管理实战