Skip to content
清晨的一缕阳光
返回

Kafka Controller 控制器详解与高可用

Kafka Controller 是 Kafka 集群的管理者,负责分区 Leader 选举、元数据管理等核心任务。本文将深入探讨 Controller 的实现原理和高可用机制。

一、Controller 基础

1.1 什么是 Controller?

定义

Controller = Kafka 集群的管理者

职责:
- 分区 Leader 选举
- 元数据管理
- 分区重分配
- Broker 状态监控

1.2 Controller 演进

ZooKeeper 模式(Kafka 2.x)

graph TB
    subgraph Kafka 集群
        B1[Broker 1]
        B2[Broker 2]
        B3[Broker 3]
    end
    
    subgraph ZooKeeper
        ZK[ZK 集群]
    end
    
    B1 -->|注册 | ZK
    B2 -->|注册 | ZK
    B3 -->|注册 | ZK
    
    ZK -->|选举 | B1
    B1 -.->|Controller | B2
    B1 -.->|Controller | B3

KRaft 模式(Kafka 3.x+)

graph TB
    subgraph Kafka 集群
        B1[Broker 1<br/>Controller]
        B2[Broker 2<br/>Controller]
        B3[Broker 3<br/>Controller]
    end
    
    B1 -.->|Raft | B2
    B2 -.->|Raft | B3
    B3 -.->|Raft | B1
    
    B1 -->|管理 | B1
    B1 -->|管理 | B2
    B1 -->|管理 | B3

1.3 版本对比

特性ZK 模式KRaft 模式
依赖ZooKeeper内置 Raft
选举ZK WatchRaft 选举
性能
扩展性有限优秀
推荐

二、Controller 选举

2.1 ZK 模式选举

选举流程

sequenceDiagram
    participant B1 as Broker 1
    participant B2 as Broker 2
    participant ZK as ZooKeeper
    
    B1->>ZK: 创建 /controller 节点
    ZK-->>B1: 创建成功(成为 Controller)
    
    B2->>ZK: 创建 /controller 节点
    ZK-->>B2: 创建失败(已存在)
    
    B2->>ZK: 注册 Watch
    
    B1 --> B1: 运行中
    
    B1 --> B1: 宕机
    ZK->>B2: 触发 Watch
    
    B2->>ZK: 创建 /controller 节点
    ZK-->>B2: 创建成功(成为新 Controller)

实现代码

// KafkaController 选举逻辑
public class KafkaController {
    
    private final ZooKeeper zk;
    private final int brokerId;
    
    public void startup() {
        // 1. 注册 Broker
        registerBroker();
        
        // 2. 尝试成为 Controller
        electController();
        
        // 3. 注册 Watch
        registerWatch();
    }
    
    private void electController() {
        try {
            // 创建临时顺序节点
            String path = "/controller";
            String data = brokerId + ":" + timestamp;
            
            zk.create(path, data.getBytes(), 
                Ids.OPEN_ACL_UNSAFE, 
                CreateMode.EPHEMERAL);
            
            // 创建成功,成为 Controller
            becomeController();
            
        } catch (KeeperException.NodeExistsException e) {
            // 节点已存在,不是 Controller
            waitForControllerChange();
        }
    }
    
    private void waitForControllerChange() {
        // 注册 Watch
        Stat stat = zk.exists("/controller", true);
        
        // 当 Controller 变化时,重新选举
    }
}

2.2 KRaft 模式选举

选举流程

sequenceDiagram
    participant B1 as Broker 1
    participant B2 as Broker 2
    participant B3 as Broker 3
    
    B1->>B1: 启动,成为 Follower
    B2->>B2: 启动,成为 Follower
    B3->>B3: 启动,成为 Follower
    
    B1->>B2: Request Vote
    B1->>B3: Request Vote
    B2-->>B1: Vote Yes
    B3-->>B1: Vote Yes
    
    B1->>B1: 成为 Leader(Controller)
    B1->>B2: 心跳
    B1->>B3: 心跳

三、Controller 职责

3.1 分区管理

分区状态机

stateDiagram-v2
    [*] --> NewPartition
    NewPartition --> OfflinePartition
    OfflinePartition --> OnlinePartition
    OnlinePartition --> OfflinePartition
    OnlinePartition --> NonExistentPartition
    NonExistentPartition --> [*]

分区重分配

public class PartitionReassigner {
    
