RocketMQ 是阿里巴巴开源的分布式消息中间件,具有高吞吐、低延迟、高可靠等特性,广泛应用于电商、金融、物流等领域。本文将深入探讨 RocketMQ 的架构设计和核心原理。
一、RocketMQ 简介
1.1 核心特性
| 特性 | 说明 |
|---|---|
| 高吞吐 | 百万级 TPS,万亿级消息堆积 |
| 低延迟 | 毫秒级消息延迟 |
| 高可靠 | 金融级可靠性,支持事务消息 |
| 顺序消息 | 支持全局/分区顺序消息 |
| 定时消息 | 支持多级延迟/定时消息 |
| 消息回溯 | 支持按时间回溯消费 |
1.2 应用场景
graph TB
subgraph 应用场景
A[异步解耦] --> B[微服务通信]
C[削峰填谷] --> D[秒杀活动]
E[数据同步] --> F[CDC/ETL]
G[事务消息] --> H[分布式事务]
I[顺序消息] --> J[订单流程]
end
1.3 与 Kafka 对比
| 特性 | RocketMQ | Kafka |
|---|---|---|
| 吞吐量 | 10 万级 TPS | 百万级 TPS |
| 延迟 | 毫秒级 | 毫秒级 |
| 可靠性 | 不丢消息 | 可能丢消息 |
| 顺序消息 | 支持 | 分区有序 |
| 定时消息 | 支持 | 不支持 |
| 事务消息 | 支持 | 不支持 |
| 消息回溯 | 支持 | 支持 |
| 生态 | Java 友好 | 多语言 |
二、架构概览
2.1 核心组件
graph TB
subgraph Producer
P1[Producer Group 1]
P2[Producer Group 2]
end
subgraph NameServer
NS1[NameServer 1]
NS2[NameServer 2]
end
subgraph Broker Cluster
B1[Broker Master 1]
B2[Broker Slave 1]
B3[Broker Master 2]
B4[Broker Slave 2]
end
subgraph Consumer
C1[Consumer Group 1]
C2[Consumer Group 2]
end
P1 --> NS1
P2 --> NS2
NS1 --> B1
NS2 --> B3
B1 --> C1
B3 --> C2
核心组件说明:
| 组件 | 说明 | 特点 |
|---|---|---|
| NameServer | 命名服务,路由发现 | 无状态、对等、轻量级 |
| Broker | 消息服务器,存储转发 | 主从架构、高可用 |
| Producer | 消息生产者 | 支持集群、负载均衡 |
| Consumer | 消息消费者 | 支持集群、广播模式 |
| Topic | 消息主题 | 逻辑分类、多队列 |
| Queue | 消息队列 | 物理存储、有序 |
| Consumer Group | 消费者组 | 负载均衡、容错 |
2.2 部署架构
单 Master 模式:
NameServer × 2
Broker × 1 (Master)
多 Master 模式:
NameServer × 2
Broker-M1 × 1
Broker-M2 × 1
Broker-M3 × 1
多 Master 多 Slave 模式(推荐):
NameServer × 2
Broker-M1 + Broker-S1 (异步复制)
Broker-M2 + Broker-S2 (异步复制)
Broker-M3 + Broker-S3 (同步双写)
Dledger 模式(高可用):
NameServer × 2
Broker-DLedger-Group × 3 (Raft 共识)
三、NameServer 详解
3.1 核心功能
graph LR
subgraph NameServer 功能
A[路由管理] --> B[Broker 注册]
C[路由发现] --> D[Topic 查询]
E[心跳检测] --> F[Broker 存活]
end
工作原理:
- ✅ Broker 启动时向所有 NameServer 注册
- ✅ Broker 每 30 秒发送心跳
- ✅ NameServer 10 秒未收到心跳,认为 Broker 下线
- ✅ Producer/Consumer 从 NameServer 获取路由信息
3.2 路由发现
// Producer 获取路由
TopicRouteData routeData = mqClientInstance.getMQClientAPIImpl()
.getTopicRouteInfoFromNameServer(topic, timeout);
// 路由信息包含
class TopicRouteData {
private List<QueueData> queueDatas; // 队列信息
private List<BrokerData> brokerDatas; // Broker 信息
}
class BrokerData {
private String brokerName;
private String brokerAddr; // Master 地址
private String haServerAddr; // Slave 地址
}
四、Broker 详解
4.1 存储架构
graph TB
subgraph Broker 存储
CommitLog[CommitLog<br/>顺序写入]
ConsumeQueue[ConsumeQueue<br/>索引文件]
IndexFile[IndexFile<br/>Key 索引]
end
Producer --> CommitLog
CommitLog --> ConsumeQueue
CommitLog --> IndexFile
Consumer --> ConsumeQueue
存储文件说明:
| 文件 | 说明 | 大小 |
|---|---|---|
| CommitLog | 消息主体,顺序写入 | 1GB/个 |
| ConsumeQueue | 消费队列索引 | 30 万条/个 |
| IndexFile | Key 索引,便于查询 | 1 小时/个 |
4.2 消息存储流程
sequenceDiagram
participant Producer
participant Broker
participant CommitLog
participant ConsumeQueue
Producer->>Broker: 发送消息
Broker->>CommitLog: 追加写入
Broker->>ConsumeQueue: 创建索引
Broker-->>Producer: 返回 ACK
4.3 主从复制
异步复制:
sequenceDiagram
participant Producer
participant Master
participant Slave
Producer->>Master: 写入消息
Master-->>Producer: 返回 ACK
Master->>Slave: 异步复制
Slave-->>Master: 确认
同步双写:
sequenceDiagram
participant Producer
participant Master
participant Slave
Producer->>Master: 写入消息
Master->>Slave: 同步复制
Slave-->>Master: 确认
Master-->>Producer: 返回 ACK
配置示例:
# Broker 角色
brokerRole=ASYNC_MASTER # ASYNC_MASTER/SYNC_MASTER/SLAVE
# 刷盘方式
flushDiskType=ASYNC_FLUSH # ASYNC_FLUSH/SYNC_FLUSH
# 复制方式
brokerFailoverTimeout=5000 # 故障转移超时
五、Producer 详解
5.1 发送流程
sequenceDiagram
participant App as 应用程序
participant Producer as Producer
participant NS as NameServer
participant Broker as Broker
App->>Producer: 发送消息
Producer->>NS: 获取路由
NS-->>Producer: 返回路由
Producer->>Broker: 选择队列
Producer->>Broker: 发送消息
Broker-->>Producer: 返回 ACK
Producer-->>App: 返回结果
5.2 发送方式
// 1. 同步发送
SendResult result = producer.send(msg);
if (result.getSendStatus() == SendStatus.SEND_OK) {
// 发送成功
}
// 2. 异步发送
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
// 发送成功
}
@Override
public void onException(Throwable e) {
// 发送失败
}
});
// 3. 单向发送(不关心结果)
producer.sendOneway(msg);
5.3 消息类型
普通消息:
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());
SendResult result = producer.send(msg);
顺序消息:
// 分区有序(推荐)
Message msg = new Message("OrderTopic", "TagA",
"OrderID: 123".getBytes());
SendResult result = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> queues,
Message msg, Object arg) {
Long orderId = (Long) arg;
// 相同 OrderID 发送到同一队列
return queues.get((int) (orderId % queues.size()));
}
}, 123L);
定时消息:
Message msg = new Message("TopicTest", "Hello".getBytes());
// 延迟级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3); // 10 秒后投递
SendResult result = producer.send(msg);
事务消息:
TransactionMQProducer producer = new TransactionMQProducer("producer_group");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(
Message msg, Object arg) {
// 执行本地事务
boolean success = doLocalTransaction(msg);
if (success) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(
ExtendedMessageQueueExt msgExt) {
// 检查本地事务状态
boolean success = checkTransaction(msgExt);
return success ? LocalTransactionState.COMMIT_MESSAGE
: LocalTransactionState.ROLLBACK_MESSAGE;
}
});
六、Consumer 详解
6.1 消费模式
集群消费:
graph TB
subgraph Consumer Group
C1[Consumer 1]
C2[Consumer 2]
end
subgraph Topic
Q1[Queue 1]
Q2[Queue 2]
Q3[Queue 3]
Q4[Queue 4]
end
C1 --> Q1
C1 --> Q2
C2 --> Q3
C2 --> Q4
广播消费:
graph TB
subgraph Consumer Group
C1[Consumer 1]
C2[Consumer 2]
end
subgraph Topic
Q1[Queue 1]
Q2[Queue 2]
end
C1 --> Q1
C1 --> Q2
C2 --> Q1
C2 --> Q2
6.2 消费方式
// 1. 推模式(Push)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 2. 拉模式(Pull)
PullResult result = consumer.pull(new MessageQueue("TopicA", "BrokerA", 0),
"*", 0, 32);
for (MessageExt msg : result.getMsgFoundList()) {
System.out.println(new String(msg.getBody()));
}
6.3 消息重试
// 消费失败自动重试
consumer.setConsumeTimeout(15, TimeUnit.MINUTES);
consumer.setMaxReconsumeTimes(16); // 最大重试次数
// 重试队列:%RETRY%ConsumerGroup
// 重试间隔:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 30m 1h 2h 3h 4h 5h 6h 7h 8h 9h 10h 11h 12h 13h 14h 15h 16h 17h 18h 19h 20h 21h 22h 23h 1d
6.4 消费进度
// 自动提交 Offset(默认)
consumer.setOffsetStoreAutoCommit(true);
// 手动提交 Offset
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
// 处理消息
processMessage(msgs);
// 提交 Offset
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
七、实战应用
7.1 Spring Boot 集成
Maven 依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
配置文件:
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: order-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2
consumer:
group: order-consumer-group
Producer 实现:
@Service
public class OrderProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendOrderCreated(OrderEvent event) {
rocketMQTemplate.convertAndSend("order-topic:TagA", event);
}
public void sendOrderDelayed(OrderEvent event, int delayLevel) {
Message<OrderEvent> message = MessageBuilder.withPayload(event).build();
rocketMQTemplate.syncSend("order-topic:TagA", message, 3000, delayLevel);
}
}
Consumer 实现:
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
selectorExpression = "TagA",
messageModel = MessageModel.CLUSTERING,
consumeMode = ConsumeMode.CONCURRENTLY
)
public class OrderConsumer implements RocketMQListener<OrderEvent> {
@Override
public void onMessage(OrderEvent event) {
System.out.println("收到订单消息:" + event.getOrderId());
processOrder(event);
}
}
7.2 订单处理示例
// 订单事件
public class OrderEvent {
private String orderId;
private String userId;
private BigDecimal amount;
private OrderStatus status;
private LocalDateTime createTime;
}
// 订单创建 Producer
@Service
public class OrderCreateProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendOrderCreated(String orderId) {
OrderEvent event = new OrderEvent();
event.setOrderId(orderId);
event.setStatus(OrderStatus.CREATED);
// 发送到消息队列
rocketMQTemplate.convertAndSend("order-topic:OrderTag", event);
}
}
// 订单消费 Consumer
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-process-group",
selectorExpression = "OrderTag"
)
public class OrderProcessConsumer implements RocketMQListener<OrderEvent> {
@Autowired
private OrderService orderService;
@Override
@Transactional(rollbackFor = Exception.class)
public void onMessage(OrderEvent event) {
switch (event.getStatus()) {
case CREATED:
// 创建订单
orderService.createOrder(event);
break;
case PAID:
// 支付处理
orderService.processPayment(event);
break;
case SHIPPED:
// 发货处理
orderService.processShipment(event);
break;
}
}
}
7.3 事务消息示例
@Service
public class TransactionProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@PostConstruct
public void init() {
TransactionMQProducer producer = new TransactionMQProducer("tx-producer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(
Message message, Object o) {
try {
// 执行本地事务
OrderEvent event = JSON.parseObject(
new String(message.getBody()), OrderEvent.class);
orderService.createOrder(event);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(
ExtendedMessageQueueExt message) {
// 检查事务状态
OrderEvent event = JSON.parseObject(
new String(message.getBody()), OrderEvent.class);
boolean exists = orderService.exists(event.getOrderId());
return exists ? LocalTransactionState.COMMIT_MESSAGE
: LocalTransactionState.ROLLBACK_MESSAGE;
}
});
producer.start();
}
}
八、性能优化
8.1 Producer 优化
// 批量发送
List<Message<String>> messages = Arrays.asList(msg1, msg2, msg3);
SendResult result = producer.send(messages);
// 异步发送
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult result) {
// 成功回调
}
@Override
public void onException(Throwable e) {
// 异常处理
}
});
8.2 Consumer 优化
// 调整消费线程数
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 调整批量消费
consumer.setConsumeMessageBatchMaxSize(1);
// 调整拉取批次
consumer.setPullBatchSize(32);
8.3 Broker 优化
# 增加线程池
defaultThreadPoolNums=16
# 调整刷盘策略
flushDiskType=ASYNC_FLUSH
flushInterval=500
# 调整复制方式
brokerRole=ASYNC_MASTER
# 调整存储
mappedFileSizeCommitLog=1073741824
mappedFileSizeConsumeQueue=300000
九、监控与运维
9.1 关键指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| TPS | 每秒消息数 | 持续 > 80% |
| 延迟 | 消息延迟时间 | > 1 秒 |
| 堆积 | 未消费消息数 | > 10 万 |
| Broker CPU | Broker CPU 使用率 | > 80% |
| Broker 内存 | Broker 内存使用率 | > 80% |
9.2 常用命令
# 查看 Topic 列表
mqadmin topicList -n 127.0.0.1:9876
# 查看 Topic 详情
mqadmin topicStatus -n 127.0.0.1:9876 -t TopicTest
# 查看 Consumer 进度
mqadmin consumerProgress -n 127.0.0.1:9876 -g consumer-group
# 重置消费 Offset
mqadmin resetOffset -n 127.0.0.1:9876 -t TopicTest -g consumer-group -s now
# 查看 Broker 状态
mqadmin brokerStatus -n 127.0.0.1:9876 -b 127.0.0.1:10911
十、总结
RocketMQ vs Kafka
| 维度 | RocketMQ | Kafka |
|---|---|---|
| 适用场景 | 订单、支付、金融 | 日志、流处理 |
| 可靠性 | 金融级 | 标准级 |
| 功能特性 | 丰富 | 简洁 |
| 生态 | Java 友好 | 多语言 |
| 学习曲线 | 中等 | 较低 |
选型建议
- ✅ 订单/支付系统 - RocketMQ(事务消息、顺序消息)
- ✅ 日志收集 - Kafka(高吞吐、生态完善)
- ✅ 流处理 - Kafka(与 Flink/Spark 集成)
- ✅ 金融场景 - RocketMQ(高可靠、不丢消息)
参考资料
- RocketMQ 官方文档
- 《RocketMQ 技术内幕》- 丁威
- 《RocketMQ 实战与原理解析》- 杨延浩