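RocketMQ Controller is a component introduced in RocketMQ 5.0 that manages cluster metadata and elects Broker masters. This article looks at how the Controller works and how it achieves high availability.
1. Controller Basics
1.1 Why a Controller?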
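Problems with the NameServer-only mode:
- NameServer nodes are stateless and do not synchronize with each other, so their metadata can diverge
- Broker registration information can conflict
- Coordination has to come from an external component
Advantages of the Controller mode:
- Centralized metadata management
- Consistent metadata
- Automatic Broker master election
- Simpler operations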
1.2 Architecture Evolution
NameServer mode:
graph TB
subgraph NameServer Cluster
NS1[NameServer 1]
NS2[NameServer 2]
NS3[NameServer 3]
end
subgraph Broker Cluster
B1[Broker 1 Master]
B2[Broker 1 Slave]
end
B1 -->|register| NS1
B1 -->|register| NS2
B1 -->|register| NS3
NS1 -.->|metadata may diverge| NS2
Controller mode:
graph TB
subgraph Controller Cluster
C1[Controller 1<br/>Leader]
C2[Controller 2<br/>Follower]
C3[Controller 3<br/>Follower]
end
subgraph Broker Cluster
B1[Broker 1]
B2[Broker 2]
end
C1 -.->|Raft| C2
C1 -.->|Raft| C3
B1 -->|register| C1
B2 -->|register| C1
C1 -->|consistent metadata| B1
C1 -->|consistent metadata| B2
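1.3 Mode Comparison
| Feature | NameServer mode | Controller mode |
|---|---|---|
| Metadata | Eventually consistent | Strongly consistent |
| Master election | Manual or semi-automatic | Automatic |
| Dependencies | None | Built-in Raft |
| Operations | Complex | Simple |
| Recommended for | Compatibility with older versions | New deployments |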
2. Controller Election
2.1 Raft Election
Election flow:
sequenceDiagram
participant C1 as Controller 1
participant C2 as Controller 2
participant C3 as Controller 3
C1->>C1: Start as Follower
C2->>C2: Start as Follower
C3->>C3: Start as Follower
C1->>C1: Election timeout, become Candidate
C1->>C2: RequestVote
C1->>C3: RequestVote
C2-->>C1: Vote granted
C3-->>C1: Vote granted
C1->>C1: Become Leader
C1->>C2: Heartbeat
C1->>C3: Heartbeat
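The majority rule behind this flow can be sketched in a few lines of Java. This is a hypothetical illustration, not DLedger or RocketMQ code: `VoteTally` and its methods are names invented for this example, and the sketch assumes a fixed cluster size and a single term.

```java
/** Illustrative only: counts the votes one candidate collects in a single Raft term. */
final class VoteTally {
    private final int clusterSize;
    private int granted = 1; // a candidate always votes for itself

    VoteTally(int clusterSize) {
        this.clusterSize = clusterSize;
    }

    /** Records one peer's reply to a RequestVote call. */
    void record(boolean voteGranted) {
        if (voteGranted) {
            granted++;
        }
    }

    /** The candidate wins once it holds a strict majority of the whole cluster. */
    boolean hasWon() {
        return granted > clusterSize / 2;
    }
}
```

With three Controllers, the candidate's own vote plus one granted reply is already a majority, which is why an election can still succeed with one node down.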
2.2 Election Configuration
# controller.conf
# Controller settings
controllerDLegerGroup=DefaultController
controllerDLegerPeers=n0-localhost:9878;n1-localhost:9879;n2-localhost:9880
controllerDLegerSelfId=n0
# Election timeout
controllerElectionTimeoutMs=2000
# Heartbeat interval
controllerHeartbeatIntervalMs=1000
# Scan interval
controllerScanIntervalMs=2000
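The `controllerDLegerPeers` value packs each node as `id-host:port`, separated by semicolons. A small parser makes the format concrete. It is a sketch for illustration only: `DLedgerPeers` is a hypothetical helper, not a RocketMQ class, and it assumes node ids contain no hyphen (host names may).

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: parses a peers string such as "n0-host:9878;n1-host:9879". */
final class DLedgerPeers {
    private DLedgerPeers() {
    }

    /** Returns an ordered map from node id to "host:port". */
    static Map<String, String> parse(String peers) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : peers.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            // Split on the FIRST hyphen only, so host names like "controller-1" survive.
            int dash = trimmed.indexOf('-');
            if (dash <= 0 || dash == trimmed.length() - 1) {
                throw new IllegalArgumentException("Malformed peer entry: " + trimmed);
            }
            String id = trimmed.substring(0, dash);
            String address = trimmed.substring(dash + 1);
            if (result.put(id, address) != null) {
                throw new IllegalArgumentException("Duplicate peer id: " + id);
            }
        }
        return result;
    }
}
```

A check like this is handy in deployment tooling: every node must receive the same peers string, and its own `controllerDLegerSelfId` must be one of the parsed ids.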
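2.3 Broker Registration
Registration flow:
sequenceDiagram
participant B as Broker
participant C as Controller
B->>C: Registration request
C->>C: Validate the Broker
C->>C: Update metadata
C-->>B: Registration succeeded
loop Heartbeat
B->>C: Heartbeat
C-->>B: Heartbeat response
end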
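Registration code:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokerRegister {
    private static final Logger log = LoggerFactory.getLogger(BrokerRegister.class);
    private final ControllerManager controllerManager;

    public BrokerRegister(ControllerManager controllerManager) {
        this.controllerManager = controllerManager;
    }

    public void registerBroker(BrokerIdentity brokerIdentity) {
        // 1. Build the registration request
        RegisterBrokerRequest request = new RegisterBrokerRequest();
        request.setBrokerName(brokerIdentity.getBrokerName());
        request.setBrokerAddr(brokerIdentity.getBrokerAddr());
        request.setCluster(brokerIdentity.getClusterName());
        // 2. Send it to the Controller
        RegisterBrokerResponse response =
            controllerManager.registerBroker(request);
        // 3. Handle the response
        if (response.isSuccess()) {
            log.info("Broker registered: {}", brokerIdentity);
        } else {
            log.error("Broker registration failed: {}", response.getError());
        }
    }
}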
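After registration, the Controller relies on the heartbeat loop to decide whether a Broker is still alive. One way to sketch that bookkeeping is shown here. `HeartbeatTracker` is a hypothetical class written for this article, not part of RocketMQ, and timestamps are passed in explicitly so the logic can be checked without a clock.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative only: remembers the last heartbeat time of each Broker. */
final class HeartbeatTracker {
    private final long timeoutMs;
    private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

    HeartbeatTracker(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Called whenever a heartbeat arrives from the given Broker address. */
    void onHeartbeat(String brokerAddr, long nowMs) {
        lastSeen.put(brokerAddr, nowMs);
    }

    /** Returns the Brokers whose last heartbeat is older than the timeout. */
    List<String> expired(long nowMs) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (nowMs - entry.getValue() > timeoutMs) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```

A periodic scan would call `expired(System.currentTimeMillis())` and treat every returned Broker as down.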
3. Metadata Management
3.1 Metadata Structure
Metadata
├── ClusterInfo
│ ├── ClusterName
│ └── BrokerSet
├── BrokerSet
│ ├── BrokerName
│ ├── MasterBroker
│ └── SlaveBrokers
├── TopicConfig
│ ├── TopicName
│ ├── QueueConfig
│ └── Attributes
└── SubscriptionGroup
  ├── GroupName
  └── SubscriptionConfig
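The tree maps naturally onto a few immutable Java records. The sketch below is an assumption about how such a model could look, not the Controller's real classes: the record names mirror the tree, while field types such as the queue counts and the `masterOf` helper are invented for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** One master plus its slaves, identified by broker name. */
record BrokerSet(String brokerName, String masterAddr, List<String> slaveAddrs) {}

/** A cluster and the broker sets that belong to it, keyed by broker name. */
record ClusterInfo(String clusterName, Map<String, BrokerSet> brokerSets) {}

/** Topic settings; the queue counts stand in for "QueueConfig" in the tree. */
record TopicConfig(String topicName, int readQueueNums, int writeQueueNums,
                   Map<String, String> attributes) {}

/** Consumer group settings; the config map stands in for "SubscriptionConfig". */
record SubscriptionGroup(String groupName, Map<String, String> subscriptionConfig) {}

/** Root of the tree. */
record ControllerMetadata(Map<String, ClusterInfo> clusters,
                          Map<String, TopicConfig> topics,
                          Map<String, SubscriptionGroup> groups) {

    /** Looks up the current master address of a broker set, if one is known. */
    Optional<String> masterOf(String clusterName, String brokerName) {
        ClusterInfo cluster = clusters.get(clusterName);
        if (cluster == null) {
            return Optional.empty();
        }
        BrokerSet set = cluster.brokerSets().get(brokerName);
        return set == null ? Optional.empty() : Optional.ofNullable(set.masterAddr());
    }
}
```

Keeping the model immutable means a metadata update produces a new value, which fits a replicated state machine where every change is applied from a log entry.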
3.2 Metadata Synchronization
Synchronization flow:
graph TB
subgraph Controller Leader
CL[Metadata]
end
subgraph Controller Follower
CF1[Metadata replica]
CF2[Metadata replica]
end
subgraph Broker
B1[Broker 1]
B2[Broker 2]
end
CL -->|Raft replication| CF1
CL -->|Raft replication| CF2
B1 -->|read| CL
B2 -->|read| CL
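In Raft, the Leader treats a metadata change as committed only after a majority of nodes have stored it. The rule can be sketched as follows. `CommitIndex` is a hypothetical helper rather than DLedger code, and it leaves out the additional Raft requirement that the entry belong to the Leader's current term.

```java
import java.util.Arrays;

/** Illustrative only: derives the commit index from per-node replication progress. */
final class CommitIndex {
    private CommitIndex() {
    }

    /**
     * @param matchIndex highest log index known to be stored on each node, Leader included
     * @return the highest index that is stored on a majority of nodes
     */
    static long majorityReplicated(long[] matchIndex) {
        long[] sorted = matchIndex.clone();
        Arrays.sort(sorted);
        // In ascending order, the element at (n - 1) / 2 is matched or exceeded
        // by at least n / 2 + 1 nodes, which is a majority.
        return sorted[(sorted.length - 1) / 2];
    }
}
```

For example, with progress `{9, 5, 7}` the result is 7: two of three nodes hold index 7, so a Follower that lags at 5 does not block the commit.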
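3.3 Metadata Persistence
// JSON is assumed to be fastjson's com.alibaba.fastjson.JSON
import com.alibaba.fastjson.JSON;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetadataPersist {
    private static final Logger log = LoggerFactory.getLogger(MetadataPersist.class);
    private final Path metadataFile;

    public MetadataPersist(String metadataDir) {
        this.metadataFile = Paths.get(metadataDir, "metadata.json");
    }

    /**
     * Persists the metadata.
     */
    public void persistMetadata(Metadata metadata) {
        try {
            // 1. Serialize the metadata
            String json = JSON.toJSONString(metadata);
            // 2. Write to a temporary file in the same directory
            Path tempFile = metadataFile.resolveSibling(metadataFile.getFileName() + ".tmp");
            Files.write(tempFile, json.getBytes(StandardCharsets.UTF_8));
            // 3. Atomically replace the old file, so readers never see a partial write
            Files.move(tempFile, metadataFile, StandardCopyOption.ATOMIC_MOVE);
            log.info("Metadata persisted");
        } catch (IOException e) {
            log.error("Failed to persist metadata", e);
        }
    }

    /**
     * Loads the metadata.
     */
    public Metadata loadMetadata() {
        try {
            String json = new String(Files.readAllBytes(metadataFile), StandardCharsets.UTF_8);
            return JSON.parseObject(json, Metadata.class);
        } catch (NoSuchFileException e) {
            // Nothing persisted yet: start from empty metadata
            return new Metadata();
        } catch (IOException e) {
            // Do not silently replace unreadable metadata with an empty instance
            log.error("Failed to load metadata", e);
            throw new UncheckedIOException(e);
        }
    }
}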
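4. Broker Election
4.1 Master-Slave Election
What triggers an election:
graph TB
A[Master goes down] --> B[Controller detects the failure]
B --> C{Slave available?}
C -->|yes| D[Promote a Slave to Master]
C -->|no| E[Wait for a new Broker]
D --> F[Update metadata]
F --> G[Notify Producer/Consumer]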
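4.2 Automatic Switchover
Configuration:
# Broker configuration
# Role: ASYNC_MASTER or SLAVE
brokerRole=ASYNC_MASTER
# Enable automatic switchover
enableAutoSwitch=true
Implementation:
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BrokerAutoSwitch {
    private static final Logger log = LoggerFactory.getLogger(BrokerAutoSwitch.class);
    private final ControllerManager controllerManager;
    private final ScheduledExecutorService scheduledExecutor =
        Executors.newSingleThreadScheduledExecutor();

    public BrokerAutoSwitch(ControllerManager controllerManager) {
        this.controllerManager = controllerManager;
    }

    /**
     * Checks for a Master failure once per second.
     */
    public void detectMasterFault() {
        scheduledExecutor.scheduleAtFixedRate(() -> {
            try {
                // 1. Check the Master heartbeat
                if (!checkMasterHeartbeat()) {
                    // 2. Trigger the switchover
                    triggerSwitch();
                }
            } catch (Exception e) {
                // An uncaught exception would cancel every later run of this task
                log.error("Master fault detection failed", e);
            }
        }, 1000, 1000, TimeUnit.MILLISECONDS);
    }

    /**
     * Triggers the master-slave switchover.
     */
    private void triggerSwitch() {
        // 1. Select the new Master
        BrokerIdentity newMaster = selectNewMaster();
        // 2. Update the metadata
        controllerManager.updateMaster(newMaster);
        // 3. Notify the Brokers
        notifyBrokerChange(newMaster);
        log.info("Master-slave switchover finished: {}", newMaster);
    }

    // checkMasterHeartbeat(), selectNewMaster() and notifyBrokerChange() are omitted here.
}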
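The step that selects the new Master is the interesting part of a switchover. A common policy is to consider only replicas that were in sync with the old Master and to prefer the one with the most data. The sketch below illustrates that policy under stated assumptions: `Replica`, `MasterElector` and the fields `epoch` and `maxOffset` are hypothetical names for this example, not the Controller's actual election code.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** A live replica with the progress it reports (illustrative fields). */
record Replica(String addr, int epoch, long maxOffset) {}

/** Illustrative only: picks the best in-sync replica as the new Master. */
final class MasterElector {
    private MasterElector() {
    }

    static Optional<Replica> selectNewMaster(List<Replica> alive, Set<String> syncStateSet) {
        return alive.stream()
            // Replicas outside the in-sync set may be missing acknowledged messages.
            .filter(replica -> syncStateSet.contains(replica.addr()))
            // Prefer the newest epoch, then the longest log.
            .max(Comparator.comparingInt(Replica::epoch)
                .thenComparingLong(Replica::maxOffset));
    }
}
```

An empty result corresponds to the "wait for a new Broker" branch of the flow: promoting a replica that was not in sync would trade availability for possible message loss.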
5. High-Availability Deployment
5.1 Controller Cluster Deployment
#!/bin/bash
# Controller cluster deployment script
set -euo pipefail
# Controller nodes
CONTROLLERS=("controller-1" "controller-2" "controller-3")
peers="n0-controller-1:9878;n1-controller-2:9879;n2-controller-3:9880"
# Deploy each Controller
for i in "${!CONTROLLERS[@]}"; do
  controller=${CONTROLLERS[$i]}
  node_id="n$i"
  conf="/etc/rocketmq/controller-$node_id.conf"
  echo "Deploying Controller: $controller (ID: $node_id)"
  # Write the config file on the target host, not on the machine running this script
  ssh "$controller" "cat > $conf" << EOF
controllerDLegerGroup=DefaultController
controllerDLegerPeers=$peers
controllerDLegerSelfId=$node_id
controllerStorePath=/data/rocketmq/controller
listenPort=9878
EOF
  # Start the Controller detached, so that ssh returns immediately
  ssh "$controller" "nohup mqcontroller -c $conf > /dev/null 2>&1 &"
done
echo "Controller cluster deployed"
5.2 Broker Configuration
# broker-a.properties
# Basic settings
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0
# Controller mode
enableControllerMode=true
controllerAddr=controller-1:9878;controller-2:9879;controller-3:9880
# Storage settings
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
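`controllerAddr` lists every Controller, but only the current Leader handles writes, so whoever uses the list has to locate the Leader first. A sketch of that lookup follows. Everything in it is an assumption for illustration: `ControllerLocator` is a hypothetical class, and the `isLeader` predicate stands in for a real network probe.

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.function.Predicate;

/** Illustrative only: finds the Leader among a semicolon-separated address list. */
final class ControllerLocator {
    private ControllerLocator() {
    }

    /**
     * @param controllerAddr value such as "controller-1:9878;controller-2:9879"
     * @param isLeader       caller-supplied probe that reports whether an address is the Leader
     */
    static Optional<String> findLeader(String controllerAddr, Predicate<String> isLeader) {
        return Arrays.stream(controllerAddr.split(";"))
            .map(String::trim)
            .filter(address -> !address.isEmpty())
            .filter(isLeader)
            .findFirst();
    }
}
```

Because the stream is lazy, probing stops at the first address that reports itself as Leader. An empty result means an election is probably in progress and the caller should retry later.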
5.3 Mixed Mode
NameServer + Controller:
# Keep NameServer for compatibility with older clients
enableNameServer=true
namesrvAddr=ns1:9876;ns2:9876
# Controller mode
enableControllerMode=true
controllerAddr=controller-1:9878;controller-2:9879;controller-3:9880
6. Monitoring and Operations
6.1 Metrics
| Metric | Description | Alert threshold |
|---|---|---|
| controller_leader | Number of Controller Leaders | != 1 |
| controller_broker_count | Number of Brokers | < expected |
| controller_metadata_size | Metadata size | > 100MB |
| controller_raft_term | Raft term | - |
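The thresholds in the table translate directly into a small rule check. The sketch below is illustrative: `ControllerSnapshot` and `ControllerAlerts` are hypothetical names, how the values are scraped is left out, and "100MB" is interpreted as 100 MiB.

```java
import java.util.ArrayList;
import java.util.List;

/** One sample of the metrics from the table (illustrative). */
record ControllerSnapshot(int leaderCount, int brokerCount, long metadataBytes) {}

/** Illustrative only: applies the alert thresholds from the table. */
final class ControllerAlerts {
    static final long MAX_METADATA_BYTES = 100L * 1024 * 1024; // assumes 100MB means 100 MiB

    private ControllerAlerts() {
    }

    /** Returns the names of all metrics that violate their threshold. */
    static List<String> evaluate(ControllerSnapshot snapshot, int expectedBrokers) {
        List<String> alerts = new ArrayList<>();
        if (snapshot.leaderCount() != 1) {
            alerts.add("controller_leader");
        }
        if (snapshot.brokerCount() < expectedBrokers) {
            alerts.add("controller_broker_count");
        }
        if (snapshot.metadataBytes() > MAX_METADATA_BYTES) {
            alerts.add("controller_metadata_size");
        }
        return alerts;
    }
}
```

`controller_raft_term` has no threshold, so it is not checked here. In practice a term that keeps rising signals repeated elections and is worth graphing.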
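6.2 Operations Commands
# View Controller cluster metadata (Leader and peers)
mqadmin getControllerMetaData -a controller-1:9878
# View Broker status
mqadmin brokerStatus -n ns1:9876 -b broker-1:10911
# View the SyncStateSet of a Broker group
mqadmin getSyncStateSet -a controller-1:9878 -b broker-a
# Trigger a master election for a Broker group
# (option names differ between releases; confirm with: mqadmin help electMaster)
mqadmin electMaster -a controller-1:9878 -bn broker-a -c DefaultCluster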
6.3 Troubleshooting
Controller cannot elect a Leader:
# 1. Check the network
ping controller-1
ping controller-2
ping controller-3
# 2. Check the port
netstat -tlnp | grep 9878
# 3. Check the logs
tail -f /var/log/rocketmq/controller.log | grep -i "election"
# 4. Check the Raft state
mqadmin getControllerMetaData -a controller-1:9878
Broker registration fails:
# 1. Check the Controller state
mqadmin getControllerMetaData -a controller-1:9878
# 2. Check the Broker configuration
grep controller /etc/rocketmq/broker.conf
# 3. Check the Broker logs
tail -f /var/log/rocketmq/broker.log | grep -i "register"
# 4. Restart the Broker so that it registers again
systemctl restart rocketmq-broker
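7. Best Practices
7.1 Deployment Recommendations
Recommended setup:
- Number of Controllers: 3 (an odd number)
- Deploy across availability zones
- Run Controllers on dedicated nodes
- Set up thorough monitoring and alerting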
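The preference for an odd number of Controllers follows from majority arithmetic, which a short sketch makes explicit. `Quorum` is a hypothetical helper written for this article.

```java
/** Illustrative only: majority arithmetic for a Raft group. */
final class Quorum {
    private Quorum() {
    }

    /** Smallest number of nodes that forms a majority. */
    static int majority(int nodes) {
        return nodes / 2 + 1;
    }

    /** Number of nodes that can fail while a majority is still available. */
    static int tolerableFailures(int nodes) {
        return nodes - majority(nodes);
    }
}
```

Three nodes need a majority of 2 and tolerate 1 failure. Four nodes need 3 and still tolerate only 1, so the fourth node adds cost without adding fault tolerance. Five nodes tolerate 2.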
7.2 Configuration Tuning
# Controller configuration tuning
# Election timeout (default 2000 ms)
controllerElectionTimeoutMs=2000
# Heartbeat interval (default 1000 ms)
controllerHeartbeatIntervalMs=1000
# Scan interval (default 2000 ms)
controllerScanIntervalMs=2000
# Metadata persistence interval
metadataPersistIntervalMs=5000
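When tuning these values, one relationship has to hold: the heartbeat interval must stay below the election timeout, otherwise Followers time out between heartbeats and start needless elections. A sanity check could look like the sketch below. `TimingCheck` is a hypothetical helper, not something RocketMQ provides.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: sanity-checks Raft timing settings before they are rolled out. */
final class TimingCheck {
    private TimingCheck() {
    }

    /** Returns a list of problems; an empty list means the values are plausible. */
    static List<String> validate(long electionTimeoutMs, long heartbeatIntervalMs) {
        List<String> problems = new ArrayList<>();
        if (electionTimeoutMs <= 0 || heartbeatIntervalMs <= 0) {
            problems.add("timing values must be positive");
        }
        if (heartbeatIntervalMs >= electionTimeoutMs) {
            problems.add("heartbeat interval must be shorter than the election timeout");
        }
        return problems;
    }
}
```

The values shown above (2000 ms timeout, 1000 ms heartbeat) pass this check. A larger gap between the two leaves more room for a delayed heartbeat.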
7.3 Migration Plan
Migrating from NameServer to Controller:
#!/bin/bash
# Migration script
# 1. Deploy the Controller cluster
./deploy-controller.sh
# 2. Point each Broker at the Controller
for broker in broker-1 broker-2 broker-3; do
  ssh "$broker" "sed -i 's/#enableControllerMode=true/enableControllerMode=true/' /etc/rocketmq/broker.conf"
  ssh "$broker" "sed -i 's/#controllerAddr=/controllerAddr=controller-1:9878;controller-2:9879;controller-3:9880/' /etc/rocketmq/broker.conf"
  ssh "$broker" "systemctl restart rocketmq-broker"
done
# 3. Verify the migration
mqadmin getControllerMetaData -a controller-1:9878
# 4. Leave NameServer running: Producers and Consumers still resolve routes through it
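Summary
Key points about RocketMQ Controller:
- Election: Raft election with Leader and Follower roles
- Metadata management: centralized, strongly consistent, persisted
- Broker election: master-slave switchover and automatic failure recovery
- High-availability deployment: cluster deployment and configuration tuning
- Monitoring and operations: metrics, commands, troubleshooting
Takeaways:
- Understand how Controller election works
- Know how metadata is managed
- Prefer the Controller mode
- Build a thorough monitoring setup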