前言
Kafka 是 LinkedIn 开源的分布式流处理平台,广泛用于日志收集、实时数据处理等场景。Spring Boot 通过 spring-kafka 可以方便地集成 Kafka。
快速开始
1. 添加依赖
<!-- Spring for Apache Kafka starter; the version is managed by the Spring Boot BOM -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
2. 基础配置
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all                      # wait for all in-sync replicas before acknowledging
      retries: 3
      properties:
        enable.idempotence: true     # de-duplicates broker-side retries per partition
    consumer:
      group-id: demo-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest    # start from the beginning when no committed offset exists
      enable-auto-commit: false      # offsets are committed explicitly by the listener
      properties:
        isolation.level: read_committed  # only read transactionally committed records
    listener:
      ack-mode: manual               # listener calls Acknowledgment#acknowledge() itself
      concurrency: 3                 # 3 consumer threads per listener container
3. 发送消息
@Service
@RequiredArgsConstructor
public class KafkaMessageService {
private final KafkaTemplate<String, String> kafkaTemplate;
/**
* 发送简单消息
*/
public void sendSimpleMessage(String message) {
kafkaTemplate.send("demo-topic", message);
}
/**
* 发送带 key 的消息
*/
public void sendWithKey(String key, String message) {
kafkaTemplate.send("demo-topic", key, message);
}
/**
* 发送到指定分区
*/
public void sendToPartition(int partition, String message) {
kafkaTemplate.send("demo-topic", partition, null, message);
}
/**
* 发送并获取结果
*/
public void sendWithResult(String message) {
SendResult<String, String> result =
kafkaTemplate.send("demo-topic", message).get();
log.info("发送结果:{}", result.getRecordMetadata());
}
/**
* 异步发送
*/
public void sendAsync(String message) {
kafkaTemplate.send("demo-topic", message)
.whenComplete((result, ex) -> {
if (ex == null) {
log.info("发送成功:{}", result.getRecordMetadata());
} else {
log.error("发送失败", ex);
}
});
}
/**
* 发送对象
*/
public void sendObject(UserDTO user) {
kafkaTemplate.send("user-topic", user);
}
}
4. 消费消息
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaConsumer {

    // Was referenced by consumeUser() but never declared in the original.
    private final UserService userService;

    /**
     * Basic consumption: receives the record value as a String.
     */
    @KafkaListener(topics = "demo-topic")
    public void consume(String message) {
        log.info("收到消息:{}", message);
        processMessage(message);
    }

    /**
     * Consumes the record value together with its key.
     * Uses its own consumer group: two listeners on the same topic in the SAME
     * group would split the partitions between them, so each would see only
     * part of the messages.
     */
    @KafkaListener(topics = "demo-topic", groupId = "demo-key-consumer-group")
    public void consumeWithKey(
            @Payload String message,
            @Header(KafkaHeaders.RECEIVED_KEY) String key
    ) {
        log.info("收到消息:key={}, value={}", key, message);
    }

    /**
     * Consumes a JSON payload deserialized into a DTO.
     */
    @KafkaListener(topics = "user-topic")
    public void consumeUser(UserDTO user) {
        log.info("收到用户消息:{}", user);
        userService.process(user);
    }

    /**
     * Batch consumption. Without {@code batch = "true"} (or a batch-enabled
     * container factory) the List parameter is not populated as a batch.
     */
    @KafkaListener(topics = "batch-topic", batch = "true")
    public void consumeBatch(List<String> messages) {
        log.info("批量收到 {} 条消息", messages.size());
        for (String message : messages) {
            processMessage(message);
        }
    }

    // Shared business logic for single-record and batch paths.
    private void processMessage(String message) {
        // Business logic goes here.
    }
}
5. 手动提交 Offset
@Slf4j
@Component
public class ManualCommitConsumer {

