Skip to content
清晨的一缕阳光
返回

RocketMQ 核心源码分析与解读

本文深入分析 RocketMQ 的核心源码,包括消息发送、存储、消费等关键流程,帮助理解 RocketMQ 的内部实现机制。

一、源码结构

1.1 模块划分

rocketmq/
├── rocketmq-common/        # 公共模块
├── rocketmq-remoting/      # 远程通信
├── rocketmq-store/         # 存储模块
├── rocketmq-client/        # 客户端
├── rocketmq-broker/        # Broker
├── rocketmq-namesrv/       # NameServer
└── rocketmq-tools/         # 运维工具

1.2 核心类图

graph TB
    subgraph 客户端
        DP[DefaultMQProducer]
        DC[DefaultMQPushConsumer]
    end
    
    subgraph Broker
        MB[MQBroker]
        MS[MessageStore]
    end
    
    subgraph 存储
        CL[CommitLog]
        CQ[ConsumeQueue]
    end
    
    DP --> MB
    DC --> MB
    MB --> MS
    MS --> CL
    MS --> CQ

二、消息发送流程

2.1 Producer 发送流程

// DefaultMQProducerImpl.send()
public SendResult send(Message msg) throws MQClientException, RemotingException, 
    MQBrokerException, InterruptedException {
    
    // 1. 消息校验
    checkMessage(msg);
    
    // 2. 设置 Producer Group
    msg.setTopic(withNamespace(msg.getTopic()));
    
    // 3. 选择 MessageQueue
    MessageQueue mq = selectOneMessageQueue(topicPublishInfo);
    
    // 4. 发送消息
    return sendKernelImpl(msg, mq, communicationMode, null, null, null, 
        defaultMQProducer.getSendMsgTimeout());
}

// 核心发送实现
private SendResult sendKernelImpl(Message msg, MessageQueue mq, 
    CommunicationType communicationType, ...) {
    
    // 1. 获取 Broker 地址
    String brokerAddr = findBrokerAddressInPublish(mq.getBrokerName());
    
    // 2. 构建请求
    SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
    requestHeader.setProducerGroup(this.groupName);
    requestHeader.setTopic(msg.getTopic());
    requestHeader.setDefaultTopic(topicWithNamespace);
    requestHeader.setDefaultTopicQueueNums(this.defaultTopicQueueNums);
    requestHeader.setQueueId(mq.getQueueId());
    requestHeader.setSysFlag(sysFlag);
    requestHeader.setBornTimestamp(System.currentTimeMillis());
    requestHeader.setFlag(msg.getFlag());
    
    // 3. 发送请求
    RemotingCommand request = RemotingCommand.createRequestCommand(
        SendMessageCode.SEND_MESSAGE, requestHeader);
    request.setBody(msg.getBody());
    
    // 4. 获取响应
    RemotingCommand response = this.remotingClient.invokeSync(
        brokerAddr, request, timeoutMillis);
    
    // 5. 解析响应
    SendResult sendResult = decodeSendMessageResponse(response);
    
    return sendResult;
}

2.2 消息路由选择

// TopicPublishInfo.selectOneMessageQueue()
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // 轮询策略
    if (sendWhichQueue.get() % 2 == 0) {
        return selectOneMessageQueueByRoundRobin(lastBrokerName);
    }
    
    // 最小负载策略
    return selectOneMessageQueueByMinLoad(lastBrokerName);
}

// 轮询选择
private MessageQueue selectOneMessageQueueByRoundRobin(String lastBrokerName) {
    int index = this.sendWhichQueue.incrementAndGet();
    
    for (int i = 0; i < this.messageQueueList.size(); i++) {
        int pos = Math.abs(index++) % this.messageQueueList.size();
        MessageQueue mq = this.messageQueueList.get(pos);
        
        // 避免发送到上次失败的 Broker
        if (!mq.getBrokerName().equals(lastBrokerName)) {
            return mq;
        }
    }
    
    // 如果所有 Broker 都失败,返回第一个
    return this.messageQueueList.get(0);
}