    /**
     * 执行分区重分配
     */
    public void reassignPartitions(ReassignmentPlan plan) {
        // 1. 验证重分配计划
        validatePlan(plan);
        
        // 2. 更新分区 ISR
        updateIsr(plan);
        
        // 3. 更新 Leader
        updateLeader(plan);
        
        // 4. 监控重分配进度
        monitorProgress(plan);
    }
    
    /**
     * 更新分区 Leader
     */
    private void updateLeader(PartitionInfo partition) {
        // 1. 选择新 Leader
        Replica newLeader = selectLeader(partition);
        
        // 2. 发送 LeaderAndIsr 请求
        sendLeaderAndIsrRequest(partition, newLeader);
        
        // 3. 更新元数据
        updateMetadata(partition, newLeader);
    }
}

3.2 Broker 管理

Broker 注册

public class BrokerManager {
    
    private final Map<Integer, BrokerInfo> brokers = new ConcurrentHashMap<>();
    
    /**
     * Broker 注册
     */
    public void registerBroker(int brokerId, BrokerInfo info) {
        brokers.put(brokerId, info);
        
        // 通知所有 Broker
        broadcastBrokerRegistration(brokerId, info);
        
        log.info("Broker {} 注册成功", brokerId);
    }
    
    /**
     * Broker 下线
     */
    public void unregisterBroker(int brokerId) {
        brokers.remove(brokerId);
        
        // 触发分区重分配
        reassignPartitionsForBroker(brokerId);
        
        log.info("Broker {} 下线", brokerId);
    }
}

3.3 元数据管理

元数据结构

Metadata Cache
├── Topics
│   ├── topic-1
│   │   ├── Partition 0: Leader=1, ISR=[1,2,3]
│   │   └── Partition 1: Leader=2, ISR=[2,3,1]
│   └── topic-2
│       └── ...
├── Brokers
│   ├── Broker 1: host:port
│   ├── Broker 2: host:port
│   └── Broker 3: host:port
└── Configs
    └── ...

元数据更新

public class MetadataCache {
    
    private final Map<String, TopicMetadata> topics = new ConcurrentHashMap<>();
    
    /**
     * 更新 Topic 元数据
     */
    public void updateTopicMetadata(String topic, TopicMetadata metadata) {
        topics.put(topic, metadata);
        
        // 通知所有 Broker
        broadcastMetadataUpdate(topic, metadata);
    }
    
    /**
     * 获取 Topic 元数据
     */
    public TopicMetadata getTopicMetadata(String topic) {
        return topics.get(topic);
    }
}

四、故障恢复

4.1 Controller 故障

故障检测

public class ControllerFaultDetector {
    
    private final ZooKeeper zk;
    
    /**
     * 检测 Controller 故障
     */
    public void detectFault() {
        // 注册 Watch
        zk.exists("/controller", new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeDeleted) {
                    // Controller 节点删除,触发选举
                    triggerElection();
                }
            }
        });
    }
    
    private void triggerElection() {
        // 重新选举 Controller
        electNewController();
    }
}

故障恢复

graph TB
    A[Controller 宕机] --> B[ZooKeeper 检测]
    B --> C[触发 Watch]
    C --> D[Broker 选举]
    D --> E[新 Controller 上线]
    E --> F[恢复分区管理]
    F --> G[集群恢复正常]

4.2 分区故障

分区恢复

public class PartitionRecovery {
    
    /**
     * 恢复离线分区
     */
    public void recoverOfflinePartitions() {
        // 1. 查找离线分区
        List<PartitionInfo> offlinePartitions = findOfflinePartitions();
        
        // 2. 为每个分区选举新 Leader
        for (PartitionInfo partition : offlinePartitions) {
            Replica newLeader = electNewLeader(partition);
            
            // 3. 更新 ISR
            updateIsr(partition, newLeader);
            
            // 4. 通知 Broker
            notifyBrokers(partition, newLeader);
        }
    }
    
    /**
     * 选举新 Leader
     */
    private Replica electNewLeader(PartitionInfo partition) {
        // 从 ISR 中选择第一个副本作为 Leader
        List<Replica> isr = partition.getISR();
        if (isr.isEmpty()) {
            throw new NoISRException("No ISR available");
        }
        return isr.get(0);
    }
}

五、KRaft 模式

5.1 KRaft 架构

