Spring Kafka 提供了 Kafka 的 Spring 风格抽象,简化了 Kafka 应用的开发。本文将深入探讨 Spring Kafka 的配置、使用和最佳实践。
一、Spring Kafka 基础
1.1 依赖配置
Maven 依赖:
<dependencies>
<!-- Spring Kafka: Spring-style abstraction over the Kafka client.
     NOTE(review): spring-kafka 3.0.0 is built against kafka-clients 3.3.x;
     pinning kafka-clients 3.4.0 separately may cause runtime incompatibilities —
     prefer letting spring-kafka manage the client version transitively. Verify. -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.0</version>
</dependency>
<!-- Kafka client library (normally a transitive dependency of spring-kafka) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
</dependencies>
1.2 基础配置
application.yml:
spring:
  kafka:
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    # Producer settings
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all                  # wait for all in-sync replicas before acking
      retries: 3
      batch-size: 16384          # bytes per record batch
      buffer-memory: 33554432    # 32 MiB total send buffer
    # Consumer settings
    consumer:
      group-id: my-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
      enable-auto-commit: false  # offsets committed manually via listener ack-mode
      # auto-commit-interval has no effect while enable-auto-commit is false
      auto-commit-interval: 1000
    # Listener container settings
    listener:
      ack-mode: manual           # listeners must accept an Acknowledgment parameter
      concurrency: 3
二、Producer 开发
2.1 基础 Producer
@Service
public class KafkaProducerService {

    // Fix: `log` was used but never declared.
    private static final Logger log = LoggerFactory.getLogger(KafkaProducerService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends a message fire-and-forget; delivery outcome is not observed.
     *
     * @param topic destination topic
     * @param key   record key (determines the partition)
     * @param value record payload
     */
    public void sendMessage(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value);
    }

    /**
     * Sends a message and logs the broker-assigned metadata on success,
     * or the failure cause on error.
     *
     * Fix: since Spring Kafka 3.0 (the version this article declares),
     * {@code KafkaTemplate.send()} returns a {@link CompletableFuture};
     * {@code ListenableFuture.addCallback} no longer compiles. The callback
     * is attached with {@code whenComplete()} instead.
     */
    public void sendMessageWithCallback(String topic, String key, String value) {
        CompletableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(topic, key, value);
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("发送成功:topic={}, partition={}, offset={}",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            } else {
                // Keep the stack trace; logging only getMessage() loses the cause.
                log.error("发送失败:{}", ex.getMessage(), ex);
            }
        });
    }

    /**
     * Sends a message and blocks until the broker acknowledges it.
     *
     * @return the send result including record metadata
     * @throws ExecutionException   if the send failed
     * @throws InterruptedException if the calling thread was interrupted
     */
    public SendResult<String, String> sendMessageSync(String topic, String key, String value)
            throws ExecutionException, InterruptedException {
        return kafkaTemplate.send(topic, key, value).get();
    }
}
2.2 自定义 Producer
@Configuration
public class KafkaProducerConfig {

    /**
     * Producer factory for Order payloads: String keys, JSON-serialized
     * values, full-ISR acks, idempotence enabled (no duplicates on retry).
     */
    @Bean
    public ProducerFactory<String, Order> orderProducerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        return new DefaultKafkaProducerFactory<>(props);
    }

    /** Template bound to the Order producer factory above. */
    @Bean
    public KafkaTemplate<String, Order> orderKafkaTemplate() {
        return new KafkaTemplate<>(orderProducerFactory());
    }
}
2.3 消息转换器
@Configuration
public class KafkaConverterConfig {
@Bean
public MessageConverter jsonMessageConverter() {
Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
// 设置类型映射
Map<String, Class<?>> typeMappings = new HashMap<>();
typeMappings.put("order", Order.class);
typeMappings.put("payment", Payment.class);
converter.setTypePrecedence(MessageConverterTypePrecedence.TYPE_ID);
converter.setTypeIdMappings(typeMappings);
return converter;
}
}
// Usage example
@Service
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /** Publishes the given order to "order-topic" under the fixed key "order". */
    public void sendOrder(Order order) {
        kafkaTemplate.send("order-topic", "order", order);
    }
}
三、Consumer 开发
3.1 基础 Consumer
@Component
public class KafkaConsumerService {

    /** Handles plain String records from my-topic. */
    @KafkaListener(topics = "my-topic", groupId = "my-group")
    public void consume(String message) {
        log.info("收到消息:{}", message);
        // process the message
    }

