Kafka Controller 是 Kafka 集群的管理者,负责分区 Leader 选举、元数据管理等核心任务。本文将深入探讨 Controller 的实现原理和高可用机制。
一、Controller 基础
1.1 什么是 Controller?
定义:
Controller = Kafka 集群的管理者
职责:
- 分区 Leader 选举
- 元数据管理
- 分区重分配
- Broker 状态监控
1.2 Controller 演进
ZooKeeper 模式(Kafka 2.x):
graph TB
subgraph Kafka 集群
B1[Broker 1]
B2[Broker 2]
B3[Broker 3]
end
subgraph ZooKeeper
ZK[ZK 集群]
end
B1 -->|注册 | ZK
B2 -->|注册 | ZK
B3 -->|注册 | ZK
ZK -->|选举 | B1
B1 -.->|Controller | B2
B1 -.->|Controller | B3
KRaft 模式(Kafka 3.x+):
graph TB
subgraph Kafka 集群
B1[Broker 1<br/>Controller]
B2[Broker 2<br/>Controller]
B3[Broker 3<br/>Controller]
end
B1 -.->|Raft | B2
B2 -.->|Raft | B3
B3 -.->|Raft | B1
B1 -->|管理 | B1
B1 -->|管理 | B2
B1 -->|管理 | B3
1.3 版本对比
| 特性 | ZK 模式 | KRaft 模式 |
|---|---|---|
| 依赖 | ZooKeeper | 内置 Raft |
| 选举 | ZK Watch | Raft 选举 |
| 性能 | 中 | 高 |
| 扩展性 | 有限 | 优秀 |
| 推荐 | ❌ | ✅ |
二、Controller 选举
2.1 ZK 模式选举
选举流程:
sequenceDiagram
participant B1 as Broker 1
participant B2 as Broker 2
participant ZK as ZooKeeper
B1->>ZK: 创建 /controller 节点
ZK-->>B1: 创建成功(成为 Controller)
B2->>ZK: 创建 /controller 节点
ZK-->>B2: 创建失败(已存在)
B2->>ZK: 注册 Watch
B1 --> B1: 运行中
B1 --> B1: 宕机
ZK->>B2: 触发 Watch
B2->>ZK: 创建 /controller 节点
ZK-->>B2: 创建成功(成为新 Controller)
实现代码:
// KafkaController 选举逻辑
public class KafkaController {
private final ZooKeeper zk;
private final int brokerId;
public void startup() {
// 1. 注册 Broker
registerBroker();
// 2. 尝试成为 Controller
electController();
// 3. 注册 Watch
registerWatch();
}
private void electController() {
try {
// 创建临时顺序节点
String path = "/controller";
String data = brokerId + ":" + timestamp;
zk.create(path, data.getBytes(),
Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
// 创建成功,成为 Controller
becomeController();
} catch (KeeperException.NodeExistsException e) {
// 节点已存在,不是 Controller
waitForControllerChange();
}
}
private void waitForControllerChange() {
// 注册 Watch
Stat stat = zk.exists("/controller", true);
// 当 Controller 变化时,重新选举
}
}
2.2 KRaft 模式选举
选举流程:
sequenceDiagram
participant B1 as Broker 1
participant B2 as Broker 2
participant B3 as Broker 3
B1->>B1: 启动,成为 Follower
B2->>B2: 启动,成为 Follower
B3->>B3: 启动,成为 Follower
B1->>B2: Request Vote
B1->>B3: Request Vote
B2-->>B1: Vote Yes
B3-->>B1: Vote Yes
B1->>B1: 成为 Leader(Controller)
B1->>B2: 心跳
B1->>B3: 心跳
三、Controller 职责
3.1 分区管理
分区状态机:
stateDiagram-v2
[*] --> NewPartition
NewPartition --> OfflinePartition
OfflinePartition --> OnlinePartition
OnlinePartition --> OfflinePartition
OnlinePartition --> NonExistentPartition
NonExistentPartition --> [*]
分区重分配:
public class PartitionReassigner {
/**
* 执行分区重分配
*/
public void reassignPartitions(ReassignmentPlan plan) {
// 1. 验证重分配计划
validatePlan(plan);
// 2. 更新分区 ISR
updateIsr(plan);
// 3. 更新 Leader
updateLeader(plan);
// 4. 监控重分配进度
monitorProgress(plan);
}
/**
* 更新分区 Leader
*/
private void updateLeader(PartitionInfo partition) {
// 1. 选择新 Leader
Replica newLeader = selectLeader(partition);
// 2. 发送 LeaderAndIsr 请求
sendLeaderAndIsrRequest(partition, newLeader);
// 3. 更新元数据
updateMetadata(partition, newLeader);
}
}
3.2 Broker 管理
Broker 注册:
public class BrokerManager {
private final Map<Integer, BrokerInfo> brokers = new ConcurrentHashMap<>();
/**
* Broker 注册
*/
public void registerBroker(int brokerId, BrokerInfo info) {
brokers.put(brokerId, info);
// 通知所有 Broker
broadcastBrokerRegistration(brokerId, info);
log.info("Broker {} 注册成功", brokerId);
}
/**
* Broker 下线
*/
public void unregisterBroker(int brokerId) {
brokers.remove(brokerId);
// 触发分区重分配
reassignPartitionsForBroker(brokerId);
log.info("Broker {} 下线", brokerId);
}
}
3.3 元数据管理
元数据结构:
Metadata Cache
├── Topics
│ ├── topic-1
│ │ ├── Partition 0: Leader=1, ISR=[1,2,3]
│ │ └── Partition 1: Leader=2, ISR=[2,3,1]
│ └── topic-2
│ └── ...
├── Brokers
│ ├── Broker 1: host:port
│ ├── Broker 2: host:port
│ └── Broker 3: host:port
└── Configs
└── ...
元数据更新:
public class MetadataCache {
private final Map<String, TopicMetadata> topics = new ConcurrentHashMap<>();
/**
* 更新 Topic 元数据
*/
public void updateTopicMetadata(String topic, TopicMetadata metadata) {
topics.put(topic, metadata);
// 通知所有 Broker
broadcastMetadataUpdate(topic, metadata);
}
/**
* 获取 Topic 元数据
*/
public TopicMetadata getTopicMetadata(String topic) {
return topics.get(topic);
}
}
四、故障恢复
4.1 Controller 故障
故障检测:
public class ControllerFaultDetector {
private final ZooKeeper zk;
/**
* 检测 Controller 故障
*/
public void detectFault() {
// 注册 Watch
zk.exists("/controller", new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
// Controller 节点删除,触发选举
triggerElection();
}
}
});
}
private void triggerElection() {
// 重新选举 Controller
electNewController();
}
}
故障恢复:
graph TB
A[Controller 宕机] --> B[ZooKeeper 检测]
B --> C[触发 Watch]
C --> D[Broker 选举]
D --> E[新 Controller 上线]
E --> F[恢复分区管理]
F --> G[集群恢复正常]
4.2 分区故障
分区恢复:
public class PartitionRecovery {
/**
* 恢复离线分区
*/
public void recoverOfflinePartitions() {
// 1. 查找离线分区
List<PartitionInfo> offlinePartitions = findOfflinePartitions();
// 2. 为每个分区选举新 Leader
for (PartitionInfo partition : offlinePartitions) {
Replica newLeader = electNewLeader(partition);
// 3. 更新 ISR
updateIsr(partition, newLeader);
// 4. 通知 Broker
notifyBrokers(partition, newLeader);
}
}
/**
* 选举新 Leader
*/
private Replica electNewLeader(PartitionInfo partition) {
// 从 ISR 中选择第一个副本作为 Leader
List<Replica> isr = partition.getISR();
if (isr.isEmpty()) {
throw new NoISRException("No ISR available");
}
return isr.get(0);
}
}
五、KRaft 模式
5.1 KRaft 架构
graph TB
subgraph Controller Quorum
C1[Controller 1<br/>Leader]
C2[Controller 2<br/>Follower]
C3[Controller 3<br/>Follower]
end
subgraph Broker Pool
B1[Broker 1]
B2[Broker 2]
B3[Broker 3]
end
C1 -.->|Raft | C2
C1 -.->|Raft | C3
C1 -->|管理 | B1
C1 -->|管理 | B2
C1 -->|管理 | B3
5.2 KRaft 配置
# server.properties
# KRaft 模式配置
process.roles=broker,controller # 同时担任 Broker 和 Controller
# 节点 ID
node.id=1
# Controller Quorum
controller.quorum.voters=1@controller-1:9093,2@controller-2:9093,3@controller-3:9093
# Controller 监听
controller.listener.names=CONTROLLER
# 监听器
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://broker-1:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 元数据日志
metadata.log.dir=/var/kafka/metadata
5.3 KRaft 部署
#!/bin/bash
# KRaft 集群部署脚本
# 1. 生成 Cluster ID
CLUSTER_ID=$(kafka-storage.sh random-uuid)
echo "Cluster ID: $CLUSTER_ID"
# 2. 格式化存储
kafka-storage.sh format -t $CLUSTER_ID -c /etc/kafka/kraft/server.properties
# 3. 启动 Broker
kafka-server-start.sh /etc/kafka/kraft/server.properties
# 4. 验证集群
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
六、监控运维
6.1 监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
kafka_controller_KafkaController_ActiveControllerCount | Controller 数量 | != 1 |
kafka_controller_KafkaController_OfflinePartitionsCount | 离线分区数 | > 0 |
kafka_controller_KafkaController_PreferredReplicaImbalanceCount | 分区不平衡数 | > 100 |
kafka_controller_KafkaController_GlobalTopicCount | Topic 总数 | - |
kafka_controller_KafkaController_GlobalPartitionCount | 分区总数 | - |
6.2 运维命令
# 查看 Controller
kafka-get-offsets.sh --broker-list localhost:9092 \
--topic __consumer_offsets | grep -i controller
# 触发分区重平衡
kafka-preferred-replica-election.sh --bootstrap-server localhost:9092
# 查看分区状态
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic my-topic
# 手动触发 Leader 选举
kafka-leader-election.sh --bootstrap-server localhost:9092 \
--election-type preferred --all-topic-partitions
6.3 故障排查
Controller 频繁切换:
# 1. 检查 ZooKeeper
echo stat | nc zk1:2181
# 2. 检查网络
ping zk1
# 3. 检查 Broker 日志
tail -f /var/log/kafka/controller.log | grep -i "state change"
# 4. 检查 GC
jstat -gcutil <pid> 1000
分区不平衡:
# 1. 查看分区分布
kafka-topics.sh --bootstrap-server localhost:9092 \
--describe --topic my-topic
# 2. 执行重平衡
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file reassign.json --execute
# 3. 验证重平衡
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file reassign.json --verify
七、最佳实践
7.1 Controller 配置
# Controller 配置建议
# ZK 模式
zookeeper.connection.timeout.ms=18000
zookeeper.session.timeout.ms=18000
# KRaft 模式
controller.quorum.election.timeout.ms=1000
controller.quorum.fetch.timeout.ms=2000
controller.quorum.request.timeout.ms=3000
7.2 高可用部署
推荐配置:
- Controller 数量:3 或 5(奇数)
- 跨可用区部署
- 独立 Controller 节点(大规模集群)
- 监控告警完善
7.3 性能优化
# 批量处理
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000
# 元数据缓存
metadata.cache.size=10000
metadata.cache.expiry.ms=300000
总结
Kafka Controller 的核心要点:
- 选举机制:ZK 模式、KRaft 模式
- 核心职责:分区管理、Broker 管理、元数据管理
- 故障恢复:Controller 故障、分区故障
- KRaft 模式:架构、配置、部署
- 监控运维:指标、命令、故障排查
核心要点:
- 理解 Controller 选举原理
- 掌握分区管理和恢复
- 优先使用 KRaft 模式
- 建立完善的监控体系
参考资料
- Kafka Controller 官方文档
- KIP-500: KRaft
- 《Kafka 权威指南》第 4 章