    /**
     * Processes a record and commits its offset only after the business logic
     * succeeds (requires {@code spring.kafka.listener.ack-mode: manual}),
     * giving at-least-once delivery semantics.
     */
    @KafkaListener(topics = "manual-commit-topic")
    public void consume(
            String message,
            Acknowledgment acknowledgment
    ) {
        try {
            processMessage(message);
            // Commit only after successful processing.
            acknowledgment.acknowledge();
            log.info("消息处理成功并提交 offset");
        } catch (Exception e) {
            log.error("消息处理失败", e);
            // Rethrow without acking so the container's error handler can
            // retry / seek back to this offset.
            throw e;
        }
    }

    // Was referenced but never defined in the original snippet.
    private void processMessage(String message) {
        // Business logic goes here.
    }
}
高级特性
1. 消息过滤
@Slf4j
@Component
public class FilterConsumer {

    /**
     * Deserializes JSON payloads into {@code UserDTO} via the JsonDeserializer's
     * default-type property. The original used {@code property = ...}, which is
     * not a {@code @KafkaListener} attribute; the correct attribute is
     * {@code properties}.
     */
    @KafkaListener(
            topics = "filter-topic",
            properties = "spring.json.value.default.type=com.example.demo.dto.UserDTO"
    )
    public void consume(UserDTO user) {
        log.info("收到用户消息:{}", user);
    }
}
@Configuration
public class KafkaConfig {

    /**
     * Consumer factory producing String keys and JSON-decoded Object values.
     */
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "filter-consumer-group");
        return new DefaultKafkaConsumerFactory<>(
                props,
                new StringDeserializer(),
                new JsonDeserializer<>(Object.class, false)
        );
    }

    /**
     * Container factory that discards every record except those whose
     * {@code message-type} header equals "important".
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Returning true means "filter this record out".
        factory.setRecordFilterStrategy(record -> {
            Header header = record.headers().lastHeader("message-type");
            if (header == null) {
                // The original NPE'd when the header was absent; treat as filtered.
                return true;
            }
            // byte[].toString() prints an identity hash, not the content —
            // the header bytes must be decoded explicitly.
            String messageType = new String(header.value(), StandardCharsets.UTF_8);
            return !"important".equals(messageType);
        });
        return factory;
    }
}
2. 错误处理
@Slf4j
@Configuration
@RequiredArgsConstructor
public class KafkaConfig {

    // Was used by sendToDLQ() but never declared in the original snippet.
    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Container factory with retry + dead-letter handling: each failed record
     * is retried 3 times at 1-second intervals, then handed to the recoverer,
     * which logs it and forwards it to the DLQ topic.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory()); // defined in the consumer configuration
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                (record, exception) -> {
                    log.error("消费失败:{}", record.value(), exception);
                    sendToDLQ(record, exception);
                },
                new FixedBackOff(1000L, 3L) // retry 3 times, 1 second apart
        ));
        return factory;
    }

    /**
     * Forwards an exhausted record's value to the dead-letter topic.
     */
    private void sendToDLQ(ConsumerRecord<?, ?> record, Exception ex) {
        kafkaTemplate.send("dlq-topic", record.value());
    }
}
3. Kafka Streams
<!-- Kafka Streams DSL; used together with spring-kafka's @EnableKafkaStreams support -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
@Slf4j
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    /**
     * Order pipeline: filters out non-positive orders, counts orders per user
     * into a queryable state store, and publishes the running counts.
     */
    @Bean
    public KStream<String, Order> orderStream(StreamsBuilder streamsBuilder) {
        KStream<String, Order> orderStream = streamsBuilder.stream("order-topic");

        // Keep only orders with a positive amount.
        KStream<String, Order> validOrders = orderStream
                .filter((key, order) -> order.getAmount() > 0);

        // Re-key by user id so the aggregation groups per user.
        KGroupedStream<String, Order> groupedByUser = validOrders
                .groupBy((key, order) -> String.valueOf(order.getUserId()));

        // Running order count per user, backed by a named state store.
        KTable<String, Long> userOrderCount = groupedByUser
                .count(Materialized.as("user-order-count-store"));

        userOrderCount.toStream()
                .peek((userId, count) -> log.info("用户 {} 订单数:{}", userId, count))
                .to("user-order-count-topic");

        return validOrders;
    }

    /**
     * Upper-cases and forwards non-empty values.
     * {@code KStream#to(...)} is a terminal operation returning void, so the
     * original {@code return stream...to(...)} did not compile; route the
     * stream first, then return it.
     */
    @Bean
    public KStream<String, String> processStream(StreamsBuilder streamsBuilder) {
        KStream<String, String> transformed = streamsBuilder
                .<String, String>stream("input-topic")
                .mapValues(value -> value.toUpperCase())
                .filter((key, value) -> !value.isEmpty());
        transformed.to("output-topic");
        return transformed;
    }
}
4. 事务支持
spring:
  kafka:
    producer:
      transaction-id-prefix: tx-       # enables transactional producers
    consumer:
      isolation-level: read_committed  # hide uncommitted / aborted records
