Skip to content
清晨的一缕阳光
返回

RocketMQ 架构设计与核心原理

RocketMQ 是阿里巴巴开源的分布式消息中间件,具有高吞吐、低延迟、高可靠等特性,广泛应用于电商、金融、物流等领域。本文将深入探讨 RocketMQ 的架构设计和核心原理。

一、RocketMQ 简介

1.1 核心特性

特性说明
高吞吐百万级 TPS,万亿级消息堆积
低延迟毫秒级消息延迟
高可靠金融级可靠性,支持事务消息
顺序消息支持全局/分区顺序消息
定时消息支持多级延迟/定时消息
消息回溯支持按时间回溯消费

1.2 应用场景

graph TB
    subgraph 应用场景
        A[异步解耦] --> B[微服务通信]
        C[削峰填谷] --> D[秒杀活动]
        E[数据同步] --> F[CDC/ETL]
        G[事务消息] --> H[分布式事务]
        I[顺序消息] --> J[订单流程]
    end

1.3 与 Kafka 对比

特性RocketMQKafka
吞吐量10 万级 TPS百万级 TPS
延迟毫秒级毫秒级
可靠性不丢消息可能丢消息
顺序消息支持分区有序
定时消息支持不支持
事务消息支持不支持
消息回溯支持支持
生态Java 友好多语言

二、架构概览

2.1 核心组件

graph TB
    subgraph Producer
        P1[Producer Group 1]
        P2[Producer Group 2]
    end
    
    subgraph NameServer
        NS1[NameServer 1]
        NS2[NameServer 2]
    end
    
    subgraph Broker Cluster
        B1[Broker Master 1]
        B2[Broker Slave 1]
        B3[Broker Master 2]
        B4[Broker Slave 2]
    end
    
    subgraph Consumer
        C1[Consumer Group 1]
        C2[Consumer Group 2]
    end
    
    P1 --> NS1
    P2 --> NS2
    NS1 --> B1
    NS2 --> B3
    B1 --> C1
    B3 --> C2

核心组件说明

组件说明特点
NameServer命名服务,路由发现无状态、对等、轻量级
Broker消息服务器,存储转发主从架构、高可用
Producer消息生产者支持集群、负载均衡
Consumer消息消费者支持集群、广播模式
Topic消息主题逻辑分类、多队列
Queue消息队列物理存储、有序
Consumer Group消费者组负载均衡、容错

2.2 部署架构

单 Master 模式

NameServer × 2
Broker × 1 (Master)

多 Master 模式

NameServer × 2
Broker-M1 × 1
Broker-M2 × 1
Broker-M3 × 1

多 Master 多 Slave 模式(推荐)

NameServer × 2
Broker-M1 + Broker-S1 (异步复制)
Broker-M2 + Broker-S2 (异步复制)
Broker-M3 + Broker-S3 (同步双写)

Dledger 模式(高可用)

NameServer × 2
Broker-DLedger-Group × 3 (Raft 共识)

三、NameServer 详解

3.1 核心功能

graph LR
    subgraph NameServer 功能
        A[路由管理] --> B[Broker 注册]
        C[路由发现] --> D[Topic 查询]
        E[心跳检测] --> F[Broker 存活]
    end

工作原理

3.2 路由发现

// Producer 获取路由
TopicRouteData routeData = mqClientInstance.getMQClientAPIImpl()
    .getTopicRouteInfoFromNameServer(topic, timeout);

// 路由信息包含
class TopicRouteData {
    private List<QueueData> queueDatas;      // 队列信息
    private List<BrokerData> brokerDatas;     // Broker 信息
}

class BrokerData {
    private String brokerName;
    private String brokerAddr;  // Master 地址
    private String haServerAddr;  // Slave 地址
}

四、Broker 详解

4.1 存储架构

graph TB
    subgraph Broker 存储
        CommitLog[CommitLog<br/>顺序写入]
        ConsumeQueue[ConsumeQueue<br/>索引文件]
        IndexFile[IndexFile<br/>Key 索引]
    end
    
    Producer --> CommitLog
    CommitLog --> ConsumeQueue
    CommitLog --> IndexFile
    Consumer --> ConsumeQueue

存储文件说明

文件说明大小
CommitLog消息主体,顺序写入1GB/个
ConsumeQueue消费队列索引30 万条/个
IndexFileKey 索引,便于查询1 小时/个

4.2 消息存储流程

sequenceDiagram
    participant Producer
    participant Broker
    participant CommitLog
    participant ConsumeQueue
    
    Producer->>Broker: 发送消息
    Broker->>CommitLog: 追加写入
    Broker->>ConsumeQueue: 创建索引
    Broker-->>Producer: 返回 ACK

4.3 主从复制

异步复制

sequenceDiagram
    participant Producer
    participant Master
    participant Slave
    
    Producer->>Master: 写入消息
    Master-->>Producer: 返回 ACK
    Master->>Slave: 异步复制
    Slave-->>Master: 确认

同步双写