graph TB
    subgraph Controller Quorum
        C1[Controller 1<br/>Leader]
        C2[Controller 2<br/>Follower]
        C3[Controller 3<br/>Follower]
    end
    
    subgraph Broker Pool
        B1[Broker 1]
        B2[Broker 2]
        B3[Broker 3]
    end
    
    C1 -.->|Raft | C2
    C1 -.->|Raft | C3
    
    C1 -->|管理 | B1
    C1 -->|管理 | B2
    C1 -->|管理 | B3

5.2 KRaft 配置

# server.properties

# KRaft 模式配置
process.roles=broker,controller  # 同时担任 Broker 和 Controller

# 节点 ID
node.id=1

# Controller Quorum
controller.quorum.voters=1@controller-1:9093,2@controller-2:9093,3@controller-3:9093

# Controller 监听
controller.listener.names=CONTROLLER

# 监听器
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
advertised.listeners=PLAINTEXT://broker-1:9092
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT

# 元数据日志
metadata.log.dir=/var/kafka/metadata

5.3 KRaft 部署

#!/bin/bash
# KRaft 集群部署脚本

# 1. 生成 Cluster ID
CLUSTER_ID=$(kafka-storage.sh random-uuid)
echo "Cluster ID: $CLUSTER_ID"

# 2. 格式化存储
kafka-storage.sh format -t $CLUSTER_ID -c /etc/kafka/kraft/server.properties

# 3. 启动 Broker
kafka-server-start.sh /etc/kafka/kraft/server.properties

# 4. 验证集群
kafka-broker-api-versions.sh --bootstrap-server localhost:9092

六、监控运维

6.1 监控指标

指标说明告警阈值
kafka_controller_KafkaController_ActiveControllerCountController 数量!= 1
kafka_controller_KafkaController_OfflinePartitionsCount离线分区数> 0
kafka_controller_KafkaController_PreferredReplicaImbalanceCount分区不平衡数> 100
kafka_controller_KafkaController_GlobalTopicCountTopic 总数-
kafka_controller_KafkaController_GlobalPartitionCount分区总数-

6.2 运维命令

# 查看 Controller
kafka-get-offsets.sh --broker-list localhost:9092 \
  --topic __consumer_offsets | grep -i controller

# 触发分区重平衡
kafka-preferred-replica-election.sh --bootstrap-server localhost:9092

# 查看分区状态
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic my-topic

# 手动触发 Leader 选举
kafka-leader-election.sh --bootstrap-server localhost:9092 \
  --election-type preferred --all-topic-partitions

6.3 故障排查

Controller 频繁切换

# 1. 检查 ZooKeeper
echo stat | nc zk1:2181

# 2. 检查网络
ping zk1

# 3. 检查 Broker 日志
tail -f /var/log/kafka/controller.log | grep -i "state change"

# 4. 检查 GC
jstat -gcutil <pid> 1000

分区不平衡

# 1. 查看分区分布
kafka-topics.sh --bootstrap-server localhost:9092 \
  --describe --topic my-topic

# 2. 执行重平衡
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassign.json --execute

# 3. 验证重平衡
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassign.json --verify

七、最佳实践

7.1 Controller 配置

# Controller 配置建议

# ZK 模式
zookeeper.connection.timeout.ms=18000
zookeeper.session.timeout.ms=18000

# KRaft 模式
controller.quorum.election.timeout.ms=1000
controller.quorum.fetch.timeout.ms=2000
controller.quorum.request.timeout.ms=3000

7.2 高可用部署

推荐配置:
- Controller 数量:3 或 5(奇数)
- 跨可用区部署
- 独立 Controller 节点(大规模集群)
- 监控告警完善

7.3 性能优化

# 批量处理
controlled.shutdown.max.retries=3
controlled.shutdown.retry.backoff.ms=5000

# 元数据缓存
metadata.cache.size=10000
metadata.cache.expiry.ms=300000

总结

Kafka Controller 的核心要点:

  1. 选举机制:ZK 模式、KRaft 模式
  2. 核心职责:分区管理、Broker 管理、元数据管理
  3. 故障恢复:Controller 故障、分区故障
  4. KRaft 模式:架构、配置、部署
  5. 监控运维:指标、命令、故障排查

核心要点

参考资料


分享这篇文章到:

上一篇文章
Kafka Connect 高级应用与自定义开发
下一篇文章
RocketMQ 事务消息详解与实战