@Service
@RequiredArgsConstructor
public class TransactionService {

    // Object-valued template: the original KafkaTemplate<String, String>
    // could not send an Order payload (compile error).
    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Persists an order and publishes it within the same Spring transaction.
     *
     * NOTE(review): this is best-effort transaction synchronization, not XA.
     * The Kafka send participates in the transaction only because
     * {@code transaction-id-prefix} is configured on the producer; verify the
     * transaction-manager wiring (e.g. KafkaTransactionManager) in this project.
     */
    @Transactional
    public void sendTransactionalMessages() {
        // Database write; createOrder() is defined elsewhere in the project.
        Order order = createOrder();
        // Published atomically with the database work; rolled back together.
        kafkaTemplate.send("order-topic", order.getId().toString(), order);
    }
}
5. 分区管理
@Configuration
public class KafkaConfig {

    /**
     * Main demo topic: 6 partitions, 3 replicas, 7-day retention, 1 GB segments.
     */
    @Bean
    public NewTopic demoTopic() {
        return TopicBuilder.name("demo-topic")
                .partitions(6)
                .replicas(3)
                .config("retention.ms", "604800000")   // 7 days
                .config("segment.bytes", "1073741824") // 1 GB
                .build();
    }

    /**
     * User events topic: 3 partitions, 3 replicas, broker-default configs.
     */
    @Bean
    public NewTopic userTopic() {
        return TopicBuilder.name("user-topic")
                .partitions(3)
                .replicas(3)
                .build();
    }
}
最佳实践
1. 消息序列化
@Configuration
public class KafkaConfig {

    /**
     * Producer factory: String keys, JSON-serialized Object values.
     */
    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> config = new HashMap<>(Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class));
        return new DefaultKafkaProducerFactory<>(config);
    }

    /**
     * Template bound to the JSON producer factory above.
     */
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
2. 消费幂等性
@Slf4j
@Component
@RequiredArgsConstructor
public class IdempotentConsumer {

    // Constructor injection instead of field @Autowired.
    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * De-duplicates by (partition, offset): a Redis marker is claimed before
     * processing so a redelivered record is skipped.
     */
    @KafkaListener(topics = "idempotent-topic")
    public void consume(
            String message,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset
    ) {
        String key = String.format("kafka:processed:%d:%d", partition, offset);
        Boolean isNew = redisTemplate.opsForValue()
                .setIfAbsent(key, "processed", 24, TimeUnit.HOURS);
        if (Boolean.FALSE.equals(isNew)) {
            log.warn("消息已处理,跳过");
            return;
        }
        try {
            processMessage(message);
        } catch (Exception e) {
            // The original left the marker in place on failure, so a failed
            // message would be skipped forever on redelivery (at-most-once).
            // Release the marker so a retry can reprocess it.
            redisTemplate.delete(key);
            throw e;
        }
    }