sequenceDiagram
    participant Producer
    participant Master
    participant Slave
    
    Producer->>Master: 写入消息
    Master->>Slave: 同步复制
    Slave-->>Master: 确认
    Master-->>Producer: 返回 ACK

配置示例

# Broker 角色
brokerRole=ASYNC_MASTER  # ASYNC_MASTER/SYNC_MASTER/SLAVE

# 刷盘方式
flushDiskType=ASYNC_FLUSH  # ASYNC_FLUSH/SYNC_FLUSH

# 复制方式
brokerFailoverTimeout=5000  # 故障转移超时

五、Producer 详解

5.1 发送流程

sequenceDiagram
    participant App as 应用程序
    participant Producer as Producer
    participant NS as NameServer
    participant Broker as Broker
    
    App->>Producer: 发送消息
    Producer->>NS: 获取路由
    NS-->>Producer: 返回路由
    Producer->>Broker: 选择队列
    Producer->>Broker: 发送消息
    Broker-->>Producer: 返回 ACK
    Producer-->>App: 返回结果

5.2 发送方式

// 1. 同步发送
SendResult result = producer.send(msg);
if (result.getSendStatus() == SendStatus.SEND_OK) {
    // 发送成功
}

// 2. 异步发送
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult result) {
        // 发送成功
    }
    
    @Override
    public void onException(Throwable e) {
        // 发送失败
    }
});

// 3. 单向发送(不关心结果)
producer.sendOneway(msg);

5.3 消息类型

普通消息

Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult result = producer.send(msg);

顺序消息

// 分区有序(推荐)
Message msg = new Message("OrderTopic", "TagA", 
    "OrderID: 123".getBytes());
SendResult result = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> queues, 
                               Message msg, Object arg) {
        Long orderId = (Long) arg;
        // 相同 OrderID 发送到同一队列
        return queues.get((int) (orderId % queues.size()));
    }
}, 123L);

定时消息

Message msg = new Message("TopicTest", "Hello".getBytes());
// 延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3);  // 10 秒后投递
SendResult result = producer.send(msg);

事务消息

TransactionMQProducer producer = new TransactionMQProducer("producer_group");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(
            Message msg, Object arg) {
        // 执行本地事务
        boolean success = doLocalTransaction(msg);
        if (success) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    
    @Override
    public LocalTransactionState checkLocalTransaction(
            ExtendedMessageQueueExt msgExt) {
        // 检查本地事务状态
        boolean success = checkTransaction(msgExt);
        return success ? LocalTransactionState.COMMIT_MESSAGE 
                       : LocalTransactionState.ROLLBACK_MESSAGE;
    }
});

六、Consumer 详解

6.1 消费模式

集群消费

graph TB
    subgraph Consumer Group
        C1[Consumer 1]
        C2[Consumer 2]
    end
    
    subgraph Topic
        Q1[Queue 1]
        Q2[Queue 2]
        Q3[Queue 3]
        Q4[Queue 4]
    end
    
    C1 --> Q1
    C1 --> Q2
    C2 --> Q3
    C2 --> Q4

广播消费

graph TB
    subgraph Consumer Group
        C1[Consumer 1]
        C2[Consumer 2]
    end
    
    subgraph Topic
        Q1[Queue 1]
        Q2[Queue 2]
    end
    
    C1 --> Q1
    C1 --> Q2
    C2 --> Q1
    C2 --> Q2

6.2 消费方式

// 1. 推模式(Push)
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            System.out.println(new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

// 2. 拉模式(Pull)
PullResult result = consumer.pull(new MessageQueue("TopicA", "BrokerA", 0), 
                                   "*", 0, 32);
for (MessageExt msg : result.getMsgFoundList()) {
    System.out.println(new String(msg.getBody()));
}

6.3 消息重试

// 消费失败自动重试
consumer.setConsumeTimeout(15, TimeUnit.MINUTES);
consumer.setMaxReconsumeTimes(16);  // 最大重试次数

// 重试队列:%RETRY%ConsumerGroup
// 重试间隔:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 30m 1h 2h 3h 4h 5h 6h 7h 8h 9h 10h 11h 12h 13h 14h 15h 16h 17h 18h 19h 20h 21h 22h 23h 1d

6.4 消费进度

// 自动提交 Offset(默认)
consumer.setOffsetStoreAutoCommit(true);

// 手动提交 Offset
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(
            List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            // 处理消息
            processMessage(msgs);
            
            // 提交 Offset
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
});

七、实战应用

7.1 Spring Boot 集成

Maven 依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.3</version>
</dependency>

配置文件

rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: order-producer-group
    send-message-timeout: 3000
    retry-times-when-send-failed: 2
  consumer:
    group: order-consumer-group

Producer 实现

@Service
public class OrderProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendOrderCreated(OrderEvent event) {
        rocketMQTemplate.convertAndSend("order-topic:TagA", event);
    }
    
    public void sendOrderDelayed(OrderEvent event, int delayLevel) {
        Message<OrderEvent> message = MessageBuilder.withPayload(event).build();
        rocketMQTemplate.syncSend("order-topic:TagA", message, 3000, delayLevel);
    }
}