三、存储流程

3.1 CommitLog 写入

// CommitLog.putMessage()
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // 1. 检查 Broker 状态
    if (!this.mqBrokerConfig.getBrokerRole().equals(BrokerRole.SLAVE)) {
        if (this.mqBrokerConfig.getBrokerRole() == BrokerRole.SYNC_MASTER && 
            this.slaveSyncDelay > this.mqBrokerConfig.getFlushSlaveTimeoutMillis()) {
            return new PutMessageResult(PutMessageStatus.SLAVE_NOT_AVAILABLE, null);
        }
    }
    
    // 2. 构建消息
    ByteBuffer byteBuffer = this.messageExtEncoder.encode(msg);
    
    // 3. 追加到 CommitLog
    AppendMessageResult result = this.commitLog.append(byteBuffer);
    
    // 4. 处理结果
    switch (result.getStatus()) {
        case PUT_OK:
            // 成功
            break;
        case END_OF_FILE:
            // 文件已满,创建新文件
            result = this.commitLog.append(byteBuffer);
            break;
        case MESSAGE_SIZE_EXCEEDED:
        case PROPERTIES_SIZE_EXCEEDED:
            return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
    }
    
    // 5. 异步刷盘
    if (this.mqBrokerConfig.getFlushDiskType() == FlushDiskType.ASYNC_FLUSH) {
        this.flushCommitLogService.wakeup();
    }
    
    // 6. 构建 ConsumeQueue
    this.reputMessageService.wakeup();
    
    return new PutMessageResult(PutMessageStatus.PUT_OK, result);
}

3.2 ConsumeQueue 构建

// ReputMessageService.run()
public void run() {
    while (!this.isStopped()) {
        try {
            // 1. 从 CommitLog 读取消息
            DispatchRequest request = this.commitLog.pickupDispatchRequest(
                this.reputFromOffset);
            
            if (request == null) {
                Thread.sleep(1);
                continue;
            }
            
            // 2. 构建 ConsumeQueue
            this.doDispatch(request);
            
            // 3. 更新偏移量
            this.reputFromOffset = request.getCommitLogOffset() + request.getMsgSize();
            
        } catch (Exception e) {
            log.error("ReputMessageService 运行错误", e);
        }
    }
}

// 构建 ConsumeQueue 条目
private void doDispatch(DispatchRequest request) {
    // 1. 获取 ConsumeQueue
    ConsumeQueue cq = this.findOrCreateConsumeQueue(request.getTopic(), 
        request.getQueueId());
    
    // 2. 构建索引条目(20 字节)
    ByteBuffer byteBuffer = ByteBuffer.allocate(20);
    byteBuffer.putLong(request.getCommitLogOffset());  // 8 字节
    byteBuffer.putInt(request.getMsgSize());           // 4 字节
    byteBuffer.putLong(request.getTagsCode());         // 8 字节
    
    // 3. 写入 ConsumeQueue
    cq.putQueueData(request.getConsumeQueueOffset(), byteBuffer);
}

四、消费流程

4.1 PullMessage 流程