    /** Handles deserialized Order records from order-topic. */
    @KafkaListener(topics = "order-topic", groupId = "order-group")
    public void consumeOrder(Order order) {
        log.info("收到订单:{}", order);
        // process the order
    }
}
3.2 手动 Ack
@Component
public class ManualAckConsumer {

    /**
     * Consumes from manual-ack-topic and commits the offset only after the
     * record has been processed successfully. On failure the exception is
     * rethrown WITHOUT acknowledging, so the container's error handling
     * redelivers the record.
     */
    @KafkaListener(topics = "manual-ack-topic", groupId = "manual-ack-group")
    public void consume(ConsumerRecord<String, String> record,
                        Acknowledgment acknowledgment) {
        try {
            log.info("收到消息:key={}, value={}", record.key(), record.value());
            processMessage(record.value());
            acknowledgment.acknowledge(); // commit offset only on success
        } catch (Exception e) {
            log.error("处理失败", e);
            throw e; // no ack -> record is redelivered / handled by error handler
        }
    }
}
3.3 批量消费
@Component
public class BatchConsumer {

    /**
     * Processes one polled batch of records at a time.
     * NOTE(review): a List-typed listener requires the container factory to
     * be in batch mode (e.g. spring.kafka.listener.type: batch) — that
     * setting is not shown in this article's configuration; confirm it.
     */
    @KafkaListener(topics = "batch-topic", groupId = "batch-group")
    public void consumeBatch(List<ConsumerRecord<String, String>> records) {
        log.info("批量收到 {} 条消息", records.size());
        records.forEach(record -> processMessage(record.value()));
    }
}
3.4 分区监听
@Component
public class PartitionConsumer {

    /**
     * Listener with a static assignment to partitions 0-2 of partition-topic.
     * Unlike a group-managed subscription, these partitions are not subject
     * to rebalancing.
     */
    @KafkaListener(
            topicPartitions = @TopicPartition(
                    topic = "partition-topic",
                    partitions = {"0", "1", "2"}
            ),
            groupId = "partition-group"
    )
    public void consumePartition(ConsumerRecord<String, String> record) {
        log.info("收到分区消息:partition={}, offset={}",
                record.partition(), record.offset());
    }
}
四、事务支持
4.1 事务配置
@Configuration
public class KafkaTransactionConfig {

    /**
     * Transactional producer factory.
     *
     * Fix: the original set a fixed TRANSACTIONAL_ID_CONFIG ("tx-1") in the
     * client properties AND a different prefix on the template; the template
     * prefix silently overrides the property, and a fixed transactional id
     * breaks when several producers/instances run. The supported approach is
     * to set a transaction-id prefix on the factory, which generates a unique
     * transactional.id per producer.
     */
    @Bean
    public ProducerFactory<String, Object> transactionalProducerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        config.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // required for transactions
        DefaultKafkaProducerFactory<String, Object> factory =
                new DefaultKafkaProducerFactory<>(config);
        factory.setTransactionIdPrefix("tx-"); // enables transactions on this factory
        return factory;
    }

    /** Template backed by the transactional factory above. */
    @Bean
    public KafkaTemplate<String, Object> transactionalKafkaTemplate() {
        return new KafkaTemplate<>(transactionalProducerFactory());
    }

    /** Lets @Transactional drive Kafka transactions. */
    @Bean
    public PlatformTransactionManager transactionManager(KafkaTemplate<String, Object> kafkaTemplate) {
        return new KafkaTransactionManager<>(kafkaTemplate.getProducerFactory());
    }
}
4.2 事务使用
@Service
public class TransactionalService {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Autowired
    private OrderRepository orderRepository;

    /**
     * Persists the order and publishes a Kafka message inside one
     * Spring-managed transaction.
     *
     * NOTE(review): whether "both roll back together" actually holds depends
     * on which PlatformTransactionManager drives this @Transactional. A
     * KafkaTransactionManager covers only the Kafka send, not the repository
     * save (and a JPA manager covers only the DB). True DB+Kafka atomicity
     * needs transaction synchronization or an outbox pattern — verify the
     * effective transaction manager before relying on this.
     */
    @Transactional
    public void processOrderWithTransaction(Order order) {
        // 1. Save the order to the database
        orderRepository.save(order);
        // 2. Send the Kafka message (participates in the surrounding transaction)
        kafkaTemplate.send("order-topic", "order", order);
        // 3. Intended: if either step fails, the whole transaction rolls back
        //    (see the caveat in the method Javadoc)
    }
}
五、错误处理
5.1 错误处理器
@Configuration
public class KafkaErrorHandlerConfig {

