RocketMQ 顺序消息是其核心特性之一,广泛应用于订单处理、流程审批等需要保证顺序的场景。本文将深入探讨顺序消息的实现原理和实战应用。
一、顺序消息基础
1.1 什么是顺序消息?
顺序消息是指消息的消费顺序与发送顺序一致:
sequenceDiagram
participant P as Producer
participant MQ as RocketMQ
participant C as Consumer
P->>MQ: 消息 1(订单创建)
P->>MQ: 消息 2(订单支付)
P->>MQ: 消息 3(订单发货)
MQ-->>C: 消息 1
MQ-->>C: 消息 2
MQ-->>C: 消息 3
note over C: 按顺序消费
1.2 顺序类型
| 类型 | 说明 | 适用场景 | 性能 |
|---|---|---|---|
| 全局顺序 | 所有消息严格有序 | 队列、任务调度 | 低 |
| 分区顺序 | 同一 Key 消息有序 | 订单、用户 | 高 |
1.3 应用场景
| 场景 | 顺序要求 | 类型 |
|---|---|---|
| 订单流程 | 创建→支付→发货→完成 | 分区顺序 |
| 证券交易 | 委托→成交→清算 | 全局顺序 |
| 状态同步 | 状态变更按顺序 | 分区顺序 |
| 日志收集 | 无需顺序 | 无序 |
二、实现原理
2.1 全局顺序
graph TB
subgraph Topic
Q1[Queue 1]
end
subgraph Producer
P1[Producer]
end
subgraph Consumer
C1[Consumer]
end
P1 -->|所有消息 | Q1
Q1 -->|顺序消费 | C1
note over Q1: 单队列保证顺序
特点:
- 所有消息写入同一个 Queue
- 单线程顺序消费
- 性能最低,QPS 约 1000
2.2 分区顺序
graph TB
subgraph Topic
Q1[Queue 1<br/>订单 A]
Q2[Queue 2<br/>订单 B]
Q3[Queue 3<br/>订单 C]
end
P1[订单 A 消息] --> Q1
P2[订单 B 消息] --> Q2
P3[订单 C 消息] --> Q3
C1[Consumer 1] --> Q1
C2[Consumer 2] --> Q2
C3[Consumer 3] --> Q3
note over Q1,Q3: 同一订单有序<br/>不同订单并行
特点:
- 相同 Key 的消息写入同一 Queue
- 不同 Key 可并行消费
- 性能较高,QPS 约 10 万
2.3 队列选择
// MessageQueueSelector 实现
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根据订单 ID 选择队列
String orderId = (String) arg;
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
}, orderId);
三、代码实现
3.1 生产者
@Service
public class OrderProducerService {
@Autowired
private DefaultMQProducer producer;
/**
* 发送顺序消息
*/
public void sendOrderMessage(Order order, OrderAction action) {
// 1. 构建消息
OrderMessage msg = new OrderMessage(order.getId(), action);
Message message = new Message(
"order-topic",
"order-action",
order.getId().toString(),
JSON.toJSONString(msg).getBytes()
);
// 2. 选择队列(保证同一订单发送到同一队列)
SendResult result = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
// 根据订单 ID 哈希选择队列
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
}, order.getId());
log.info("顺序消息发送成功:orderId={}, msgId={}",
order.getId(), result.getMsgId());
}
}
3.2 消费者(顺序消费)
@Service
public class OrderConsumerService {
@PostConstruct
public void init() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("order-topic", "order-action");
// 注册顺序消息监听器
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
// 解析消息
OrderMessage orderMsg = JSON.parseObject(
new String(msg.getBody()), OrderMessage.class);
// 处理订单消息
processOrderMessage(orderMsg);
log.info("订单消息处理成功:orderId={}, action={}",
orderMsg.getOrderId(), orderMsg.getAction());
} catch (Exception e) {
log.error("订单消息处理失败:orderId={}",
orderMsg.getOrderId(), e);
// 返回重试,保持顺序
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
log.info("顺序消息消费者启动成功");
}
private void processOrderMessage(OrderMessage msg) {
switch (msg.getAction()) {
case CREATE:
orderService.create(msg.getOrder());
break;
case PAY:
orderService.pay(msg.getOrder());
break;
case SHIP:
orderService.ship(msg.getOrder());
break;
case COMPLETE:
orderService.complete(msg.getOrder());
break;
}
}
}
3.3 状态机设计
/**
* 订单状态机
*/
public class OrderStateMachine {
private static final StateMachine<OrderStatus> machine;
static {
StateMachineBuilder<OrderStatus> builder =
StateMachineBuilder.newBuilder();
// 定义状态转换
builder.configure()
.withStates()
.initial(OrderStatus.CREATED)
.states(EnumSet.allOf(OrderStatus.class))
.and()
.withTransitions()
.from(OrderStatus.CREATED).to(OrderStatus.PAID).event("pay")
.from(OrderStatus.PAID).to(OrderStatus.SHIPPED).event("ship")
.from(OrderStatus.SHIPPED).to(OrderStatus.COMPLETED).event("complete");
machine = builder.build();
}
/**
* 处理订单动作
*/
public boolean process(Order order, OrderAction action) {
OrderStatus currentState = order.getStatus();
OrderStatus nextState = machine.transition(currentState, action.name());
if (nextState != null) {
order.setStatus(nextState);
orderService.update(order);
return true;
}
return false;
}
}
四、并发控制
4.1 分布式锁
@Service
public class OrderConsumerWithLock {
@Autowired
private RedissonClient redisson;
@PostConstruct
public void init() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("order-topic", "order-action");
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
for (MessageExt msg : msgs) {
OrderMessage orderMsg = JSON.parseObject(
new String(msg.getBody()), OrderMessage.class);
// 获取分布式锁
RLock lock = redisson.getLock("order:" + orderMsg.getOrderId());
try {
if (lock.tryLock(5, 10, TimeUnit.SECONDS)) {
try {
processOrderMessage(orderMsg);
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
});
consumer.start();
}
}
4.2 数据库锁
@Service
public class OrderConsumerWithDBLock {
@Autowired
private OrderMapper orderMapper;
@Transactional(rollbackFor = Exception.class)
public void processOrderMessage(OrderMessage msg) {
// 1. 使用 SELECT FOR UPDATE 加锁
Order order = orderMapper.selectForUpdate(msg.getOrderId());
if (order == null) {
throw new BusinessException("订单不存在");
}
// 2. 检查状态
if (!canTransition(order.getStatus(), msg.getAction())) {
log.warn("状态不允许转换:orderId={}, current={}, action={}",
msg.getOrderId(), order.getStatus(), msg.getAction());
return;
}
// 3. 更新状态
order.setStatus(transition(order.getStatus(), msg.getAction()));
orderMapper.update(order);
// 4. 记录日志
orderLogMapper.insert(new OrderLog(msg.getOrderId(), msg.getAction()));
}
private boolean canTransition(OrderStatus current, OrderAction action) {
// 状态机检查
return true;
}
}
五、异常处理
5.1 消费失败处理
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
processOrderMessage(msg);
} catch (BusinessException e) {
// 业务异常,记录日志,跳过
log.error("业务异常:orderId={}", getOrderId(msg), e);
continue;
} catch (Exception e) {
// 系统异常,返回重试
log.error("系统异常:orderId={}", getOrderId(msg), e);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
});
5.2 重试机制
# 顺序消息重试配置
maxReconsumeTimes=16 # 最大重试次数
suspendCurrentQueueTimeMillis=1000 # 挂起时间 1 秒
重试间隔:
第 1 次:1 秒
第 2 次:5 秒
第 3 次:10 秒
第 4 次:30 秒
第 5 次:1 分钟
...
第 16 次:2 小时
5.3 死信处理
/**
* 死信消息处理
*/
@Component
public class DeadLetterHandler {
@RocketMQMessageListener(
topic = "%DLQ%order-consumer-group",
consumerGroup = "dead-letter-consumer-group"
)
public class DeadLetterListener implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt msg) {
// 1. 记录死信消息
deadLetterMapper.insert(new DeadLetter(msg));
// 2. 发送告警
alertService.send("订单消息进入死信队列:orderId=" + getOrderId(msg));
// 3. 人工处理或补偿
// ...
}
}
}
六、性能优化
6.1 队列数量配置
| 场景 | 队列数 | 并发消费者 | QPS |
|---|---|---|---|
| 低并发 | 4 | 4 | 4000 |
| 中并发 | 16 | 16 | 16000 |
| 高并发 | 64 | 32 | 64000 |
6.2 批量消费
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
// 批量处理同一订单的消息
Map<Long, List<MessageExt>> orderMessages = new HashMap<>();
for (MessageExt msg : msgs) {
Long orderId = getOrderId(msg);
orderMessages.computeIfAbsent(orderId, k -> new ArrayList<>()).add(msg);
}
// 按订单批量处理
for (Map.Entry<Long, List<MessageExt>> entry : orderMessages.entrySet()) {
processBatch(entry.getKey(), entry.getValue());
}
return ConsumeOrderlyStatus.SUCCESS;
});
6.3 异步处理
// 顺序消息 + 异步处理
public class AsyncOrderProcessor {
private final ExecutorService executor =
Executors.newSingleThreadExecutor();
public void processOrderMessage(MessageExt msg) {
OrderMessage orderMsg = parse(msg);
// 提交到单线程队列,保证顺序
executor.submit(() -> {
try {
businessService.process(orderMsg);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
七、监控与告警
7.1 监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
| 顺序消息 TPS | 每秒处理的消息数 | - |
| 消费延迟 | 消息堆积数量 | > 1000 |
| 消费失败率 | 失败消息比例 | > 1% |
| 死信数量 | 死信队列消息数 | > 10 |
7.2 监控实现
@Component
public class OrderMessageMetrics {
@Autowired
private MeterRegistry meterRegistry;
private final AtomicLong processCount = new AtomicLong();
private final AtomicLong failCount = new AtomicLong();
private final AtomicLong delayMillis = new AtomicLong();
@PostConstruct
public void init() {
meterRegistry.gauge("order.message.process.count", processCount);
meterRegistry.gauge("order.message.fail.count", failCount);
meterRegistry.gauge("order.message.delay.millis", delayMillis);
}
public void recordProcess(long delay) {
processCount.incrementAndGet();
delayMillis.set(delay);
}
public void recordFail() {
failCount.incrementAndGet();
}
public double getFailRate() {
long total = processCount.get() + failCount.get();
return total > 0 ? (double) failCount.get() / total : 0;
}
}
八、最佳实践
8.1 设计建议
| 建议 | 说明 |
|---|---|
| 优先分区顺序 | 性能更好,满足大部分场景 |
| 合理设计队列数 | 根据并发量调整 |
| 实现幂等性 | 防止重复消费 |
| 状态机管理 | 保证状态转换正确 |
8.2 代码模板
/**
* 顺序消息处理模板
*/
public abstract class AbstractOrderlyListener implements MessageListenerOrderly {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
try {
// 1. 解析消息
Object message = parseMessage(msg);
// 2. 业务处理
processBusiness(message);
// 3. 记录成功
recordSuccess(msg);
} catch (BusinessException e) {
// 业务异常,记录日志
log.error("业务异常", e);
recordFail(msg);
} catch (Exception e) {
// 系统异常,重试
log.error("系统异常", e);
recordFail(msg);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
protected abstract Object parseMessage(MessageExt msg);
protected abstract void processBusiness(Object message);
protected abstract void recordSuccess(MessageExt msg);
protected abstract void recordFail(MessageExt msg);
}
总结
RocketMQ 顺序消息的核心要点:
- 顺序类型:全局顺序(单队列)、分区顺序(Key 哈希)
- 实现原理:队列选择器 + 顺序消费监听器
- 并发控制:分布式锁、数据库锁保证顺序
- 异常处理:重试机制、死信队列
- 性能优化:合理队列数、批量消费、异步处理
核心要点:
- 优先使用分区顺序,性能更好
- 实现业务幂等性防止重复
- 使用状态机管理状态转换
- 监控消费延迟和失败率
参考资料
- RocketMQ 顺序消息官方文档
- RocketMQ 源码
- 《RocketMQ 技术内幕》第 6 章