// PullMessageProcessor.processRequest()
public RemotingCommand processRequest(Channel channel, RemotingCommand request) {
    PullMessageRequestHeader requestHeader = 
        (PullMessageRequestHeader) request.decodeCommandCustomHeader();
    
    // 1. 检查订阅关系
    SubscriptionGroupConfig subscriptionGroupConfig = 
        this.brokerController.getSubscriptionGroupManager()
            .findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
    
    // 2. 获取 ConsumeQueue
    ConsumeQueue consumeQueue = this.brokerController.getMessageStore()
        .findConsumeQueue(requestHeader.getTopic(), requestHeader.getQueueId());
    
    // 3. 获取消息
    GetMessageResult getMessageResult = this.brokerController.getMessageStore()
        .getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 
            requestHeader.getQueueId(), requestHeader.getQueueOffset(), 
            requestHeader.getMaxMsgNums(), null);
    
    // 4. 构建响应
    RemotingCommand response = RemotingCommand.createResponseCommand(
        GetMessageResponseHeader.class);
    
    response.setCode(ResponseCode.SUCCESS);
    ((GetMessageResponseHeader) response.readCustomHeader())
        .setOffset(getMessageResult.getNextBeginOffset());
    
    // 5. 返回消息
    if (getMessageResult.getMessageBufferList() != null) {
        response.setBody(getMessageResult.getMessageBufferList().get(0));
    }
    
    return response;
}

4.2 消费推送流程

// PullRequest.handleTask()
public void handleTask() {
    // 1. 拉取消息
    PullResult pullResult = this.pullMessage(this.pullRequest);
    
    // 2. 处理结果
    switch (pullResult.getPullStatus()) {
        case FOUND:
            // 3. 提交消费
            boolean dispatchToConsume = processQueue.putMessage(
                pullResult.getMsgFoundList());
            
            // 4. 触发消费
            this.consumeMessageService.submitConsumeRequest(
                new ConsumeRequest(pullResult.getMsgFoundList(), 
                    processQueue, messageQueue));
            
            // 5. 继续拉取
            if (dispatchToConsume) {
                this.pullRequest.putMessage(pullResult.getMsgFoundList());
                this.mqConsumerFactory.pullMessage(this.pullRequest);
            }
            break;
        case NO_NEW_MSG:
            // 无新消息,继续拉取
            this.mqConsumerFactory.pullMessage(this.pullRequest);
            break;
        case OFFSET_ILLEGAL:
            // 偏移量非法,重置
            this.mqConsumerFactory.updateConsumeOffset(this.pullRequest, 
                pullResult.getNextBeginOffset());
            break;
    }
}

五、事务消息

5.1 事务消息发送

// TransactionalMessageServiceImpl.putMessage()
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // 1. 构建半消息
    msg.setTopic(TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC);
    msg.setProperties(MessageAccessor.string2Properties(
        MessageConst.PROPERTY_REAL_TOPIC + "=" + msg.getTopic() + ";" +
        MessageConst.PROPERTY_REAL_QUEUE_ID + "=" + msg.getQueueId()));
    
    // 2. 发送到半消息队列
    PutMessageResult result = this.defaultMessageStore.putMessage(msg);
    
    // 3. 记录事务消息
    if (result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
        this.transactionTable.put(msg.getMsgId(), 
            new TransactionMsg(msg.getMsgId(), msg.getUserProperty("TRAN_MSG")));
    }
    
    return result;
}

5.2 事务回查

// TransactionalMessageCheckService.run()
public void run() {
    while (!this.isStopped()) {
        try {
            // 1. 等待检查间隔
            this.waitForRunning(checkInterval);
            
            // 2. 获取半消息
            Map<MessageQueue, PullResult> msgQueues = this.pullHalfMsg();
            
            // 3. 检查每条消息
            for (Map.Entry<MessageQueue, PullResult> entry : msgQueues.entrySet()) {
                for (MessageExt msg : entry.getValue().getMsgFoundList()) {
                    // 4. 回查事务状态
                    this.check(msg);
                }
            }
            
        } catch (Exception e) {
            log.error("事务消息回查失败", e);
        }
    }
}

// 回查事务状态
private void check(MessageExt msg) {
    // 1. 构建回查请求
    CheckTransactionStateRequestHeader header = 
        new CheckTransactionStateRequestHeader();
    header.setMsgId(msg.getMsgId());
    header.setTransactionId(msg.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    
    // 2. 发送给 Producer
    this.brokerController.getBroker2Client().checkTransactionState(
        msg.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP), 
        msg, header);
}

六、高可用

6.1 主从同步