Consumer 实现

@Component
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-consumer-group",
    selectorExpression = "TagA",
    messageModel = MessageModel.CLUSTERING,
    consumeMode = ConsumeMode.CONCURRENTLY
)
public class OrderConsumer implements RocketMQListener<OrderEvent> {
    
    @Override
    public void onMessage(OrderEvent event) {
        System.out.println("收到订单消息:" + event.getOrderId());
        processOrder(event);
    }
}

7.2 订单处理示例

// 订单事件
public class OrderEvent {
    private String orderId;
    private String userId;
    private BigDecimal amount;
    private OrderStatus status;
    private LocalDateTime createTime;
}

// 订单创建 Producer
@Service
public class OrderCreateProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    public void sendOrderCreated(String orderId) {
        OrderEvent event = new OrderEvent();
        event.setOrderId(orderId);
        event.setStatus(OrderStatus.CREATED);
        
        // 发送到消息队列
        rocketMQTemplate.convertAndSend("order-topic:OrderTag", event);
    }
}

// 订单消费 Consumer
@Component
@RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-process-group",
    selectorExpression = "OrderTag"
)
public class OrderProcessConsumer implements RocketMQListener<OrderEvent> {
    
    @Autowired
    private OrderService orderService;
    
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void onMessage(OrderEvent event) {
        switch (event.getStatus()) {
            case CREATED:
                // 创建订单
                orderService.createOrder(event);
                break;
            case PAID:
                // 支付处理
                orderService.processPayment(event);
                break;
            case SHIPPED:
                // 发货处理
                orderService.processShipment(event);
                break;
        }
    }
}

7.3 事务消息示例

@Service
public class TransactionProducer {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @PostConstruct
    public void init() {
        TransactionMQProducer producer = new TransactionMQProducer("tx-producer");
        producer.setNamesrvAddr("127.0.0.1:9876");
        
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(
                    Message message, Object o) {
                try {
                    // 执行本地事务
                    OrderEvent event = JSON.parseObject(
                        new String(message.getBody()), OrderEvent.class);
                    orderService.createOrder(event);
                    
                    return LocalTransactionState.COMMIT_MESSAGE;
                } catch (Exception e) {
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
            
            @Override
            public LocalTransactionState checkLocalTransaction(
                    ExtendedMessageQueueExt message) {
                // 检查事务状态
                OrderEvent event = JSON.parseObject(
                    new String(message.getBody()), OrderEvent.class);
                boolean exists = orderService.exists(event.getOrderId());
                
                return exists ? LocalTransactionState.COMMIT_MESSAGE 
                              : LocalTransactionState.ROLLBACK_MESSAGE;
            }
        });
        
        producer.start();
    }
}

八、性能优化

8.1 Producer 优化

// 批量发送
List<Message<String>> messages = Arrays.asList(msg1, msg2, msg3);
SendResult result = producer.send(messages);

// 异步发送
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult result) {
        // 成功回调
    }
    
    @Override
    public void onException(Throwable e) {
        // 异常处理
    }
});

8.2 Consumer 优化

// 调整消费线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);

// 调整批量消费
consumer.setConsumeMessageBatchMaxSize(1);

// 调整拉取批次
consumer.setPullBatchSize(32);

8.3 Broker 优化

# 增加线程池
defaultThreadPoolNums=16

# 调整刷盘策略
flushDiskType=ASYNC_FLUSH
flushInterval=500

# 调整复制方式
brokerRole=ASYNC_MASTER

# 调整存储
mappedFileSizeCommitLog=1073741824
mappedFileSizeConsumeQueue=300000

九、监控与运维

9.1 关键指标

指标说明告警阈值
TPS每秒消息数持续 > 80%
延迟消息延迟时间> 1 秒
堆积未消费消息数> 10 万
Broker CPUBroker CPU 使用率> 80%
Broker 内存Broker 内存使用率> 80%

9.2 常用命令

# 查看 Topic 列表
mqadmin topicList -n 127.0.0.1:9876

# 查看 Topic 详情
mqadmin topicStatus -n 127.0.0.1:9876 -t TopicTest

# 查看 Consumer 进度
mqadmin consumerProgress -n 127.0.0.1:9876 -g consumer-group

# 重置消费 Offset
mqadmin resetOffset -n 127.0.0.1:9876 -t TopicTest -g consumer-group -s now

# 查看 Broker 状态
mqadmin brokerStatus -n 127.0.0.1:9876 -b 127.0.0.1:10911

十、总结

RocketMQ vs Kafka

维度RocketMQKafka
适用场景订单、支付、金融日志、流处理
可靠性金融级标准级
功能特性丰富简洁
生态Java 友好多语言
学习曲线中等较低

选型建议


参考资料


分享这篇文章到:

上一篇文章
AI 应用性能优化实战案例
下一篇文章
Kafka KRaft 深度解析与实战