    // Was referenced but never defined in the original snippet.
    private void processMessage(String message) {
        // Business logic goes here.
    }
}
3. 监控指标
@Component
public class KafkaMetrics {

    private final MeterRegistry meterRegistry;

    /**
     * NOTE(review): the original constructor called
     * {@code new KafkaMetrics().bindTo(meterRegistry)} — a recursive reference
     * to this class via a constructor that does not exist (compile error).
     * It presumably meant Micrometer's Kafka client binder
     * (io.micrometer...KafkaClientMetrics), which must be bound against actual
     * Producer/Consumer instances where they are created — confirm intent.
     */
    public KafkaMetrics(MeterRegistry meterRegistry) {
        this.meterRegistry = meterRegistry;
    }

    /**
     * Records one sent message and its payload size for the given topic.
     */
    public void recordSend(String topic, long bytes) {
        meterRegistry.counter("kafka.message.send",
                "topic", topic
        ).increment();
        meterRegistry.summary("kafka.message.bytes",
                "topic", topic
        ).record(bytes);
    }
}
4. 配置优化
spring:
  kafka:
    producer:
      # Batching
      batch-size: 16384         # 16 KB per batch
      buffer-memory: 33554432   # 32 MB send buffer
      # Compression (direct Spring Boot property; no need for raw `properties`)
      compression-type: lz4
      # Acknowledgement
      acks: all
      # Retries — retry.backoff.ms has no dedicated Spring Boot key,
      # so it goes under `properties`
      retries: 3
      properties:
        retry.backoff.ms: 1000
    consumer:
      # Batch polling
      max-poll-records: 500
      fetch-min-size: 1
      fetch-max-wait: 500ms     # Spring Boot key is fetch-max-wait (Duration), not fetch-max-wait-ms
      # Session / heartbeat
      heartbeat-interval: 10s   # Spring Boot key is heartbeat-interval, not heartbeat-interval-ms
      properties:
        session.timeout.ms: 30000  # no dedicated Spring Boot key; raw client property
5. 死信队列
@Configuration
public class KafkaConfig {

    /**
     * Routes failed records to a dead-letter topic named "&lt;original&gt;.DLT",
     * preserving the original partition number.
     */
    @Bean
    public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
            KafkaTemplate<String, Object> template
    ) {
        return new DeadLetterPublishingRecoverer(
                template,
                (record, exception) ->
                        new TopicPartition(record.topic() + ".DLT", record.partition()));
    }

    /**
     * Retries a failed record 3 times, 1 second apart, then hands it to the
     * dead-letter recoverer.
     */
    @Bean
    public DefaultErrorHandler errorHandler(
            DeadLetterPublishingRecoverer recoverer
    ) {
        FixedBackOff threeRetriesOneSecondApart = new FixedBackOff(1000L, 3L);
        return new DefaultErrorHandler(recoverer, threeRetriesOneSecondApart);
    }
}
@Slf4j
@Component
@RequiredArgsConstructor
public class DLQConsumer {

    // Was referenced but never declared in the original snippet.
    private final AlertService alertService;

    /**
     * Consumes dead-lettered records, logging the original payload together
     * with the exception message attached by DeadLetterPublishingRecoverer,
     * and raises an alert.
     */
    @KafkaListener(topics = "demo-topic.DLT")
    public void consumeDLQ(
            @Payload String message,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage
    ) {
        log.error("死信消息:{}, 异常:{}", message, exceptionMessage);
        alertService.sendAlert("死信消息:" + message);
    }
}
总结
Kafka 集成要点:
- ✅ 基础使用 - 发送、消费消息
- ✅ Kafka Streams - 流处理
- ✅ 事务支持 - Exactly-Once 语义
- ✅ 错误处理 - 重试、死信队列
- ✅ 最佳实践 - 序列化、幂等性、监控
Kafka 是事件驱动架构的核心组件。