// SlaveSynchronize.run()
public void run() {
    while (!this.isStopped()) {
        try {
            // 1. 等待同步间隔
            this.waitForRunning(syncInterval);
            
            // 2. 获取 Master 地址
            String masterAddr = this.brokerController.getMasterBrokerAddr();
            
            if (masterAddr != null) {
                // 3. 同步 Topic 配置
                this.updateTopicConfig(masterAddr);
                
                // 4. 同步订阅组配置
                this.updateSubscriptionGroupConfig(masterAddr);
                
                // 5. 同步消息
                this.syncMessage(masterAddr);
            }
            
        } catch (Exception e) {
            log.error("主从同步失败", e);
        }
    }
}

6.2 故障切换

// BrokerController.brokerHeartbeat()
public void brokerHeartbeat() {
    // 1. 发送心跳到 NameServer
    this.registerBrokerAll(true, false);
    
    // 2. 检查 Master 状态
    if (this.brokerConfig.getBrokerRole() == BrokerRole.SLAVE) {
        String masterAddr = this.getMasterBrokerAddr();
        
        if (masterAddr == null) {
            // Master 不可用
            log.warn("Master Broker 不可用");
            
            // 3. 尝试切换(Dledger 模式)
            if (this.brokerConfig.isEnableDLegerCommitLog()) {
                this.dledgerLeaderElection.electLeader();
            }
        }
    }
}

七、性能优化

7.1 零拷贝优化

// FileChannel.transferTo()
public void transferTo() throws IOException {
    // 使用零拷贝技术
    // 数据直接从磁盘到 Socket,不经过用户空间
    
    long count = fileChannel.transferTo(
        position,           // 起始位置
        count,              // 传输字节数
        socketChannel       // 目标 Socket
    );
    
    // 传统方式需要 4 次拷贝:
    // disk -> kernel buffer -> user buffer -> kernel buffer -> socket
    // 零拷贝只需要 2 次:
    // disk -> kernel buffer -> socket
}

7.2 页缓存优化

// MappedFile.warmMappedFile()
public void warmMappedFile() {
    // 预热 mmap,减少首次访问延迟
    
    // 1. 锁定内存
    int pages = (int) (this.fileSize / pageSize);
    
    for (int i = 0; i < pages; i++) {
        // 2. 逐页访问
        mappedByteBuffer.get(i * pageSize);
    }
    
    // 3. 设置为不交换
    // ((DirectBuffer) mappedByteBuffer).load();
}

八、源码学习建议

8.1 学习路径

1. 消息发送流程
   - DefaultMQProducer
   - MQClientInstance
   - NettyRemotingClient

2. 存储流程
   - CommitLog
   - ConsumeQueue
   - MessageStore

3. 消费流程
   - DefaultMQPushConsumer
   - RebalanceImpl
   - PullRequest

4. 高级特性
   - 事务消息
   - 定时消息
   - 顺序消息

8.2 调试技巧

// 1. 添加断点
// 在关键方法添加断点:
// - DefaultMQProducerImpl.sendKernelImpl()
// - CommitLog.putMessage()
// - PullMessageProcessor.processRequest()

// 2. 启用调试日志
// 修改 logback.xml
<logger name="org.apache.rocketmq" level="DEBUG"/>

// 3. 使用 Arthas 追踪
// 实时追踪方法调用
trace org.apache.rocketmq.client.producer.DefaultMQProducer send

总结

RocketMQ 源码的核心要点:

  1. 消息发送:路由选择、同步/异步发送
  2. 存储机制:CommitLog、ConsumeQueue、IndexFile
  3. 消费流程:Pull/Push 模式、位移管理
  4. 事务消息:半消息、回查机制
  5. 高可用:主从同步、故障切换

核心要点

参考资料


分享这篇文章到:

上一篇文章
RocketMQ 5.0 新特性详解
下一篇文章
Kafka 生产者发送机制详解