Kafka Integration
Kafka Overview
Core Features
High throughput:
- Handles millions of messages per second
- Low-latency message delivery
- Scales horizontally
Persistence:
- Messages are persisted to disk
- Supports replaying messages from earlier offsets
- Suited to long-term storage
Stream processing:
- Supports streaming computation
- Real-time data processing
- Event sourcing
Architecture
┌─────────────┐            ┌─────────────┐
│  Producer   │            │  Consumer   │
└──────┬──────┘            └──────▲──────┘
       │                          │
       │ publish                  │ subscribe
       ▼                          │
┌─────────────────────────────────┐
│          Kafka Cluster          │
│   ┌─────────┐   ┌─────────┐     │
│   │ Broker 1│   │ Broker 2│ ... │
│   │ ┌─────┐ │   │ ┌─────┐ │     │
│   │ │Topic│ │   │ │Topic│ │     │
│   │ │Part │ │   │ │Part │ │     │
│   │ └─────┘ │   │ └─────┘ │     │
│   └─────────┘   └─────────┘     │
└─────────────────────────────────┘
                ▲
                │
        ┌─────────────┐
        │  ZooKeeper  │
        │ (metadata)  │
        └─────────────┘
Core Concepts
Topic:
- A named category of messages
- Can be split into multiple partitions
Partition:
- A physical shard of a topic
- Enables parallel processing
- An ordered message sequence (ordering is guaranteed within a partition, not across the topic)
Replica:
- A copy of a partition
- Leader/Follower roles
- Provides high availability
Consumer Group:
- A logical grouping of consumers
- Load balancing within a group: each partition is consumed by at most one consumer in the group
- Broadcast across groups: every group receives the full message stream
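Per-partition ordering and per-key routing go together: the producer sends all messages with the same key to the same partition, so consumers see those messages in order. A minimal sketch of that routing, assuming a simple hash (Kafka's real default partitioner uses murmur2, and `PartitionRouting` is a hypothetical helper, not a Kafka API):

```java
public class PartitionRouting {

    // Map a message key to one of numPartitions partitions.
    // Masking the sign bit keeps the result non-negative.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("order-1001", 12);
        int p2 = partitionFor("order-1001", 12);
        // The same key always lands on the same partition,
        // which is what gives per-key ordering
        System.out.println(p1 == p2);            // true
        System.out.println(p1 >= 0 && p1 < 12);  // true
    }
}
```

This is also why changing the partition count of an existing topic breaks per-key ordering: the modulo changes, so old and new messages for the same key may land on different partitions.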
Quick Start
1. Docker Deployment
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.4.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    container_name: kafka-ui
    depends_on:
      - kafka
    ports:
      - "8088:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
2. Add Dependencies
<dependencies>
    <!-- Spring for Apache Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Kafka Streams (optional) -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
    </dependency>
</dependencies>
3. Basic Configuration
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all                 # wait for all in-sync replicas to acknowledge
      retries: 3                # number of retries
      batch-size: 16384         # batch size in bytes
      buffer-memory: 33554432   # total buffer memory in bytes
    consumer:
      group-id: order-consumer-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest    # start from the latest offset when no committed offset exists
      enable-auto-commit: true     # commit offsets automatically
      auto-commit-interval: 1000   # commit interval in ms
    listener:
      ack-mode: batch   # acknowledge in batches (only takes effect when enable-auto-commit is false)
      concurrency: 3    # number of concurrent consumers
4. Sending Messages
Basic send:
@Slf4j
@Service
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendOrderCreated(Order order) {
        String topic = "order-created";
        String key = order.getId().toString();
        String value = JSON.toJSONString(order);
        kafkaTemplate.send(topic, key, value);
        log.info("Order message sent: {}", order.getId());
    }
}
Send and wait for the result:
@Slf4j
@Service
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public SendResult<String, String> sendOrder(Order order) throws Exception {
        String topic = "order-created";
        String key = order.getId().toString();
        String value = JSON.toJSONString(order);
        // Synchronous send: block for up to 5 seconds waiting for the broker ack
        SendResult<String, String> result = kafkaTemplate
                .send(topic, key, value)
                .get(5, TimeUnit.SECONDS);
        log.info("Message sent: offset={}, partition={}",
                result.getRecordMetadata().offset(),
                result.getRecordMetadata().partition());
        return result;
    }
}
Asynchronous send with callback:
@Slf4j
@Service
public class OrderProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendOrderAsync(Order order) {
        String topic = "order-created";
        String key = order.getId().toString();
        String value = JSON.toJSONString(order);
        // Spring Kafka 2.x returns ListenableFuture; in 3.x send() returns
        // CompletableFuture, so use future.whenComplete(...) there instead
        ListenableFuture<SendResult<String, String>> future =
                kafkaTemplate.send(topic, key, value);
        future.addCallback(
                result -> log.info("Message sent: offset={}",
                        result.getRecordMetadata().offset()),
                ex -> log.error("Failed to send message", ex)
        );
    }
}
5. Consuming Messages
Basic consumer:
@Slf4j
@Component
public class OrderConsumer {

    @KafkaListener(
        topics = "order-created",
        groupId = "order-consumer-group"
    )
    public void consume(String message) {
        log.info("Received message: {}", message);
        Order order = JSON.parseObject(message, Order.class);
        processOrder(order);
    }

    private void processOrder(Order order) {
        // business logic
    }
}
Consuming specific partitions (the @TopicPartition goes in the annotation's
topicPartitions attribute, which replaces the topics attribute):
@KafkaListener(
    groupId = "order-consumer-group",
    topicPartitions = @TopicPartition(
        topic = "order-created",
        partitionOffsets = {
            @PartitionOffset(partition = "0", initialOffset = "0"),
            @PartitionOffset(partition = "1", initialOffset = "0")
        }
    )
)
public void consume(@Payload String message) {
    // Consumes only partitions 0 and 1, starting from offset 0
}
Batch consumption (requires a container factory with setBatchListener(true), as
configured below):
@KafkaListener(
    topics = "order-created",
    groupId = "order-consumer-group"
)
public void consumeBatch(List<String> messages) {
    log.info("Received a batch of {} messages", messages.size());
    for (String message : messages) {
        Order order = JSON.parseObject(message, Order.class);
        processOrder(order);
    }
}
Advanced Features
1. Message Serialization
JSON serialization:
@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, Order> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public KafkaTemplate<String, Order> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
        // Only deserialize classes from trusted packages
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.model");
        // Explicit deserializer instances take precedence over config entries
        return new DefaultKafkaConsumerFactory<>(
                config,
                new StringDeserializer(),
                new JsonDeserializer<>(Order.class)
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Order> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Order> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.setBatchListener(true);  // enables List<...> batch listeners
        return factory;
    }
}
2. Message Filtering
Record filter:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // Filter strategy: return true to DISCARD the record
    factory.setRecordFilterStrategy(record -> {
        Order order = JSON.parseObject(record.value(), Order.class);
        // Drop orders with an amount under 100
        return order.getAmount() < 100;
    });
    return factory;
}
3. Error Handling
Error handler:
@Slf4j
@Configuration
public class KafkaErrorHandlerConfig {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // The recoverer runs after all retries are exhausted
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
                (consumerRecord, exception) -> {
                    log.error("Consumption failed: topic={}, partition={}, offset={}",
                            consumerRecord.topic(),
                            consumerRecord.partition(),
                            consumerRecord.offset(),
                            exception);
                    // Forward the failed record to the dead-letter topic
                    sendToDeadLetterQueue(consumerRecord, exception);
                },
                new FixedBackOff(1000L, 3L)  // retry 3 times, 1 second apart
        );
        // Exceptions that should never be retried
        errorHandler.addNotRetryableExceptions(
                IllegalArgumentException.class,
                ClassCastException.class
        );
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }

    private void sendToDeadLetterQueue(ConsumerRecord<?, ?> record, Exception e) {
        kafkaTemplate.send("order-dlq", (String) record.key(), (String) record.value());
    }
}
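For context on the retry settings above: with `FixedBackOff(1000L, 3L)` a persistently failing record is delivered up to 4 times in total (the original delivery plus 3 retries), with roughly 1 second between attempts, before the recoverer runs. The arithmetic as a small sketch (`BackOffSchedule` is an illustrative helper, not a Spring class):

```java
public class BackOffSchedule {

    // FixedBackOff(intervalMs, maxRetries): total deliveries = 1 + maxRetries
    static long totalDeliveries(long maxRetries) {
        return 1 + maxRetries;
    }

    // Total time spent waiting between attempts before the recoverer is invoked
    static long totalBackOffMs(long intervalMs, long maxRetries) {
        return intervalMs * maxRetries;
    }

    public static void main(String[] args) {
        System.out.println(totalDeliveries(3));         // 4
        System.out.println(totalBackOffMs(1000L, 3L));  // 3000
    }
}
```

Keep in mind the consumer is blocked on that partition while retrying, so long backoffs multiply into real consumer lag.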
4. Manual Offset Commits
Manual acknowledgment:
@Slf4j
@Component
public class ManualAckConsumer {

    @KafkaListener(
        topics = "order-created",
        groupId = "order-consumer-group",
        containerFactory = "manualAckFactory"
    )
    public void consume(
            String message,
            Acknowledgment acknowledgment,
            ConsumerRecord<?, ?> record
    ) {
        try {
            log.info("Received message: topic={}, partition={}, offset={}",
                    record.topic(), record.partition(), record.offset());
            Order order = JSON.parseObject(message, Order.class);
            processOrder(order);
            // Acknowledge manually: commits the offset
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("Consumption failed", e);
            // Not acknowledged; the container's error handler will redeliver
            throw new RuntimeException(e);
        }
    }

    private void processOrder(Order order) {
        // business logic
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> manualAckFactory(
            ConsumerFactory<String, String> consumerFactory
    ) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
5. Message Interceptors
Producer interceptor:
@Slf4j
public class OrderProducerInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Add a timestamp header
        record.headers().add("timestamp",
                String.valueOf(System.currentTimeMillis()).getBytes());
        // Add a message ID header
        record.headers().add("message-id", UUID.randomUUID().toString().getBytes());
        log.info("Sending message: topic={}, key={}", record.topic(), record.key());
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            log.error("Message acknowledgement failed", exception);
        } else {
            log.info("Message acknowledged: offset={}", metadata.offset());
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
Kafka Streams
1. Basic Stream Processing
Stream processing application (requires @EnableKafkaStreams, which provides the
StreamsBuilder bean):
@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean
    public KStream<String, Order> orderStream(StreamsBuilder streamsBuilder) {
        KStream<String, Order> stream = streamsBuilder.stream(
                "order-created",
                Consumed.with(Serdes.String(), new JsonSerde<>(Order.class))
        );
        stream
            .filter((key, order) -> order.getAmount() > 100)   // filter
            .mapValues(order -> {
                order.setStatus("PROCESSED");
                return order;
            })                                                 // transform
            .to("order-processed",
                Produced.with(Serdes.String(), new JsonSerde<>(Order.class)));  // output
        return stream;
    }
}
2. Aggregations
Counting orders (.to() returns void, so the changelog is published separately
rather than at the end of the chain):
@Bean
public KTable<String, Long> orderCountByUser(KStream<String, Order> orderStream) {
    KTable<String, Long> counts = orderStream
            .groupBy((key, order) -> order.getUserId(),
                     Grouped.with(Serdes.String(), new JsonSerde<>(Order.class)))
            .count();
    counts.toStream()
          .peek((userId, count) -> log.info("User {} order count: {}", userId, count))
          .to("order-count-result", Produced.with(Serdes.String(), Serdes.Long()));
    return counts;
}
Windowed aggregation:
@Bean
public KTable<Windowed<String>, Long> orderCountByWindow(KStream<String, Order> orderStream) {
    return orderStream
            .groupBy((key, order) -> order.getUserId(),
                     Grouped.with(Serdes.String(), new JsonSerde<>(Order.class)))
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
            .count();
}
3. Stream-Table Joins
Joining orders with users (stream-table joins take no JoinWindows; those apply
only to stream-stream joins, and the stream's key must match the table's key):
@Bean
public KStream<String, OrderUser> orderUserJoin(
        KStream<String, Order> orderStream,
        KTable<String, User> userTable
) {
    return orderStream.join(
            userTable,
            (order, user) -> new OrderUser(order, user)
    );
}
Monitoring and Alerting
1. Metrics
Producer metrics:
- Send rate
- Send latency
- Send failure rate
- Batch size
Consumer metrics:
- Consumption rate
- Consumer lag
- Rebalance count
- Processing time
Broker metrics:
- Message throughput
- Disk usage
- Network traffic
- Replica sync lag
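Consumer lag, the key consumer-side metric above, is just the gap between a partition's log-end offset and the group's last committed offset. A sketch of the calculation (`LagCalculator` is illustrative, not a Kafka API):

```java
public class LagCalculator {

    // Lag = records written to the partition but not yet consumed by the group.
    // The committed offset points at the next record the group will read.
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0, logEndOffset - committedOffset);
    }

    public static void main(String[] args) {
        // e.g. the broker has written up to offset 1500, the group committed 1200
        System.out.println(lag(1500, 1200)); // 300
    }
}
```

A lag that grows without bound means consumers cannot keep up with producers; the KafkaConsumerLag alert below fires on exactly this condition.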
2. Prometheus Integration
# Expose Kafka metrics (e.g. via the Prometheus JMX exporter)
kafka:
  metrics:
    reporters:
      - io.prometheus.jmx.JmxReporter

# Prometheus alerting rules
groups:
  - name: kafka
    rules:
      - alert: KafkaConsumerLag
        expr: kafka_consumer_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer lag is high"
      - alert: KafkaUnderReplicatedPartitions
        expr: kafka_server_underreplicatedpartitions > 0
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Kafka has under-replicated partitions"
Best Practices
1. Topic Design
Partition planning:
- Size by required throughput
- Match consumer parallelism (at most one consumer per partition within a group)
- Leave headroom for growth
# Topic configuration
num.partitions=12
default.replication.factor=3
retention.ms=604800000   # 7 days
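One common rule of thumb for the partition count: measure the throughput a single partition can sustain on the producer side and on the consumer side, divide the target throughput by each, and keep the larger result. A sketch under that heuristic (all numbers are illustrative, not benchmarks):

```java
public class PartitionSizing {

    // partitions = max(target/producerPerPartition, target/consumerPerPartition)
    static int suggestedPartitions(double targetMBs,
                                   double producerMBsPerPartition,
                                   double consumerMBsPerPartition) {
        double byProducer = targetMBs / producerMBsPerPartition;
        double byConsumer = targetMBs / consumerMBsPerPartition;
        return (int) Math.ceil(Math.max(byProducer, byConsumer));
    }

    public static void main(String[] args) {
        // Target 100 MB/s; one partition sustains 10 MB/s produce, 20 MB/s consume
        System.out.println(suggestedPartitions(100, 10, 20)); // 10
    }
}
```

Since adding partitions later breaks per-key ordering, round the result up generously rather than sizing exactly to today's load.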
2. Message Design
Message size:
- Keep messages under 1 MB
- Store large payloads externally and send a reference
Message format:
{
  "id": "uuid",
  "type": "ORDER_CREATED",
  "timestamp": 1680000000000,
  "version": "1.0",
  "data": {}
}
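Building the envelope above can be sketched as follows. A real service would use a JSON library (the examples in this document use fastjson's `JSON`); this sketch stays dependency-free with `String.format`, and `MessageEnvelope` is a hypothetical helper:

```java
import java.util.UUID;

public class MessageEnvelope {

    // Wrap a payload in the id/type/timestamp/version envelope shown above
    static String envelope(String type, long timestamp, String dataJson) {
        return String.format(
            "{\"id\":\"%s\",\"type\":\"%s\",\"timestamp\":%d,\"version\":\"1.0\",\"data\":%s}",
            UUID.randomUUID(), type, timestamp, dataJson);
    }

    public static void main(String[] args) {
        String msg = envelope("ORDER_CREATED", 1680000000000L, "{}");
        System.out.println(msg.contains("\"type\":\"ORDER_CREATED\"")); // true
    }
}
```

The version field is what lets consumers evolve: they can branch on it when the payload schema changes instead of breaking on unknown fields.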
3. Consumer Tuning
Batch fetching:
spring:
  kafka:
    consumer:
      max-poll-records: 500   # records per poll() call
      fetch-min-size: 1       # minimum fetch size in bytes
      fetch-max-wait: 500     # maximum fetch wait in ms
Concurrent consumption:
spring:
  kafka:
    listener:
      concurrency: 6          # number of concurrent consumers
    consumer:
      max-poll-records: 500   # consumer property, not a listener property
4. Reliability
Producer:
spring:
  kafka:
    producer:
      acks: all   # wait for all in-sync replicas to acknowledge
      retries: 3
      properties:
        max.in.flight.requests.per.connection: 5
        enable.idempotence: true   # idempotent producer (prevents duplicates on retry)
Consumer:
spring:
  kafka:
    consumer:
      enable-auto-commit: false        # commit offsets manually
      isolation-level: read_committed  # read only committed transactional messages
5. Failure Handling
Dead-letter queue:
@Slf4j
@Component
public class DeadLetterQueueHandler {

    @Autowired
    private AlertService alertService;

    @KafkaListener(
        topics = "order-dlq",
        groupId = "dlq-consumer-group"
    )
    public void handleDlq(String message) {
        // Record the failed message
        log.error("Dead-letter message: {}", message);
        // Raise an alert for manual intervention
        alertService.sendAlert("Kafka dead-letter message", message);
    }
}
Rebalance handling:
@Slf4j
@Component
public class ConsumerRebalanceListenerImpl implements ConsumerRebalanceListener {

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit offsets and release resources before losing the partitions
        log.info("Partitions revoked: {}", partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Optionally seek to a specific position before consuming
        log.info("Partitions assigned: {}", partitions);
    }
}
Summary
Kafka is a high-throughput, distributed messaging system that supports publish/subscribe, stream processing, event sourcing, and more.
Integrated with Spring Cloud, it enables service decoupling, asynchronous communication, and data synchronization.
In production, plan topics carefully, set up monitoring and alerting, and handle failures deliberately to keep the Kafka cluster stable.