    /**
     * Listener container factory with retry and failure recording.
     *
     * Fix: SeekToCurrentErrorHandler and
     * ContainerProperties.setErrorHandler(...) were removed in Spring Kafka
     * 3.0 (the version declared by this article), so the original did not
     * compile. DefaultErrorHandler installed via setCommonErrorHandler is the
     * replacement with the same seek-to-current semantics: retry with
     * back-off, then invoke the recoverer.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                (record, ex) -> {
                    // Invoked once the retries below are exhausted
                    log.error("消息处理失败:{}", record.value(), ex);
                    saveFailedMessage(record, ex);
                },
                new FixedBackOff(1000L, 3L) // 3 retries, 1 second apart
        ));
        return factory;
    }
}
5.2 死信队列
@Configuration
public class DeadLetterQueueConfig {

    /**
     * Routes records that exhausted their retries to "&lt;topic&gt;.DLT",
     * preserving the original partition number (the DLT must therefore have
     * at least as many partitions as the source topic).
     */
    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> kafkaTemplate) {
        return new DeadLetterPublishingRecoverer(kafkaTemplate,
                (record, ex) -> new TopicPartition(record.topic() + ".DLT", record.partition()));
    }

    /**
     * Error handler: 3 retries with 1s back-off, then publish to the DLT.
     *
     * Fix: SeekToCurrentErrorHandler was removed in Spring Kafka 3.0;
     * DefaultErrorHandler is its replacement. The recoverer is injected as a
     * bean parameter rather than calling the @Bean method directly.
     */
    @Bean
    public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) {
        return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3L));
    }
}
5.3 重试配置
@Configuration
public class KafkaRetryConfig {

    /**
     * RetryTemplate: at most 3 attempts, exponential back-off starting at
     * 1 second, doubling each time, capped at 10 seconds.
     */
    @Bean
    public RetryTemplate retryTemplate() {
        SimpleRetryPolicy policy = new SimpleRetryPolicy();
        policy.setMaxAttempts(3);

        ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
        backOff.setInitialInterval(1000);
        backOff.setMultiplier(2.0);
        backOff.setMaxInterval(10000);

        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(policy);
        template.setBackOffPolicy(backOff);
        return template;
    }
}
六、实战案例
6.1 订单处理
@Service
public class OrderService {

    // Fix: the template's value type was KafkaTemplate<String, Order>, but
    // the code sends OrderEvent values — that does not compile. Typed to the
    // actual payload.
    @Autowired
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Fix: orderRepository was used in createOrder() but never declared/injected.
    @Autowired
    private OrderRepository orderRepository;

    /**
     * Creates and persists an order, then publishes an ORDER_CREATED event
     * keyed by the order id (so all events for one order land on the same
     * partition and stay ordered).
     *
     * @param dto incoming order data
     * @return the persisted order
     */
    @Transactional
    public Order createOrder(OrderDTO dto) {
        // 1. Build and persist the order
        Order order = new Order();
        order.setId(UUID.randomUUID().toString());
        order.setUserId(dto.getUserId());
        order.setAmount(dto.getAmount());
        order.setStatus(OrderStatus.CREATED);
        orderRepository.save(order);
        // 2. Publish the order-created event
        kafkaTemplate.send("order-events", order.getId(),
                OrderEvent.builder()
                        .type("ORDER_CREATED")
                        .orderId(order.getId())
                        .build());
        return order;
    }

    /**
     * Dispatches order lifecycle events to their handlers.
     * NOTE(review): unknown event types are silently ignored — consider
     * logging a warning in a default branch.
     */
    @KafkaListener(topics = "order-events", groupId = "order-processor")
    public void processOrderEvent(OrderEvent event) {
        switch (event.getType()) {
            case "ORDER_CREATED":
                handleOrderCreated(event.getOrderId());
                break;
            case "ORDER_PAID":
                handleOrderPaid(event.getOrderId());
                break;
            case "ORDER_SHIPPED":
                handleOrderShipped(event.getOrderId());
                break;
        }
    }
}
6.2 日志收集
@Service
public class LogCollector {

    @Autowired
    private KafkaTemplate<String, LogEntry> kafkaTemplate;

    /**
     * Publishes one log entry to logs-topic, keyed by the originating
     * service name so each service's logs stay ordered within a partition.
     */
    public void collectLog(LogEntry log) {
        kafkaTemplate.send("logs-topic", log.getService(), log);
    }
}
// Interceptor that ships per-request access logs asynchronously
@Component
public class LogInterceptor implements HandlerInterceptor {

    @Autowired
    private LogCollector logCollector;

