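This article walks through the core source code of RocketMQ, covering the key flows of message sending, storage, and consumption, to help readers understand how RocketMQ is implemented internally.
I. Source Code Structure
1.1 Module Layout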
rocketmq/
├── rocketmq-common/ # Shared common classes
├── rocketmq-remoting/ # Remote communication
├── rocketmq-store/ # Storage module
├── rocketmq-client/ # Client (producer and consumer)
├── rocketmq-broker/ # Broker
├── rocketmq-namesrv/ # NameServer
└── rocketmq-tools/ # Operations tools
1.2 Core Class Diagram
graph TB
subgraph Client
DP[DefaultMQProducer]
DC[DefaultMQPushConsumer]
end
subgraph Broker
MB[BrokerController]
MS[MessageStore]
end
subgraph Storage
CL[CommitLog]
CQ[ConsumeQueue]
end
DP --> MB
DC --> MB
MB --> MS
MS --> CL
MS --> CQ
II. Message Sending Flow
2.1 Producer Send Flow
// DefaultMQProducerImpl.send()
public SendResult send(Message msg) throws MQClientException, RemotingException,
        MQBrokerException, InterruptedException {
    // 1. Validate the message
    checkMessage(msg);
    // 2. Apply the namespace to the topic
    msg.setTopic(withNamespace(msg.getTopic()));
    // 3. Select a MessageQueue
    MessageQueue mq = selectOneMessageQueue(topicPublishInfo);
    // 4. Send the message
    return sendKernelImpl(msg, mq, communicationMode, null, null, null,
            defaultMQProducer.getSendMsgTimeout());
}
// Core send implementation
private SendResult sendKernelImpl(Message msg, MessageQueue mq,
        CommunicationMode communicationMode, ...) {
    // 1. Look up the Broker address
    String brokerAddr = findBrokerAddressInPublish(mq.getBrokerName());
    // 2. Build the request header
    SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
    requestHeader.setProducerGroup(this.groupName);
    requestHeader.setTopic(msg.getTopic());
    requestHeader.setDefaultTopic(topicWithNamespace);
    requestHeader.setDefaultTopicQueueNums(this.defaultTopicQueueNums);
    requestHeader.setQueueId(mq.getQueueId());
    requestHeader.setSysFlag(sysFlag);
    requestHeader.setBornTimestamp(System.currentTimeMillis());
    requestHeader.setFlag(msg.getFlag());
    // 3. Wrap the header and message body in a request command
    RemotingCommand request = RemotingCommand.createRequestCommand(
            RequestCode.SEND_MESSAGE, requestHeader);
    request.setBody(msg.getBody());
    // 4. Send the request synchronously and wait for the response
    RemotingCommand response = this.remotingClient.invokeSync(
            brokerAddr, request, timeoutMillis);
    // 5. Decode the response
    SendResult sendResult = decodeSendMessageResponse(response);
    return sendResult;
}
2.2 Message Route Selection
// TopicPublishInfo.selectOneMessageQueue()
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // Round-robin strategy
    if (sendWhichQueue.get() % 2 == 0) {
        return selectOneMessageQueueByRoundRobin(lastBrokerName);
    }
    // Minimum-load strategy
    return selectOneMessageQueueByMinLoad(lastBrokerName);
}
// Round-robin selection
private MessageQueue selectOneMessageQueueByRoundRobin(String lastBrokerName) {
    int index = this.sendWhichQueue.incrementAndGet();
    for (int i = 0; i < this.messageQueueList.size(); i++) {
        // floorMod keeps the position non-negative even after the counter overflows
        // (Math.abs(Integer.MIN_VALUE) is still negative)
        int pos = Math.floorMod(index++, this.messageQueueList.size());
        MessageQueue mq = this.messageQueueList.get(pos);
        // Avoid the Broker that failed on the previous attempt
        if (!mq.getBrokerName().equals(lastBrokerName)) {
            return mq;
        }
    }
    // Every queue belongs to the failed Broker: fall back to the first queue
    return this.messageQueueList.get(0);
}
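The retry-aware round-robin idea above can be tried in isolation. The following is a minimal sketch, not the actual TopicPublishInfo code: the class `RoundRobinSketch` is hypothetical, and each queue is reduced to the name of the Broker that hosts it.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch; not the actual TopicPublishInfo implementation.
final class RoundRobinSketch {
    private final List<String> brokerNames; // Broker name of each queue, by queue index
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> brokerNames) {
        this.brokerNames = List.copyOf(brokerNames);
    }

    /** Returns the index of the next queue, skipping queues hosted on lastBrokerName. */
    int select(String lastBrokerName) {
        int start = counter.getAndIncrement();
        int size = brokerNames.size();
        for (int i = 0; i < size; i++) {
            // floorMod keeps the position non-negative even after the counter overflows.
            int pos = Math.floorMod(start + i, size);
            if (!brokerNames.get(pos).equals(lastBrokerName)) {
                return pos;
            }
        }
        // Every queue lives on the failed Broker: fall back to plain round-robin.
        return Math.floorMod(start, size);
    }

    public static void main(String[] args) {
        RoundRobinSketch selector =
                new RoundRobinSketch(List.of("broker-a", "broker-a", "broker-b"));
        System.out.println(selector.select(null));       // first attempt, nothing excluded
        System.out.println(selector.select("broker-a")); // retry that avoids broker-a
    }
}
```

Passing `null` as `lastBrokerName` models a first attempt, where no Broker has failed yet.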
III. Storage Flow
3.1 Writing to the CommitLog
// CommitLog.putMessage()
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // 1. Check Broker state: a SYNC_MASTER rejects writes when its slave lags too far behind
    if (!this.mqBrokerConfig.getBrokerRole().equals(BrokerRole.SLAVE)) {
        if (this.mqBrokerConfig.getBrokerRole() == BrokerRole.SYNC_MASTER &&
                this.slaveSyncDelay > this.mqBrokerConfig.getFlushSlaveTimeoutMillis()) {
            return new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, null);
        }
    }
    // 2. Encode the message
    ByteBuffer byteBuffer = this.messageExtEncoder.encode(msg);
    // 3. Append to the CommitLog
    AppendMessageResult result = this.commitLog.append(byteBuffer);
    // 4. Handle the append result
    switch (result.getStatus()) {
        case PUT_OK:
            // Success
            break;
        case END_OF_FILE:
            // The current file is full: a new file is created and the append is retried
            result = this.commitLog.append(byteBuffer);
            break;
        case MESSAGE_SIZE_EXCEEDED:
        case PROPERTIES_SIZE_EXCEEDED:
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
    }
    // 5. Asynchronous flush: wake up the flush thread
    if (this.mqBrokerConfig.getFlushDiskType() == FlushDiskType.ASYNC_FLUSH) {
        this.flushCommitLogService.wakeup();
    }
    // 6. Wake up the service that builds the ConsumeQueue
    this.reputMessageService.wakeup();
    return new PutMessageResult(PutMessageStatus.PUT_OK, result);
}
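The END_OF_FILE branch is easier to follow with a toy append-only log. The sketch below is an in-memory approximation under simplifying assumptions: `SegmentedLogSketch` is a hypothetical class, segments are heap buffers rather than mapped files, and the unused tail of a full segment is simply skipped.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a log made of fixed-size files.
final class SegmentedLogSketch {
    private final int segmentSize;
    private final List<ByteBuffer> segments = new ArrayList<>();

    SegmentedLogSketch(int segmentSize) {
        this.segmentSize = segmentSize;
        segments.add(ByteBuffer.allocate(segmentSize));
    }

    /** Appends the payload and returns its global physical offset. */
    long append(byte[] payload) {
        if (payload.length > segmentSize) {
            throw new IllegalArgumentException("message larger than a segment");
        }
        ByteBuffer current = segments.get(segments.size() - 1);
        if (current.remaining() < payload.length) {
            // END_OF_FILE case: roll over to a fresh segment, then retry the append.
            current = ByteBuffer.allocate(segmentSize);
            segments.add(current);
        }
        long offset = (long) (segments.size() - 1) * segmentSize + current.position();
        current.put(payload);
        return offset;
    }

    int segmentCount() {
        return segments.size();
    }

    public static void main(String[] args) {
        SegmentedLogSketch log = new SegmentedLogSketch(16);
        System.out.println(log.append(new byte[10])); // fits in the first segment
        System.out.println(log.append(new byte[10])); // forces a rollover
        System.out.println(log.segmentCount());
    }
}
```

Because every segment has the same size, a global offset identifies both the segment (offset divided by segment size) and the position inside it (the remainder).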
3.2 Building the ConsumeQueue
// ReputMessageService.run()
public void run() {
    while (!this.isStopped()) {
        try {
            // 1. Read the next message from the CommitLog
            DispatchRequest request = this.commitLog.pickupDispatchRequest(
                    this.reputFromOffset);
            if (request == null) {
                Thread.sleep(1);
                continue;
            }
            // 2. Build the ConsumeQueue entry
            this.doDispatch(request);
            // 3. Advance the offset past the dispatched message
            this.reputFromOffset = request.getCommitLogOffset() + request.getMsgSize();
        } catch (Exception e) {
            log.error("ReputMessageService failed while running", e);
        }
    }
}
// Build a ConsumeQueue entry
private void doDispatch(DispatchRequest request) {
    // 1. Find or create the ConsumeQueue for this topic and queue
    ConsumeQueue cq = this.findOrCreateConsumeQueue(request.getTopic(),
            request.getQueueId());
    // 2. Build the index entry (20 bytes)
    ByteBuffer byteBuffer = ByteBuffer.allocate(20);
    byteBuffer.putLong(request.getCommitLogOffset()); // 8 bytes: CommitLog offset
    byteBuffer.putInt(request.getMsgSize()); // 4 bytes: message size
    byteBuffer.putLong(request.getTagsCode()); // 8 bytes: tag hash code
    // 3. Write the entry to the ConsumeQueue
    cq.putQueueData(request.getConsumeQueueOffset(), byteBuffer);
}
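The 20-byte entry layout can be exercised on its own. Here is a small sketch of encoding and decoding one entry with `ByteBuffer`; the class and method names are made up for illustration, and only the field layout (8 + 4 + 8 bytes) comes from the code above.

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the fixed 20-byte index entry.
final class ConsumeQueueEntrySketch {
    static final int ENTRY_SIZE = 20; // 8 (offset) + 4 (size) + 8 (tags code)

    static byte[] encode(long commitLogOffset, int msgSize, long tagsCode) {
        ByteBuffer buf = ByteBuffer.allocate(ENTRY_SIZE);
        buf.putLong(commitLogOffset);
        buf.putInt(msgSize);
        buf.putLong(tagsCode);
        return buf.array();
    }

    /** Returns {commitLogOffset, msgSize, tagsCode}. */
    static long[] decode(byte[] entry) {
        ByteBuffer buf = ByteBuffer.wrap(entry);
        return new long[] { buf.getLong(), buf.getInt(), buf.getLong() };
    }

    // Fixed-size entries turn a logical queue offset into a byte position by multiplication.
    static long bytePosition(long queueOffset) {
        return queueOffset * ENTRY_SIZE;
    }

    public static void main(String[] args) {
        byte[] entry = encode(1024L, 256, 42L);
        long[] fields = decode(entry);
        System.out.println(entry.length + " bytes, offset=" + fields[0]
                + ", size=" + fields[1] + ", tagsCode=" + fields[2]);
        System.out.println("entry #3 starts at byte " + bytePosition(3));
    }
}
```

The fixed entry size is the design point: finding the N-th message of a queue needs no scan, only one multiplication and one read.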
IV. Consumption Flow
4.1 PullMessage Flow
// PullMessageProcessor.processRequest()
public RemotingCommand processRequest(Channel channel, RemotingCommand request) {
    PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader();
    // 1. Check the subscription group
    SubscriptionGroupConfig subscriptionGroupConfig =
            this.brokerController.getSubscriptionGroupManager()
                    .findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    // 2. Locate the ConsumeQueue
    ConsumeQueue consumeQueue = this.brokerController.getMessageStore()
            .findConsumeQueue(requestHeader.getTopic(), requestHeader.getQueueId());
    // 3. Fetch messages starting at the requested queue offset
    GetMessageResult getMessageResult = this.brokerController.getMessageStore()
            .getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                    requestHeader.getQueueId(), requestHeader.getQueueOffset(),
                    requestHeader.getMaxMsgNums(), null);
    // 4. Build the response, including the offset for the next pull
    RemotingCommand response = RemotingCommand.createResponseCommand(
            GetMessageResponseHeader.class);
    response.setCode(ResponseCode.SUCCESS);
    ((GetMessageResponseHeader) response.readCustomHeader())
            .setOffset(getMessageResult.getNextBeginOffset());
    // 5. Attach the messages (simplified: only the first buffer is returned here)
    if (getMessageResult.getMessageBufferList() != null) {
        response.setBody(getMessageResult.getMessageBufferList().get(0));
    }
    return response;
}
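The pull path is a two-level lookup: the queue offset selects index entries, and each entry points into the shared log. The sketch below models that with in-memory structures; `PullSketch` and its methods are hypothetical and keep only an offset and a size per index entry.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: one shared log plus a per-queue index of {offset, size}.
final class PullSketch {
    private final ByteArrayOutputStream commitLog = new ByteArrayOutputStream();
    private final List<int[]> consumeQueue = new ArrayList<>();

    void put(String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        consumeQueue.add(new int[] { commitLog.size(), bytes.length });
        commitLog.write(bytes, 0, bytes.length);
    }

    /** Reads up to maxMsgNums messages starting at the logical queue offset. */
    List<String> pull(long queueOffset, int maxMsgNums) {
        byte[] log = commitLog.toByteArray();
        List<String> found = new ArrayList<>();
        for (long i = Math.max(0, queueOffset);
                i < consumeQueue.size() && found.size() < maxMsgNums; i++) {
            int[] entry = consumeQueue.get((int) i);
            found.add(new String(log, entry[0], entry[1], StandardCharsets.UTF_8));
        }
        return found;
    }

    // The consumer continues from here on its next pull.
    static long nextBeginOffset(long queueOffset, int foundCount) {
        return queueOffset + foundCount;
    }

    public static void main(String[] args) {
        PullSketch store = new PullSketch();
        store.put("a");
        store.put("bb");
        store.put("ccc");
        List<String> batch = store.pull(1, 2);
        System.out.println(batch + ", next offset " + nextBeginOffset(1, batch.size()));
    }
}
```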
4.2 Push Consumption Flow
// PullRequest.handleTask()
public void handleTask() {
    // 1. Pull messages
    PullResult pullResult = this.pullMessage(this.pullRequest);
    // 2. Handle the result
    switch (pullResult.getPullStatus()) {
        case FOUND:
            // 3. Put the messages into the local ProcessQueue
            boolean dispatchToConsume = processQueue.putMessage(
                    pullResult.getMsgFoundList());
            // 4. Submit the messages for consumption
            this.consumeMessageService.submitConsumeRequest(
                    new ConsumeRequest(pullResult.getMsgFoundList(),
                            processQueue, messageQueue));
            // 5. Keep pulling
            if (dispatchToConsume) {
                this.pullRequest.putMessage(pullResult.getMsgFoundList());
                this.mqConsumerFactory.pullMessage(this.pullRequest);
            }
            break;
        case NO_NEW_MSG:
            // No new messages: keep pulling
            this.mqConsumerFactory.pullMessage(this.pullRequest);
            break;
        case OFFSET_ILLEGAL:
            // Illegal offset: reset it to the offset suggested by the Broker
            this.mqConsumerFactory.updateConsumeOffset(this.pullRequest,
                    pullResult.getNextBeginOffset());
            break;
    }
}
V. Transactional Messages
5.1 Sending a Transactional Message
// TransactionalMessageServiceImpl.putMessage()
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // 1. Build the half message: record the real topic and queue first,
    //    then redirect the message to the half-message topic
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID,
            String.valueOf(msg.getQueueId()));
    msg.setTopic(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC);
    // 2. Store it in the half-message queue
    PutMessageResult result = this.defaultMessageStore.putMessage(msg);
    // 3. Record the transactional message
    if (result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
        this.transactionTable.put(msg.getMsgId(),
                new TransactionMsg(msg.getMsgId(), msg.getUserProperty("TRAN_MSG")));
    }
    return result;
}
5.2 Transaction Check-Back
// TransactionalMessageCheckService.run()
public void run() {
    while (!this.isStopped()) {
        try {
            // 1. Wait for the check interval
            this.waitForRunning(checkInterval);
            // 2. Fetch the pending half messages
            Map<MessageQueue, PullResult> msgQueues = this.pullHalfMsg();
            // 3. Inspect every message
            for (Map.Entry<MessageQueue, PullResult> entry : msgQueues.entrySet()) {
                for (MessageExt msg : entry.getValue().getMsgFoundList()) {
                    // 4. Ask the producer for the transaction state
                    this.check(msg);
                }
            }
        } catch (Exception e) {
            log.error("Transaction check-back failed", e);
        }
    }
}
// Check the transaction state
private void check(MessageExt msg) {
    // 1. Build the check-back request
    CheckTransactionStateRequestHeader header =
            new CheckTransactionStateRequestHeader();
    header.setMsgId(msg.getMsgId());
    header.setTransactionId(msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    // 2. Send it to the Producer
    this.brokerController.getBroker2Client().checkTransactionState(
            msg.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP),
            msg, header);
}
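The half-message life cycle can be condensed into a small state machine. The sketch below is an assumption-laden model, not Broker code: `HalfMessageSketch`, its `State` enum, and the callback standing in for the Producer's check-back answer are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of half messages waiting for a commit or rollback decision.
final class HalfMessageSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    private final Map<String, String> halfMessages = new LinkedHashMap<>(); // txId -> body
    private final List<String> delivered = new ArrayList<>();

    void putHalf(String txId, String body) {
        halfMessages.put(txId, body);
    }

    /** One check-back round: producerCheck stands in for the Producer's answer. */
    void checkOnce(Function<String, State> producerCheck) {
        Iterator<Map.Entry<String, String>> it = halfMessages.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> entry = it.next();
            State state = producerCheck.apply(entry.getKey());
            if (state == State.COMMIT) {
                delivered.add(entry.getValue()); // now visible to consumers
                it.remove();
            } else if (state == State.ROLLBACK) {
                it.remove(); // discarded, never delivered
            }
            // UNKNOWN: keep the half message for the next round
        }
    }

    List<String> delivered() {
        return List.copyOf(delivered);
    }

    int pending() {
        return halfMessages.size();
    }

    public static void main(String[] args) {
        HalfMessageSketch broker = new HalfMessageSketch();
        broker.putHalf("tx1", "order-1");
        broker.putHalf("tx2", "order-2");
        broker.checkOnce(txId -> txId.equals("tx1") ? State.COMMIT : State.UNKNOWN);
        System.out.println(broker.delivered() + ", pending=" + broker.pending());
    }
}
```

A real Broker would also cap the number of check-back attempts per message; that limit is left out here to keep the sketch short.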
VI. High Availability
6.1 Master-Slave Synchronization
// SlaveSynchronize.run()
public void run() {
    while (!this.isStopped()) {
        try {
            // 1. Wait for the sync interval
            this.waitForRunning(syncInterval);
            // 2. Get the Master address
            String masterAddr = this.brokerController.getMasterBrokerAddr();
            if (masterAddr != null) {
                // 3. Sync topic configuration
                this.updateTopicConfig(masterAddr);
                // 4. Sync subscription group configuration
                this.updateSubscriptionGroupConfig(masterAddr);
                // 5. Sync messages
                this.syncMessage(masterAddr);
            }
        } catch (Exception e) {
            log.error("Master-slave synchronization failed", e);
        }
    }
}
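Message synchronization is typically offset-driven: the slave reports how far its log reaches, and the master returns the bytes after that point. The following sketch shows one such round under simplifying assumptions; `ReplicationSketch` is a hypothetical in-memory log, not the Broker's replication code.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical in-memory log used on both the master and the slave side.
final class ReplicationSketch {
    private final ByteArrayOutputStream log = new ByteArrayOutputStream();

    void append(byte[] data) {
        log.write(data, 0, data.length);
    }

    long maxOffset() {
        return log.size();
    }

    /** Returns at most maxBytes of log data starting at the given offset. */
    byte[] readFrom(long offset, int maxBytes) {
        byte[] all = log.toByteArray();
        int from = (int) Math.min(Math.max(0, offset), all.length);
        int to = Math.min(all.length, from + maxBytes);
        return Arrays.copyOfRange(all, from, to);
    }

    /** One sync round: the slave pulls whatever follows its own max offset. */
    static int syncOnce(ReplicationSketch master, ReplicationSketch slave, int maxBytes) {
        byte[] delta = master.readFrom(slave.maxOffset(), maxBytes);
        slave.append(delta);
        return delta.length;
    }

    public static void main(String[] args) {
        ReplicationSketch master = new ReplicationSketch();
        ReplicationSketch slave = new ReplicationSketch();
        master.append("hello world".getBytes(StandardCharsets.UTF_8));
        while (syncOnce(master, slave, 4) > 0) {
            System.out.println("slave offset: " + slave.maxOffset());
        }
    }
}
```

Because the slave's own max offset is the only state exchanged, a slave that restarts simply resumes from wherever its log ends.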
6.2 Failover
// BrokerController.brokerHeartbeat()
public void brokerHeartbeat() {
    // 1. Send a heartbeat to the NameServer
    this.registerBrokerAll(true, false);
    // 2. Check the Master state
    if (this.brokerConfig.getBrokerRole() == BrokerRole.SLAVE) {
        String masterAddr = this.getMasterBrokerAddr();
        if (masterAddr == null) {
            // The Master is unavailable
            log.warn("Master Broker is unavailable");
            // 3. Attempt a switchover (DLedger mode)
            if (this.brokerConfig.isEnableDLegerCommitLog()) {
                this.dledgerLeaderElection.electLeader();
            }
        }
    }
}
VII. Performance Optimization
7.1 Zero-Copy Optimization
// FileChannel.transferTo()
public void transferTo() throws IOException {
    // Use zero-copy:
    // data moves from disk to the socket without passing through user space
    long transferred = fileChannel.transferTo(
            position, // start position
            count, // number of bytes to transfer
            socketChannel // target socket
    );
    // The traditional path needs 4 copies:
    // disk -> kernel buffer -> user buffer -> kernel buffer -> socket
    // Zero-copy needs only 2:
    // disk -> kernel buffer -> socket
}
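`FileChannel.transferTo` may move fewer bytes than requested, so callers usually loop. Below is a minimal runnable sketch of that loop; `TransferToSketch` is a hypothetical class, and the target is an in-memory channel so the example needs no network. The copy savings only materialize when the target is a real socket channel; this sketch demonstrates the API, not the speedup.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical helper around FileChannel.transferTo.
final class TransferToSketch {
    /** Keeps calling transferTo until count bytes are sent or no progress is made. */
    static long transferFully(FileChannel source, long position, long count,
            WritableByteChannel target) throws IOException {
        long transferred = 0;
        while (transferred < count) {
            long n = source.transferTo(position + transferred, count - transferred, target);
            if (n <= 0) {
                break; // end of file reached, or the target accepted nothing
            }
            transferred += n;
        }
        return transferred;
    }

    /** Writes data to a temp file, then streams it back out through transferTo. */
    static byte[] roundTrip(byte[] data) {
        try {
            Path file = Files.createTempFile("transfer-sketch", ".bin");
            file.toFile().deleteOnExit();
            Files.write(file, data);
            ByteArrayOutputStream sink = new ByteArrayOutputStream();
            try (FileChannel in = FileChannel.open(file, StandardOpenOption.READ);
                    WritableByteChannel out = Channels.newChannel(sink)) {
                transferFully(in, 0, in.size(), out);
            }
            return sink.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] copy = roundTrip("commitlog bytes".getBytes());
        System.out.println(new String(copy));
    }
}
```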
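7.2 Page Cache Optimization
// MappedFile.warmMappedFile()
public void warmMappedFile() {
    // Warm up the mmap region to reduce first-access latency
    // 1. Compute the number of pages in the file
    int pages = (int) (this.fileSize / pageSize);
    for (int i = 0; i < pages; i++) {
        // 2. Touch each page so that it is faulted into the page cache
        mappedByteBuffer.get(i * pageSize);
    }
    // 3. Optionally ask the OS to bring the whole mapping into physical memory.
    //    load() does not pin the pages; keeping them from being swapped out
    //    requires a native mlock call.
    // mappedByteBuffer.load();
}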
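The warm-up loop can be run against a temporary file. This sketch maps a file with `FileChannel.map` and writes one byte per page; `WarmUpSketch` is a hypothetical class, the 4096-byte page size is an assumption, and memory locking is omitted because plain Java does not expose it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical warm-up helper for a memory-mapped file.
final class WarmUpSketch {
    static final int PAGE_SIZE = 4096; // assumed OS page size

    /** Touches one byte per page and returns the number of pages touched. */
    static int warm(MappedByteBuffer buffer) {
        int touched = 0;
        for (int pos = 0; pos < buffer.capacity(); pos += PAGE_SIZE) {
            buffer.put(pos, (byte) 0); // the write faults the page in
            touched++;
        }
        buffer.force(); // push the touched pages to the underlying file
        return touched;
    }

    /** Maps a fresh temp file of the given size and warms it. */
    static int warmTempFile(int fileSize) {
        try {
            Path file = Files.createTempFile("warm-sketch", ".bin");
            file.toFile().deleteOnExit();
            try (FileChannel channel = FileChannel.open(file,
                    StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                // Mapping READ_WRITE beyond the current size grows the file.
                MappedByteBuffer buffer =
                        channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
                return warm(buffer);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("pages touched: " + warmTempFile(PAGE_SIZE * 3));
    }
}
```

Writing rather than reading each page is a deliberate choice here: a write forces the OS to allocate backing pages for a newly created file, while a read of a sparse region may not.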
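VIII. Suggestions for Studying the Source
8.1 Learning Path
1. Message sending flow
- DefaultMQProducer
- MQClientInstance
- NettyRemotingClient
2. Storage flow
- CommitLog
- ConsumeQueue
- MessageStore
3. Consumption flow
- DefaultMQPushConsumer
- RebalanceImpl
- PullRequest
4. Advanced features
- Transactional messages
- Scheduled (delayed) messages
- Ordered messages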
8.2 Debugging Tips
// 1. Set breakpoints
// Place breakpoints in these key methods:
// - DefaultMQProducerImpl.sendKernelImpl()
// - CommitLog.putMessage()
// - PullMessageProcessor.processRequest()
// 2. Enable debug logging
// Edit logback.xml
<logger name="org.apache.rocketmq" level="DEBUG"/>
// 3. Trace with Arthas
// Trace method calls in real time
trace org.apache.rocketmq.client.producer.DefaultMQProducer send
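Summary
The main areas of the RocketMQ source code covered here:
- Message sending: route selection, synchronous/asynchronous sending
- Storage: CommitLog, ConsumeQueue, IndexFile
- Consumption: Pull/Push modes, offset management
- Transactional messages: half messages, check-back mechanism
- High availability: master-slave synchronization, failover
Key takeaways:
- Understand the sequential-write design of the CommitLog
- Master the ConsumeQueue indexing mechanism
- Understand how transactional messages are implemented
- Learn the performance optimization techniques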