Kafka 是一个高吞吐量、分布式、基于发布/订阅模式的消息队列系统,广泛应用于日志收集、流处理、事件溯源等场景。本文将深入探讨 Kafka 的架构设计和核心概念。
一、Kafka 简介
1.1 核心特性
| 特性 | 说明 |
|---|---|
| 高吞吐 | 每秒可处理百万级消息 |
| 分布式 | 支持水平扩展,高可用 |
| 持久化 | 消息持久化到磁盘,支持回放 |
| 多订阅 | 支持多个消费者组订阅 |
| 实时性 | 低延迟,毫秒级消息传递 |
1.2 应用场景
graph TB
subgraph 应用场景
A[日志收集] --> B[ELK Stack]
C[流处理] --> D[Flink/Spark]
E[事件驱动] --> F[微服务解耦]
G[数据同步] --> H[CDC/ETL]
end
二、架构概览
2.1 核心组件
graph TB
subgraph Producer
P1[Producer 1]
P2[Producer 2]
end
subgraph Kafka Cluster
B1[Broker 1]
B2[Broker 2]
B3[Broker 3]
end
subgraph Consumer
C1[Consumer 1]
C2[Consumer 2]
end
P1 --> B1
P2 --> B2
B1 --> C1
B2 --> C2
B3 --> C1
核心组件说明:
| 组件 | 说明 |
|---|---|
| Producer | 消息生产者,负责发送消息到 Kafka |
| Consumer | 消息消费者,负责从 Kafka 消费消息 |
| Broker | Kafka 服务器,存储消息数据 |
| Topic | 消息主题,消息的逻辑分类 |
| Partition | 分区,Topic 的物理分片 |
| Replica | 副本,Partition 的数据备份 |
| Zookeeper | 分布式协调服务(Kafka 2.8+ 可选) |
2.2 Topic 与 Partition
Topic 结构:
Topic: orders
├── Partition 0
│ ├── Offset 0: {order_id: 1}
│ ├── Offset 1: {order_id: 2}
│ └── Offset 2: {order_id: 3}
├── Partition 1
│ ├── Offset 0: {order_id: 4}
│ └── Offset 1: {order_id: 5}
└── Partition 2
├── Offset 0: {order_id: 6}
└── Offset 1: {order_id: 7}
Partition 特点:
- ✅ 有序:每个 Partition 内消息有序
- ✅ 可扩展:增加 Partition 提升吞吐量
- ✅ 并行消费:多个消费者可并行消费不同 Partition
三、Producer 详解
3.1 发送流程
sequenceDiagram
participant App as 应用程序
participant Producer as Producer
participant Broker as Broker
App->>Producer: 发送消息
Producer->>Producer: 序列化
Producer->>Producer: 分区选择
Producer->>Broker: 发送请求
Broker-->>Producer: ACK 确认
Producer-->>App: 返回结果
3.2 分区策略
// 1. 指定 Key(默认策略)
ProducerRecord<String, String> record =
new ProducerRecord<>("topic", "key", "value");
// 2. 指定 Partition
ProducerRecord<String, String> record =
new ProducerRecord<>("topic", 0, "key", "value");
// 3. 自定义分区器
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
CustomPartitioner.class.getName());
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key,
byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) {
// 自定义分区逻辑
return Math.abs(key.hashCode()) % numPartitions;
}
}
3.3 ACK 确认机制
| ACK | 说明 | 可靠性 | 性能 |
|---|---|---|---|
| 0 | 不等待确认 | 低 | 高 |
| 1 | Leader 确认 | 中 | 中 |
| all | 所有副本确认 | 高 | 低 |
配置示例:
# 最高可靠性
acks=all
retries=3
min.insync.replicas=2
# 最高性能
acks=0
batch.size=65536
linger.ms=10
# 平衡方案
acks=1
retries=3
enable.idempotence=true
3.4 幂等性与事务
幂等性配置:
enable.idempotence=true
max.in.flight.requests.per.connection=5
事务示例:
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx-1");
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
四、Consumer 详解
4.1 消费者组
graph TB
subgraph Consumer Group
C1[Consumer 1]
C2[Consumer 2]
C3[Consumer 3]
end
subgraph Topic
P0[Partition 0]
P1[Partition 1]
P2[Partition 2]
end
C1 --> P0
C2 --> P1
C3 --> P2
消费者组特点:
- ✅ 一个 Partition 只能被组内一个消费者消费
- ✅ 消费者可以消费多个 Partition
- ✅ 多个消费者组可独立消费同一 Topic
4.2 分区分配策略
// Range 策略(默认)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.RangeAssignor");
// RoundRobin 策略
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.RoundRobinAssignor");
// Sticky 策略
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
"org.apache.kafka.clients.consumer.StickyAssignor");
4.3 Offset 管理
// 自动提交(默认 5 秒)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
// 手动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
// 同步提交
consumer.commitSync();
// 异步提交
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
// 处理失败
}
});
4.4 消费流程
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("topic"));
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
// 手动提交
consumer.commitSync();
}
五、Broker 详解
5.1 副本机制
副本类型:
| 类型 | 说明 |
|---|---|
| Leader | 主副本,处理读写请求 |
| Follower | 从副本,同步 Leader 数据 |
| ISR | In-Sync Replicas,与 Leader 同步的副本 |
副本同步流程:
sequenceDiagram
participant Producer
participant Leader
participant Follower1
participant Follower2
Producer->>Leader: 写入消息
Leader->>Follower1: 同步消息
Leader->>Follower2: 同步消息
Follower1-->>Leader: ACK
Follower2-->>Leader: ACK
Leader-->>Producer: 确认写入
5.2 高可用配置
# 副本数
default.replication.factor=3
# 最小 ISR 副本数
min.insync.replicas=2
# 自动 Leader 选举
auto.leader.rebalance.enable=true
# 未复制的 Leader 选举
unclean.leader.election.enable=false
5.3 日志管理
日志段配置:
# 日志段大小
log.segment.bytes=1073741824
# 日志保留时间
log.retention.hours=168
# 日志保留大小
log.retention.bytes=-1
# 清理策略(delete/compact)
log.cleanup.policy=delete
六、实战应用
6.1 Spring Boot 集成
Maven 依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置文件:
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
retries: 3
consumer:
group-id: test-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: latest
enable-auto-commit: false
Producer 实现:
@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String key, String message) {
ListenableFuture<SendResult<String, String>> future =
kafkaTemplate.send(topic, key, message);
future.addCallback(result -> {
System.out.println("发送成功:" + result.getRecordMetadata().offset());
}, ex -> {
System.out.println("发送失败:" + ex.getMessage());
});
}
}
Consumer 实现:
@Component
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consume(String message) {
System.out.println("收到消息:" + message);
}
@KafkaListener(topics = "test-topic", groupId = "test-group")
public void consumeWithKey(ConsumerRecord<String, String> record) {
System.out.printf("Key: %s, Value: %s, Offset: %d%n",
record.key(), record.value(), record.offset());
}
}
6.2 订单处理示例
// 订单事件
public class OrderEvent {
private String orderId;
private String userId;
private Double amount;
private OrderStatus status;
private LocalDateTime createTime;
}
// Producer
@Service
public class OrderProducer {
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void sendOrderCreated(OrderEvent event) {
kafkaTemplate.send("order-events", event.getOrderId(), event);
}
}
// Consumer
@Component
public class OrderConsumer {
@KafkaListener(topics = "order-events", groupId = "order-group")
public void consume(OrderEvent event) {
switch (event.getStatus()) {
case CREATED:
handleOrderCreated(event);
break;
case PAID:
handleOrderPaid(event);
break;
case SHIPPED:
handleOrderShipped(event);
break;
}
}
}
七、性能优化
7.1 Producer 优化
| 参数 | 推荐值 | 说明 |
|---|---|---|
batch.size | 65536 | 批量大小 |
linger.ms | 10 | 等待时间 |
compression.type | lz4 | 压缩类型 |
buffer.memory | 33554432 | 缓冲区大小 |
7.2 Consumer 优化
| 参数 | 推荐值 | 说明 |
|---|---|---|
fetch.min.bytes | 1 | 最小拉取字节 |
fetch.max.wait.ms | 500 | 最大等待时间 |
max.poll.records | 500 | 每次拉取记录数 |
session.timeout.ms | 30000 | 会话超时时间 |
7.3 Broker 优化
# 网络线程
num.network.threads=8
# IO 线程
num.io.threads=16
# Socket 缓冲区
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 日志刷新
log.flush.interval.messages=10000
log.flush.interval.ms=1000
八、监控与运维
8.1 关键指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| Under Replicated Partitions | 未同步副本数 | > 0 |
| Offline Partitions Count | 离线分区数 | > 0 |
| Active Controller Count | Active Controller 数 | != 1 |
| Request Handler Avg Idle Percent | 请求处理空闲率 | < 30% |
| Network Handler Avg Idle Percent | 网络处理空闲率 | < 30% |
8.2 常用命令
# 查看 Topic 列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
# 创建 Topic
kafka-topics.sh --create --topic test \
--partitions 3 --replication-factor 2 \
--bootstrap-server localhost:9092
# 查看 Consumer Group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 查看消费详情
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--describe --group test-group
# 重置 Offset
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group test-group --reset-offsets --to-latest --execute
九、总结
Kafka 核心要点
| 概念 | 要点 |
|---|---|
| Topic | 消息逻辑分类,可分多个 Partition |
| Partition | 物理分片,保证有序性 |
| Replica | 数据副本,保证高可用 |
| Producer | 消息发送,支持幂等和事务 |
| Consumer | 消息消费,支持消费者组 |
| Broker | 存储服务,Leader/Follower 角色 |
参考资料
- Kafka 官方文档
- 《Kafka 权威指南》- Neha Narkhede
- 《Kafka 源码剖析》- 张天旭