    /**
     * After each request completes, builds a LogEntry from the request and
     * response and hands it to the collector off the request thread.
     */
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
                                Object handler, Exception ex) {
        LogEntry entry = LogEntry.builder()
                .service("api-gateway")
                .method(request.getMethod())
                .url(request.getRequestURI())
                .status(response.getStatus())
                .timestamp(System.currentTimeMillis())
                .build();
        // Publish asynchronously so logging never blocks the response path
        CompletableFuture.runAsync(() -> logCollector.collectLog(entry));
    }
}
6.3 配置中心
@Component
public class ConfigCenter {

    /** In-memory snapshot of the most recent value per configuration key. */
    private final Map<String, Object> configs = new ConcurrentHashMap<>();

    /** Applies configuration updates pushed through the config-center topic. */
    @KafkaListener(topics = "config-center", groupId = "config-subscriber")
    public void listenConfig(ConfigUpdateEvent event) {
        log.info("配置更新:key={}, value={}", event.getKey(), event.getValue());
        configs.put(event.getKey(), event.getValue());
    }

    /** Returns the last seen value for the key, or null if none received yet. */
    public Object getConfig(String key) {
        return configs.get(key);
    }
}
七、监控运维
7.1 监控指标
@Configuration
public class KafkaMetricsConfig {

    /**
     * Registers custom Kafka gauges with Micrometer.
     *
     * NOTE(review): getRecordsConsumed()/getRecordsSent() are not defined in
     * this article — presumably helpers elsewhere in the project; confirm.
     * The lambda parameter {@code f} is unused, so neither gauge actually
     * reads from the injected factory. The Kafka clients already expose these
     * counters via their own metrics() maps (or Spring Boot's
     * KafkaClientMetrics binding) — verify this duplication is intended.
     */
    @Bean
    public MeterBinder kafkaMetrics(ConsumerFactory<String, String> consumerFactory,
                                    ProducerFactory<String, String> producerFactory) {
        return (registry) -> {
            // Consumer-side gauge
            Gauge.builder("kafka.consumer.records.consumed", consumerFactory, f -> getRecordsConsumed())
                    .register(registry);
            // Producer-side gauge
            Gauge.builder("kafka.producer.records.sent", producerFactory, f -> getRecordsSent())
                    .register(registry);
        };
    }
}
7.2 健康检查
@Component
public class KafkaHealthIndicator implements HealthIndicator {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Reports UP when a probe message to "health-check" is acknowledged
     * within 5 seconds; otherwise DOWN with the failure cause attached.
     * Note: every health probe publishes a real record to the topic.
     */
    @Override
    public Health health() {
        try {
            kafkaTemplate.send("health-check", "test").get(5, TimeUnit.SECONDS);
            return Health.up().build();
        } catch (Exception e) {
            return Health.down(e).build();
        }
    }
}
八、最佳实践
8.1 配置建议
# Production configuration
spring:
  kafka:
    producer:
      acks: all
      retries: 3
      properties:
        # Fix: enable-idempotence is not a first-class spring.kafka.producer
        # property — it must be passed through `properties`.
        enable.idempotence: true
        max.in.flight.requests.per.connection: 5   # <= 5 is safe with idempotence
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      max-poll-records: 500
    listener:
      ack-mode: manual
      concurrency: 3
      missing-topics-fatal: false
8.2 代码规范
// ✅ Recommended: typed payload with metadata injected via @Header
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void consume(@Payload Order order,
                    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    process(order);
}

// ❌ Not recommended: raw String payload and no explicit consumer group
@KafkaListener(topics = "my-topic")
public void consume(String message) {
    // manual JSON parsing
}
8.3 检查清单
开发检查:
- [ ] 配置序列化器
- [ ] 配置错误处理
- [ ] 配置重试机制
- [ ] 配置死信队列
运维检查:
- [ ] 监控消费滞后
- [ ] 监控错误率
- [ ] 配置告警
- [ ] 定期备份配置
总结
Spring Kafka 的核心要点:
- 基础配置:依赖、YAML 配置、Bean 配置
- Producer 开发:KafkaTemplate、自定义配置、消息转换
- Consumer 开发:@KafkaListener、手动 Ack、批量消费
- 事务支持:事务配置、事务使用
- 错误处理:错误处理器、死信队列、重试机制
- 监控运维:指标、健康检查
核心要点:
- 使用 Spring Kafka 简化开发
- 合理配置错误处理和重试
- 实现事务保证数据一致
- 建立完善的监控体系
参考资料
- Spring Kafka 官方文档
- Spring Kafka GitHub
- 《Spring 实战》第 12 章