Kafka 的高可用架构是其作为企业级消息队列的核心竞争力。本文将深入探讨 Kafka 的副本机制、Leader 选举、多机房部署等高可用方案。
一、高可用架构概览
1.1 整体架构
graph TB
subgraph 数据中心 1
subgraph Kafka 集群 1
B1[Broker 1<br/>Leader]
B2[Broker 2<br/>Follower]
B3[Broker 3<br/>Follower]
end
ZK1[ZooKeeper 集群]
end
subgraph 数据中心 2
subgraph Kafka 集群 2
B4[Broker 4<br/>Follower]
B5[Broker 5<br/>Follower]
end
ZK2[ZooKeeper 集群]
end
B1 -.->|副本复制 | B4
B2 -.->|副本复制 | B5
1.2 高可用特性
| 特性 | 说明 | 实现方式 |
|---|---|---|
| 副本冗余 | 数据多副本存储 | Replication Factor |
| 故障转移 | Broker 故障自动切换 | Leader Election |
| 数据同步 | 副本间数据一致 | ISR 机制 |
| 多机房 | 跨机房容灾 | Rack Awareness |
二、副本机制
2.1 副本类型
graph TB
subgraph Partition
L[Leader Replica]
F1[Follower Replica 1]
F2[Follower Replica 2]
end
L -->|读写 | P[Producer/Consumer]
F1 -->|同步 | L
F2 -->|同步 | L
style L fill:#9f9,stroke:#333
style F1 fill:#ff9,stroke:#333
style F2 fill:#ff9,stroke:#333
| 副本类型 | 说明 | 读写权限 |
|---|---|---|
| Leader | 主副本 | 可读写 |
| Follower | 从副本 | 只读(同步用) |
2.2 ISR(In-Sync Replicas)
sequenceDiagram
participant L as Leader
participant F1 as Follower 1
participant F2 as Follower 2
participant P as Producer
P->>L: 写入消息
L->>F1: 复制消息
L->>F2: 复制消息
F1-->>L: ACK
F2-->>L: ACK
L-->>P: 返回成功
note over L,F2: 所有 ISR 完成复制
ISR 动态调整:
ISR = {Leader} ∪ {Follower | lag < max.lag.time}
2.3 副本配置
# Broker 配置
default.replication.factor=3 # 默认副本数
min.insync.replicas=2 # 最小 ISR 数量
unclean.leader.election.enable=false # 禁止非 ISR 选举
replica.lag.time.max.ms=30000 # 副本最大延迟
2.4 HW(High Watermark)
graph LR
subgraph Leader
L1[消息 1 ✓]
L2[消息 2 ✓]
L3[消息 3 ✓]
L4[消息 4 ⏳]
end
subgraph Follower 1
F1[消息 1 ✓]
F2[消息 2 ✓]
F3[消息 3 ✓]
F4[消息 4 ⏳]
end
subgraph Follower 2
FF1[消息 1 ✓]
FF2[消息 2 ✓]
FF3[消息 3 ⏳]
end
HW[HW=3<br/>消费者可见]
L3 -.-> HW
HW 作用:
- 标记已同步的消息偏移量
- 消费者只能看到 HW 之前的消息
- 保证故障后数据不丢失
三、Leader 选举
3.1 选举触发条件
| 条件 | 说明 | 示例 |
|---|---|---|
| Leader 故障 | Broker 宕机 | 网络分区、硬件故障 |
| Leader 离线 | Broker 主动关闭 | 维护、升级 |
| ISR 变化 | 当前 Leader 离开 ISR | 同步延迟 |
3.2 选举流程
sequenceDiagram
participant ZK as ZooKeeper
participant C as Controller
participant B1 as Broker 1
participant B2 as Broker 2
B1->>ZK: 心跳停止
ZK->>C: 通知 Broker 下线
C->>C: 选择新 Leader
C->>B2: 通知成为 Leader
B2->>C: 确认
C->>ZK: 更新元数据
3.3 选举策略
优先选择 ISR 内副本:
// Controller 选举逻辑
public void electLeader(TopicPartition tp) {
List<Replica> isr = partition.getISR();
if (isr.isEmpty()) {
if (uncleanLeaderElectionEnabled) {
// 从非 ISR 选举(可能丢失数据)
electUncleanLeader(tp);
} else {
// 等待 ISR 恢复
log.warn("No ISR available, waiting...");
}
} else {
// 选择 ISR 中第一个副本作为 Leader
Replica newLeader = isr.get(0);
partition.setLeader(newLeader);
}
}
3.4 选举配置
# 禁止非 ISR 选举(推荐)
unclean.leader.election.enable=false
# Controller 配置
controller.socket.timeout.ms=30000
controller.message.max.bytes=10485760
四、多机房部署
4.1 Rack Awareness
graph TB
subgraph Rack A
B1[Broker 1<br/>Leader]
B2[Broker 2<br/>Follower]
end
subgraph Rack B
B3[Broker 3<br/>Follower]
B4[Broker 4<br/>Follower]
end
subgraph Rack C
B5[Broker 5<br/>Follower]
B6[Broker 6<br/>Follower]
end
B1 -.-> B3
B1 -.-> B5
4.2 配置方式
# Broker 配置
broker.rack=rack-1 # 指定机架
# Topic 配置
# 创建 Topic 时指定副本分配
kafka-topics.sh --create \
--topic my-topic \
--partitions 3 \
--replication-factor 3 \
--config min.insync.replicas=2
4.3 副本分配策略
// Rack 感知副本分配
public class RackAwareReplicaAssigner {
public Map<Integer, List<Integer>> assignReplicas(
int numBrokers,
List<String> racks) {
Map<Integer, List<Integer>> assignment = new HashMap<>();
for (int partition = 0; partition < numPartitions; partition++) {
List<Integer> replicas = new ArrayList<>();
// 选择不同 Rack 的 Broker
Set<String> selectedRacks = new HashSet<>();
for (int broker : brokers) {
String rack = racks.get(broker);
if (!selectedRacks.contains(rack)) {
replicas.add(broker);
selectedRacks.add(rack);
}
}
assignment.put(partition, replicas);
}
return assignment;
}
}
五、多活架构
5.1 MirrorMaker 2.0
graph TB
subgraph 数据中心 1
K1[Kafka 集群 1]
MM1[MirrorMaker 2]
end
subgraph 数据中心 2
K2[Kafka 集群 2]
MM2[MirrorMaker 2]
end
K1 -->|复制 | MM1
MM1 -->|同步 | K2
K2 -->|复制 | MM2
MM2 -->|同步 | K1
5.2 配置示例
# clusters.properties
clusters = dc1, dc2
dc1.bootstrap.servers = dc1-kafka:9092
dc2.bootstrap.servers = dc2-kafka:9092
# connector 配置
connectors = dc1-to-dc2, dc2-to-dc1
dc1-to-dc2.source.cluster.alias = dc1
dc1-to-dc2.target.cluster.alias = dc2
dc1-to-dc2.topics = .*
dc2-to-dc1.source.cluster.alias = dc2
dc2-to-dc1.target.cluster.alias = dc1
dc2-to-dc1.topics = .*
5.3 故障切换
sequenceDiagram
participant App as 应用程序
participant DNS as DNS
participant DC1 as 数据中心 1
participant DC2 as 数据中心 2
App->>DNS: 请求 Kafka 地址
DNS-->>App: dc1.kafka.com
DC1 --> DC1: 故障
App->>DC1: 连接失败
App->>DNS: 重新解析
DNS-->>App: dc2.kafka.com
App->>DC2: 连接成功
六、监控与告警
6.1 关键指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
UnderReplicatedPartitions | 未同步副本数 | > 0 |
OfflinePartitionsCount | 离线分区数 | > 0 |
ActiveControllerCount | Controller 数量 | != 1 |
IsrShrinkRate | ISR 收缩速率 | > 1/分钟 |
IsrExpandRate | ISR 扩展速率 | > 1/分钟 |
6.2 监控配置
# Prometheus 配置
scrape_configs:
- job_name: 'kafka'
static_configs:
- targets: ['kafka-1:9090', 'kafka-2:9090']
metrics_path: '/metrics'
# 告警规则
groups:
- name: kafka
rules:
- alert: UnderReplicatedPartitions
expr: kafka_server_ReplicaManager_UnderReplicatedPartitions > 0
for: 5m
labels:
severity: warning
annotations:
summary: "Kafka 有未同步副本"
6.3 健康检查
#!/bin/bash
# Kafka 集群健康检查脚本
# 检查 Controller
controller=$(kafka-metadata.sh --snapshot /tmp/kafka-logs/__cluster_metadata-0/00000000000000000000.log | grep Leader)
if [ -z "$controller" ]; then
echo "CRITICAL: No Controller"
exit 2
fi
# 检查未同步副本
under_replicated=$(kafka-topics.sh --describe | grep -c "Isr:")
if [ "$under_replicated" -gt 0 ]; then
echo "WARNING: $under_replicated 未同步副本"
exit 1
fi
echo "OK: 集群健康"
exit 0
七、容灾演练
7.1 演练场景
| 场景 | 预期结果 | 恢复时间 |
|---|---|---|
| Broker 宕机 | Leader 切换,数据不丢失 | < 30 秒 |
| 机房故障 | 流量切换到备用机房 | < 1 分钟 |
| ZooKeeper 故障 | Controller 重新选举 | < 1 分钟 |
7.2 演练步骤
# 1. 模拟 Broker 宕机
kill -9 $(ps -ef | grep kafka | grep -v grep | awk '{print $2}')
# 2. 观察 Leader 选举
kafka-topics.sh --describe --topic my-topic
# 3. 验证数据完整性
kafka-console-consumer.sh --topic my-topic --from-beginning
# 4. 恢复 Broker
./kafka-server-start.sh config/server.properties
八、最佳实践
8.1 副本配置建议
| 场景 | 副本数 | min.insync.replicas | unclean.leader.election |
|---|---|---|---|
| 开发环境 | 1 | 1 | true |
| 测试环境 | 2 | 1 | false |
| 生产环境 | 3 | 2 | false |
| 金融场景 | 5 | 3 | false |
8.2 部署建议
# 推荐部署架构
- 3 个 Broker(跨 3 个机架)
- 3 个 ZooKeeper(跨 3 个机架)
- 副本因子 = 3
- min.insync.replicas = 2
8.3 运维建议
- 定期演练:每季度进行一次故障演练
- 监控告警:配置关键指标告警
- 备份策略:定期备份元数据
- 升级方案:滚动升级,避免服务中断
总结
Kafka 高可用架构的核心机制:
- 副本机制:Leader/Follower、ISR、HW
- Leader 选举:Controller 负责、ISR 优先
- 多机房部署:Rack Awareness、MirrorMaker
- 监控告警:关键指标、健康检查
- 容灾演练:定期演练、快速恢复
核心要点:
- 理解 ISR 机制和 HW 的作用
- 禁止非 ISR 选举保证数据不丢失
- 跨机房部署提高容灾能力
- 监控关键指标及时发现异常
参考资料
- Kafka Replication 官方文档
- KIP-101: Improved Partition Assignment
- 《Kafka 权威指南》第 5 章