清晨的一缕阳光

RocketMQ Controller: In-Depth Guide and High Availability

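RocketMQ Controller is a component introduced in RocketMQ 5.0. It manages cluster metadata and elects the master Broker of each replica group, working alongside NameServer, which still serves routing information. This article explains how the Controller works and how it achieves high availability.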

1. Controller Basics

1.1 Why Do We Need the Controller?

Problems with NameServer mode

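Problems:
- NameServers are stateless and do not synchronize with each other, so their metadata can be inconsistent
- Broker registration information can conflict across NameServers
- Master/slave failover needs external (often manual) coordination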

Advantages of Controller mode

Advantages:
- Centralized metadata management
- Guaranteed metadata consistency
- Automatic Broker (master) election
- Simpler operations

1.2 Architecture Evolution

NameServer mode

graph TB
    subgraph NameServer Cluster
        NS1[NameServer 1]
        NS2[NameServer 2]
        NS3[NameServer 3]
    end
    
    subgraph Broker Cluster
        B1[Broker 1 Master]
        B2[Broker 1 Slave]
    end
    
    B1 -->|register| NS1
    B1 -->|register| NS2
    B1 -->|register| NS3
    
    NS1 -.->|metadata may diverge| NS2

Controller mode

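graph TB
    subgraph Controller Cluster
        C1[Controller 1<br/>Leader]
        C2[Controller 2<br/>Follower]
        C3[Controller 3<br/>Follower]
    end
    
    subgraph Broker Cluster
        B1[Broker 1]
        B2[Broker 2]
    end
    
    subgraph NameServer Cluster
        NS[NameServer]
    end
    
    C1 -.->|Raft| C2
    C1 -.->|Raft| C3
    
    B1 -->|register / heartbeat| C1
    B2 -->|register / heartbeat| C1
    
    C1 -->|consistent metadata| B1
    C1 -->|consistent metadata| B2
    
    B1 -->|route registration| NS
    B2 -->|route registration| NS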

1.3 Mode Comparison

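| Feature | NameServer mode | Controller mode |
|---|---|---|
| Metadata | Eventually consistent | Strongly consistent |
| Master election | Manual / semi-automatic | Automatic |
| Dependency | | Built-in Raft |
| Operations | Complex | Simple |
| Recommendation | Compatibility with older versions | Recommended for new deployments |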

2. Controller Election

2.1 Raft Election

Election flow

sequenceDiagram
    participant C1 as Controller 1
    participant C2 as Controller 2
    participant C3 as Controller 3
    
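    C1->>C1: Start as Follower
    C2->>C2: Start as Follower
    C3->>C3: Start as Follower
    
    C1->>C1: Election timeout, becomes Candidate (term + 1)
    C1->>C2: Request Vote
    C1->>C3: Request Vote
    C2-->>C1: Vote Yes
    C3-->>C1: Vote Yes
    
    C1->>C1: Majority of votes, becomes Leader
    C1->>C2: Heartbeat
    C1->>C3: Heartbeat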
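The vote exchange above follows Raft's RequestVote rules: a node grants at most one vote per term, rejects candidates with a stale term, and only votes for a candidate whose log is at least as up to date as its own; the candidate becomes Leader once it holds a strict majority. Below is a minimal sketch of those rules. It is a generic Raft illustration with hypothetical names (`RaftVoter`, `handleRequestVote`), not DLedger's implementation, and it omits RPC, timers and the durable storage of `currentTerm`/`votedFor`.

```java
import java.util.Set;

/** Hypothetical sketch of Raft's RequestVote rules (not DLedger's actual code). */
final class RaftVoter {
    long currentTerm = 0;
    String votedFor = null;      // candidate voted for in currentTerm, or null
    long lastLogTerm = 0;        // term of this node's last log entry
    long lastLogIndex = 0;       // index of this node's last log entry

    /** Returns true if this node grants its vote to the candidate. */
    boolean handleRequestVote(long term, String candidateId,
                              long candLastLogTerm, long candLastLogIndex) {
        if (term < currentTerm) {
            return false;                 // stale candidate
        }
        if (term > currentTerm) {         // newer term: adopt it and forget the old vote
            currentTerm = term;
            votedFor = null;
        }
        // Candidate's log must be at least as up to date as ours
        boolean logUpToDate = candLastLogTerm > lastLogTerm
            || (candLastLogTerm == lastLogTerm && candLastLogIndex >= lastLogIndex);
        if ((votedFor == null || votedFor.equals(candidateId)) && logUpToDate) {
            votedFor = candidateId;       // at most one candidate per term
            return true;
        }
        return false;
    }

    /** A candidate wins once it holds votes from a strict majority of the group. */
    static boolean hasMajority(Set<String> votes, int clusterSize) {
        return votes.size() > clusterSize / 2;
    }
}
```

In the three-node flow above, C1 votes for itself and needs one more vote (2 of 3) to become Leader.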

2.2 Election Configuration

# controller.conf

# Controller settings
controllerDLegerGroup=DefaultController
controllerDLegerPeers=n0-localhost:9878;n1-localhost:9879;n2-localhost:9880
controllerDLegerSelfId=n0

# Election timeout
controllerElectionTimeoutMs=2000

# Heartbeat interval
controllerHeartbeatIntervalMs=1000

# Scan interval
controllerScanIntervalMs=2000
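The election timeout (2000 ms) is deliberately larger than the heartbeat interval (1000 ms), so a Follower does not start an election just because a single heartbeat is late. Raft additionally randomizes each Follower's timeout so that Followers rarely time out at the same moment and split the vote. The sketch below shows that logic with the values above. It is a generic Raft illustration with a hypothetical class name, not DLedger's timer code, and the [base, 2 * base) jitter range is an assumption.

```java
import java.util.Random;

/** Hypothetical sketch: a Follower starts an election only after a randomized silence. */
final class ElectionTimer {
    private final long baseTimeoutMs;
    private final Random random;
    private long lastHeartbeatMs;
    private long currentTimeoutMs;

    ElectionTimer(long baseTimeoutMs, long nowMs, Random random) {
        this.baseTimeoutMs = baseTimeoutMs;
        this.random = random;
        onHeartbeat(nowMs);
    }

    /** Called whenever a heartbeat from the Leader arrives: reset the timer with fresh jitter. */
    void onHeartbeat(long nowMs) {
        lastHeartbeatMs = nowMs;
        // Assumed jitter: timeout drawn from [base, 2 * base)
        currentTimeoutMs = baseTimeoutMs + (long) (random.nextDouble() * baseTimeoutMs);
    }

    /** True once no heartbeat has been seen for the current (randomized) timeout. */
    boolean shouldStartElection(long nowMs) {
        return nowMs - lastHeartbeatMs >= currentTimeoutMs;
    }

    long currentTimeoutMs() {
        return currentTimeoutMs;
    }
}
```

With a 2000 ms base, a Follower survives at least one missed 1000 ms heartbeat before it becomes a Candidate.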

2.3 Broker Registration

Registration flow

sequenceDiagram
    participant B as Broker
    participant C as Controller
    
    B->>C: Registration request
    C->>C: Validate Broker
    C->>C: Update metadata
    C-->>B: Registration succeeded
    
    loop Heartbeat
        B->>C: Heartbeat
        C-->>B: Heartbeat response
    end
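The registration-plus-heartbeat loop implies that the Controller keeps a last-seen timestamp per Broker and periodically scans for Brokers that have gone silent (the kind of job a scan interval such as `controllerScanIntervalMs` would drive). Here is a minimal sketch under that assumption; `BrokerLiveTable`, its methods and the 10-second timeout used in the check are hypothetical, not RocketMQ internals. Time is passed in explicitly so the logic is easy to test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical Controller-side liveness table driven by Broker heartbeats. */
final class BrokerLiveTable {
    private final Map<String, Long> lastHeartbeatMs = new ConcurrentHashMap<>();
    private final long timeoutMs;

    BrokerLiveTable(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Registration and every heartbeat refresh the Broker's timestamp. */
    void onHeartbeat(String brokerAddr, long nowMs) {
        lastHeartbeatMs.put(brokerAddr, nowMs);
    }

    /** Run once per scan interval: removes and returns Brokers silent for longer than timeoutMs. */
    List<String> scanInactive(long nowMs) {
        List<String> expired = new ArrayList<>();
        lastHeartbeatMs.forEach((addr, last) -> {
            // Conditional remove: skipped if a fresh heartbeat replaced the timestamp meanwhile
            if (nowMs - last > timeoutMs && lastHeartbeatMs.remove(addr, last)) {
                expired.add(addr);
            }
        });
        return expired;
    }

    boolean isAlive(String brokerAddr) {
        return lastHeartbeatMs.containsKey(brokerAddr);
    }
}
```

Brokers returned by `scanInactive` are the ones that would feed the master election described in section 4.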

Registration code

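import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Simplified illustration: ControllerManager, BrokerIdentity and the request/response
// types are placeholders and do not match RocketMQ's actual registration API
public class BrokerRegister {

    private static final Logger log = LoggerFactory.getLogger(BrokerRegister.class);

    private final ControllerManager controllerManager;

    public BrokerRegister(ControllerManager controllerManager) {
        this.controllerManager = controllerManager;
    }

    public void registerBroker(BrokerIdentity brokerIdentity) {
        // 1. Build the registration request
        RegisterBrokerRequest request = new RegisterBrokerRequest();
        request.setBrokerName(brokerIdentity.getBrokerName());
        request.setBrokerAddr(brokerIdentity.getBrokerAddr());
        request.setCluster(brokerIdentity.getClusterName());

        // 2. Send it to the Controller (Leader)
        RegisterBrokerResponse response =
            controllerManager.registerBroker(request);

        // 3. Handle the response
        if (response.isSuccess()) {
            log.info("Broker registered: {}", brokerIdentity);
        } else {
            log.error("Broker registration failed: {}", response.getError());
        }
    }
}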

3. Metadata Management

3.1 Metadata Structure

Metadata
├── ClusterInfo
│   ├── ClusterName
│   └── BrokerSet
├── BrokerSet
│   ├── BrokerName
│   ├── MasterBroker
│   └── SlaveBrokers
├── TopicConfig
│   ├── TopicName
│   ├── QueueConfig
│   └── Attributes
└── SubscriptionGroup
    ├── GroupName
    └── SubscriptionConfig

3.2 Metadata Synchronization

Synchronization flow

graph TB
    subgraph Controller Leader
        CL[Metadata]
    end
    
    subgraph Controller Follower
        CF1[Metadata replica]
        CF2[Metadata replica]
    end
    
    subgraph Broker
        B1[Broker 1]
        B2[Broker 2]
    end
    
    CL -->|Raft replication| CF1
    CL -->|Raft replication| CF2
    
    B1 -->|read| CL
    B2 -->|read| CL
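In this flow a metadata change on the Leader counts as durable once Raft has replicated it to a majority of Controllers. Below is a minimal sketch of that commit rule, assuming the Leader tracks the highest replicated log index (`matchIndex`) of every node, itself included. The names are hypothetical, and the sketch deliberately leaves out Raft's extra rule that a Leader only advances the commit index this way for entries from its own current term.

```java
import java.util.Arrays;

/** Hypothetical sketch of the majority commit rule used by a Raft Leader. */
final class CommitIndex {
    /**
     * @param matchIndex highest replicated log index on each node, Leader included
     * @return the largest index that is stored on a majority of nodes
     */
    static long majorityCommitIndex(long[] matchIndex) {
        long[] sorted = matchIndex.clone();
        Arrays.sort(sorted);
        // After an ascending sort, the value at (n - 1) / 2 is held by at least n / 2 + 1 nodes
        return sorted[(sorted.length - 1) / 2];
    }
}
```

For example, with indexes {10, 7, 3} on three Controllers, entry 7 is on two nodes and can be committed, while entry 10 is on only one.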

3.3 Metadata Persistence

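import com.alibaba.fastjson.JSON;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Metadata is the model from section 3.1 (not shown here)
public class MetadataPersist {

    private static final Logger log = LoggerFactory.getLogger(MetadataPersist.class);

    private final Path metadataFile;

    public MetadataPersist(String metadataDir) {
        this.metadataFile = Paths.get(metadataDir, "metadata.json");
    }

    /**
     * Persist metadata: write a temp file, then atomically rename it,
     * so a crash never leaves a half-written metadata.json
     */
    public void persistMetadata(Metadata metadata) {
        // Temp file in the same directory, so the rename stays on one file system
        Path tempFile = metadataFile.resolveSibling("metadata.json.tmp");

        try {
            // 1. Serialize the metadata (explicit UTF-8, not the platform charset)
            byte[] json = JSON.toJSONString(metadata).getBytes(StandardCharsets.UTF_8);

            // 2. Write the temp file
            Files.write(tempFile, json);

            // 3. Atomic replace
            Files.move(tempFile, metadataFile, StandardCopyOption.ATOMIC_MOVE);

            log.info("Metadata persisted");

        } catch (IOException e) {
            // Propagate the failure instead of only logging it
            log.error("Failed to persist metadata", e);
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Load metadata
     */
    public Metadata loadMetadata() {
        if (Files.notExists(metadataFile)) {
            // First start: nothing has been persisted yet
            return new Metadata();
        }

        try {
            String json = new String(Files.readAllBytes(metadataFile), StandardCharsets.UTF_8);
            return JSON.parseObject(json, Metadata.class);
        } catch (IOException e) {
            // Do not fall back to empty metadata: that would silently discard cluster state
            log.error("Failed to load metadata", e);
            throw new UncheckedIOException(e);
        }
    }
}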

4. Broker Election

4.1 Master/Slave Election

Election trigger

graph TB
    A[Master down] --> B[Controller detects failure]
    B --> C{Slave available?}
    C -->|Yes| D[Promote Slave to Master]
    C -->|No| E[Wait for a new Broker]
    D --> F[Update metadata]
    F --> G[Notify Producers/Consumers]
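The "Slave available?" decision hides the key safety question: which Slave can be promoted without losing acknowledged messages. RocketMQ's Controller answers it with the SyncStateSet, the set of replicas that are in sync with the master. The sketch below assumes a simplified rule: prefer an alive replica from the SyncStateSet, and fall back to a lagging replica only when unclean election is explicitly allowed (the Controller exposes a comparable switch, `enableElectUncleanMaster`, off by default). `MasterElector` and its parameters are illustrative names; the real election weighs more factors.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical sketch of choosing a new master after the old one fails. */
final class MasterElector {
    static Optional<String> electNewMaster(List<String> replicas, Set<String> syncStateSet,
                                           Set<String> aliveBrokers, String oldMaster,
                                           boolean allowUncleanElection) {
        // 1. Prefer an alive, in-sync replica: it holds every acknowledged message
        for (String r : replicas) {
            if (!r.equals(oldMaster) && syncStateSet.contains(r) && aliveBrokers.contains(r)) {
                return Optional.of(r);
            }
        }
        // 2. Unclean fallback: any alive replica, which may lose recent messages
        if (allowUncleanElection) {
            for (String r : replicas) {
                if (!r.equals(oldMaster) && aliveBrokers.contains(r)) {
                    return Optional.of(r);
                }
            }
        }
        // 3. No eligible candidate: wait for a Broker to come back
        return Optional.empty();
    }
}
```

Returning `Optional.empty()` corresponds to the "Wait for a new Broker" branch of the diagram.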

4.2 Automatic Switching

Configuration

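# Broker configuration
# In Controller mode the Controller decides at runtime which replica is master,
# so brokerRole (ASYNC_MASTER or SLAVE) no longer fixes the role.
# Keep comments on their own lines: in .properties files a trailing "# ..." becomes part of the value.
# Enable automatic master/slave switching
enableControllerMode=true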

Implementation

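import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Simplified illustration: ControllerManager, BrokerIdentity and the abstract hooks
// are placeholders, not RocketMQ's actual classes
public abstract class BrokerAutoSwitch {

    private static final Logger log = LoggerFactory.getLogger(BrokerAutoSwitch.class);

    private final ControllerManager controllerManager;
    private final ScheduledExecutorService scheduledExecutor =
        Executors.newSingleThreadScheduledExecutor();

    protected BrokerAutoSwitch(ControllerManager controllerManager) {
        this.controllerManager = controllerManager;
    }

    /**
     * Detect master failure (checked once per second)
     */
    public void detectMasterFault() {
        scheduledExecutor.scheduleAtFixedRate(() -> {
            try {
                // 1. Check the master's heartbeat
                if (!checkMasterHeartbeat()) {
                    // 2. Trigger the switch
                    triggerSwitch();
                }
            } catch (Exception e) {
                // An uncaught exception would silently cancel all future runs of this task
                log.error("Master fault detection failed", e);
            }
        }, 1000, 1000, TimeUnit.MILLISECONDS);
    }

    /**
     * Trigger a master/slave switch
     */
    private void triggerSwitch() {
        // 1. Choose a new master (ideally an in-sync Slave, to avoid losing messages)
        BrokerIdentity newMaster = selectNewMaster();
        if (newMaster == null) {
            // No eligible Slave: wait for a Broker to come back
            log.warn("No eligible Slave, waiting for a new Broker");
            return;
        }

        // 2. Update metadata
        controllerManager.updateMaster(newMaster);

        // 3. Notify Brokers
        notifyBrokerChange(newMaster);

        log.info("Master/slave switch completed: {}", newMaster);
    }

    protected abstract boolean checkMasterHeartbeat();

    protected abstract BrokerIdentity selectNewMaster();

    protected abstract void notifyBrokerChange(BrokerIdentity newMaster);
}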

5. High-Availability Deployment

5.1 Controller Cluster Deployment

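#!/bin/bash
# Controller cluster deployment script
set -euo pipefail

# Controller nodes (one Controller per host)
CONTROLLERS=("controller-1" "controller-2" "controller-3")

# DLedger (Raft) peers: use a port that differs from the Controller's listenPort
PEERS="n0-controller-1:9877;n1-controller-2:9877;n2-controller-3:9877"

# Deploy each Controller
for i in "${!CONTROLLERS[@]}"; do
    controller=${CONTROLLERS[$i]}
    node_id="n$i"
    conf="/etc/rocketmq/controller-$node_id.conf"

    echo "Deploying Controller: $controller (ID: $node_id)"

    # Write the config file on the target host (the heredoc is sent over ssh)
    ssh "$controller" "mkdir -p /etc/rocketmq && cat > $conf" << EOF
controllerDLegerGroup=DefaultController
controllerDLegerPeers=$PEERS
controllerDLegerSelfId=$node_id
controllerStorePath=/data/rocketmq/controller
listenPort=9878
EOF

    # Start the Controller in the background; nohup keeps it running after ssh exits
    ssh "$controller" "nohup mqcontroller -c $conf > /dev/null 2>&1 &"
done

echo "Controller cluster deployment complete"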
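Two mistakes are easy to make when these files are written by hand: a `controllerDLegerSelfId` that does not appear in `controllerDLegerPeers`, and a DLedger (Raft) port that collides with the Controller's own `listenPort` on the same host. A small pre-deployment check can catch both. `PeersConfig` is a hypothetical helper, assuming peer ids never contain `-` and that an odd group size is required by policy (an even-sized group works, but adds no fault tolerance).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical pre-deployment check for controllerDLegerPeers / controllerDLegerSelfId. */
final class PeersConfig {
    /** Parses "n0-host:port;n1-host:port;..." into id -> host:port (assumes ids contain no '-'). */
    static Map<String, String> parsePeers(String peers) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : peers.split(";")) {
            int dash = entry.indexOf('-');
            if (dash <= 0 || dash == entry.length() - 1) {
                throw new IllegalArgumentException("Bad peer entry: " + entry);
            }
            String id = entry.substring(0, dash);
            String addr = entry.substring(dash + 1);
            if (result.put(id, addr) != null) {
                throw new IllegalArgumentException("Duplicate peer id: " + id);
            }
        }
        return result;
    }

    /** Throws IllegalArgumentException if the group is misconfigured for this node. */
    static void validate(String peers, String selfId, int listenPort) {
        Map<String, String> parsed = parsePeers(peers);
        if (!parsed.containsKey(selfId)) {
            throw new IllegalArgumentException("selfId " + selfId + " is not in peers");
        }
        if (parsed.size() % 2 == 0) {
            throw new IllegalArgumentException("Policy: use an odd number of Controllers, got " + parsed.size());
        }
        String selfAddr = parsed.get(selfId);
        int raftPort = Integer.parseInt(selfAddr.substring(selfAddr.lastIndexOf(':') + 1));
        if (raftPort == listenPort) {
            throw new IllegalArgumentException("DLedger port " + raftPort + " clashes with listenPort");
        }
    }
}
```

For instance, `n0-controller-1:9878` combined with `listenPort=9878` on the same host is rejected, because both servers would try to bind port 9878.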

5.2 Broker Configuration

# broker-a.properties

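# Basic settings
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0

# Controller mode (controllerAddr points at each Controller's listenPort, not its DLedger port)
enableControllerMode=true
controllerAddr=controller-1:9878;controller-2:9878;controller-3:9878

# Storage settings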
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog

5.3 Hybrid Mode

NameServer + Controller

# Routing: Producers and Consumers, old and new, still discover Brokers through NameServer
namesrvAddr=ns1:9876;ns2:9876

# Controller mode
enableControllerMode=true
controllerAddr=controller-1:9878;controller-2:9878;controller-3:9878

6. Monitoring and Operations

6.1 Monitoring Metrics

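| Metric | Description | Alert threshold |
|---|---|---|
| controller_leader | Number of Controller Leaders | != 1 |
| controller_broker_count | Number of Brokers | < expected |
| controller_metadata_size | Metadata size | > 100MB |
| controller_raft_term | Raft term | - |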

6.2 Operations Commands

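# View the Controller cluster status (Leader and peers)
mqadmin getControllerMetaData -a controller-1:9878

# View Broker registration info
mqadmin brokerStatus -n ns1:9876 -b broker-1:10911

# View a broker group's replica metadata (master, SyncStateSet)
mqadmin getSyncStateSet -a controller-1:9878 -b broker-a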

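# Trigger a master/slave switch manually (options differ between 5.x releases; check mqadmin electMaster -h)
mqadmin electMaster -a controller-1:9878 -c DefaultCluster -b broker-a -i 1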

6.3 Troubleshooting

Controller cannot elect a Leader

# 1. Check the network
ping controller-1
ping controller-2
ping controller-3

# 2. Check the port
netstat -tlnp | grep 9878

# 3. Check the logs
tail -f /var/log/rocketmq/controller.log | grep -i "election"

# 4. Check the Raft state (which node is Leader)
mqadmin getControllerMetaData -a controller-1:9878

Broker registration fails

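# 1. Check the Controller status
mqadmin getControllerMetaData -a controller-1:9878

# 2. Check the Broker configuration (case-insensitive, to also match enableControllerMode)
grep -i controller /etc/rocketmq/broker.conf

# 3. Check the Broker log
tail -f /var/log/rocketmq/broker.log | grep -i "register"

# 4. After fixing the configuration, restart the Broker so it registers again
systemctl restart rocketmq-broker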

7. Best Practices

7.1 Deployment Recommendations

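Recommended setup:
- Number of Controllers: 3 (an odd number; a 3-node group still elects a Leader with one node down)
- Deploy across availability zones
- Run Controllers on dedicated nodes
- Set up thorough monitoring and alerting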
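The "odd number" recommendation follows from Raft's majority rule: a group of n voting members needs n / 2 + 1 of them (integer division) to elect a Leader or commit a change, so it survives the loss of the remaining nodes. A tiny sketch of that arithmetic (the class name is hypothetical):

```java
/** Hypothetical sketch: majority size and tolerated failures for a Raft group of n members. */
final class QuorumMath {
    /** Smallest number of members that forms a strict majority of n. */
    static int majority(int n) {
        return n / 2 + 1;
    }

    /** Members that can fail while a majority is still reachable; equals (n - 1) / 2. */
    static int toleratedFailures(int n) {
        return n - majority(n);
    }
}
```

With these formulas, 3 nodes tolerate 1 failure, 4 nodes still tolerate only 1 (while needing 3 for a majority), and 5 nodes tolerate 2, which is why adding a fourth Controller buys nothing.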

7.2 Configuration Tuning

# Controller configuration tuning

# Election timeout (default 2000ms)
controllerElectionTimeoutMs=2000

# Heartbeat interval (default 1000ms)
controllerHeartbeatIntervalMs=1000

# Scan interval (default 2000ms)
controllerScanIntervalMs=2000

# Metadata persistence interval
metadataPersistIntervalMs=5000

7.3 Migration Plan

Migrating from NameServer mode to Controller mode

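#!/bin/bash
# Migration script
set -euo pipefail

CONTROLLER_ADDR="controller-1:9878;controller-2:9878;controller-3:9878"

# 1. Deploy the Controller cluster
./deploy-controller.sh

# 2. Switch the Brokers to Controller mode, restarting them one by one
for broker in broker-1 broker-2 broker-3; do
    # Remove any existing (possibly commented-out) Controller keys, then append fresh ones
    ssh "$broker" "sed -i -E '/^#?[[:space:]]*(enableControllerMode|controllerAddr)[[:space:]]*=/d' /etc/rocketmq/broker.conf"
    ssh "$broker" "echo 'enableControllerMode=true' >> /etc/rocketmq/broker.conf"
    ssh "$broker" "echo 'controllerAddr=$CONTROLLER_ADDR' >> /etc/rocketmq/broker.conf"
    ssh "$broker" "systemctl restart rocketmq-broker"
done

# 3. Verify the migration
mqadmin getControllerMetaData -a controller-1:9878

# 4. Keep NameServer running: Producers, Consumers and Brokers still rely on it for routing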

Summary

Key points of RocketMQ Controller:

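  1. Election: Raft election, Leader/Follower roles
  2. Metadata management: centralized, strongly consistent, persisted
  3. Broker election: master/slave switching, automatic failover
  4. High-availability deployment: cluster deployment, configuration tuning
  5. Monitoring and operations: metrics, commands, troubleshooting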
