Skip to content
清晨的一缕阳光
返回

RocketMQ 顺序消息详解与实战

RocketMQ 顺序消息是其核心特性之一,广泛应用于订单处理、流程审批等需要保证顺序的场景。本文将深入探讨顺序消息的实现原理和实战应用。

一、顺序消息基础

1.1 什么是顺序消息?

顺序消息是指消息的消费顺序与发送顺序一致:

sequenceDiagram
    participant P as Producer
    participant MQ as RocketMQ
    participant C as Consumer
    
    P->>MQ: 消息 1(订单创建)
    P->>MQ: 消息 2(订单支付)
    P->>MQ: 消息 3(订单发货)
    
    MQ-->>C: 消息 1
    MQ-->>C: 消息 2
    MQ-->>C: 消息 3
    
    note over C: 按顺序消费

1.2 顺序类型

类型说明适用场景性能
全局顺序所有消息严格有序队列、任务调度
分区顺序同一 Key 消息有序订单、用户

1.3 应用场景

场景顺序要求类型
订单流程创建→支付→发货→完成分区顺序
证券交易委托→成交→清算全局顺序
状态同步状态变更按顺序分区顺序
日志收集无需顺序无序

二、实现原理

2.1 全局顺序

graph TB
    subgraph Topic
        Q1[Queue 1]
    end
    
    subgraph Producer
        P1[Producer]
    end
    
    subgraph Consumer
        C1[Consumer]
    end
    
    P1 -->|所有消息 | Q1
    Q1 -->|顺序消费 | C1
    
    note over Q1: 单队列保证顺序

特点

2.2 分区顺序

graph TB
    subgraph Topic
        Q1[Queue 1<br/>订单 A]
        Q2[Queue 2<br/>订单 B]
        Q3[Queue 3<br/>订单 C]
    end
    
    P1[订单 A 消息] --> Q1
    P2[订单 B 消息] --> Q2
    P3[订单 C 消息] --> Q3
    
    C1[Consumer 1] --> Q1
    C2[Consumer 2] --> Q2
    C3[Consumer 3] --> Q3
    
    note over Q1,Q3: 同一订单有序<br/>不同订单并行

特点

2.3 队列选择

// MessageQueueSelector 实现
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        // 根据订单 ID 选择队列
        String orderId = (String) arg;
        int index = Math.abs(orderId.hashCode()) % mqs.size();
        return mqs.get(index);
    }
}, orderId);

三、代码实现

3.1 生产者

@Service
public class OrderProducerService {
    
    @Autowired
    private DefaultMQProducer producer;
    
    /**
     * 发送顺序消息
     */
    public void sendOrderMessage(Order order, OrderAction action) {
        // 1. 构建消息
        OrderMessage msg = new OrderMessage(order.getId(), action);
        Message message = new Message(
            "order-topic",
            "order-action",
            order.getId().toString(),
            JSON.toJSONString(msg).getBytes()
        );
        
        // 2. 选择队列(保证同一订单发送到同一队列)
        SendResult result = producer.send(message, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Long orderId = (Long) arg;
                // 根据订单 ID 哈希选择队列
                int index = Math.abs(orderId.hashCode()) % mqs.size();
                return mqs.get(index);
            }
        }, order.getId());
        
        log.info("顺序消息发送成功:orderId={}, msgId={}", 
            order.getId(), result.getMsgId());
    }
}

3.2 消费者(顺序消费)

@Service
public class OrderConsumerService {
    
    @PostConstruct
    public void init() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer-group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("order-topic", "order-action");
        
        // 注册顺序消息监听器
        consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                try {
                    // 解析消息
                    OrderMessage orderMsg = JSON.parseObject(
                        new String(msg.getBody()), OrderMessage.class);
                    
                    // 处理订单消息
                    processOrderMessage(orderMsg);
                    
                    log.info("订单消息处理成功:orderId={}, action={}", 
                        orderMsg.getOrderId(), orderMsg.getAction());
                } catch (Exception e) {
                    log.error("订单消息处理失败:orderId={}", 
                        orderMsg.getOrderId(), e);
                    
                    // 返回重试,保持顺序
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        });
        
        consumer.start();
        log.info("顺序消息消费者启动成功");
    }
    
    private void processOrderMessage(OrderMessage msg) {
        switch (msg.getAction()) {
            case CREATE:
                orderService.create(msg.getOrder());
                break;
            case PAY:
                orderService.pay(msg.getOrder());
                break;
            case SHIP:
                orderService.ship(msg.getOrder());
                break;
            case COMPLETE:
                orderService.complete(msg.getOrder());
                break;
        }
    }
}

3.3 状态机设计

/**
 * 订单状态机
 */
public class OrderStateMachine {
    
    private static final StateMachine<OrderStatus> machine;
    
    static {
        StateMachineBuilder<OrderStatus> builder = 
            StateMachineBuilder.newBuilder();
        
        // 定义状态转换
        builder.configure()
            .withStates()
                .initial(OrderStatus.CREATED)
                .states(EnumSet.allOf(OrderStatus.class))
            .and()
            .withTransitions()
                .from(OrderStatus.CREATED).to(OrderStatus.PAID).event("pay")
                .from(OrderStatus.PAID).to(OrderStatus.SHIPPED).event("ship")
                .from(OrderStatus.SHIPPED).to(OrderStatus.COMPLETED).event("complete");
        
        machine = builder.build();
    }
    
    /**
     * 处理订单动作
     */
    public boolean process(Order order, OrderAction action) {
        OrderStatus currentState = order.getStatus();
        OrderStatus nextState = machine.transition(currentState, action.name());
        
        if (nextState != null) {
            order.setStatus(nextState);
            orderService.update(order);
            return true;
        }
        
        return false;
    }
}

四、并发控制

4.1 分布式锁

@Service
public class OrderConsumerWithLock {
    
    @Autowired
    private RedissonClient redisson;
    
    @PostConstruct
    public void init() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer-group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("order-topic", "order-action");
        
        consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                OrderMessage orderMsg = JSON.parseObject(
                    new String(msg.getBody()), OrderMessage.class);
                
                // 获取分布式锁
                RLock lock = redisson.getLock("order:" + orderMsg.getOrderId());
                
                try {
                    if (lock.tryLock(5, 10, TimeUnit.SECONDS)) {
                        try {
                            processOrderMessage(orderMsg);
                        } finally {
                            lock.unlock();
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        });
        
        consumer.start();
    }
}

4.2 数据库锁

@Service
public class OrderConsumerWithDBLock {
    
    @Autowired
    private OrderMapper orderMapper;
    
    @Transactional(rollbackFor = Exception.class)
    public void processOrderMessage(OrderMessage msg) {
        // 1. 使用 SELECT FOR UPDATE 加锁
        Order order = orderMapper.selectForUpdate(msg.getOrderId());
        
        if (order == null) {
            throw new BusinessException("订单不存在");
        }
        
        // 2. 检查状态
        if (!canTransition(order.getStatus(), msg.getAction())) {
            log.warn("状态不允许转换:orderId={}, current={}, action={}", 
                msg.getOrderId(), order.getStatus(), msg.getAction());
            return;
        }
        
        // 3. 更新状态
        order.setStatus(transition(order.getStatus(), msg.getAction()));
        orderMapper.update(order);
        
        // 4. 记录日志
        orderLogMapper.insert(new OrderLog(msg.getOrderId(), msg.getAction()));
    }
    
    private boolean canTransition(OrderStatus current, OrderAction action) {
        // 状态机检查
        return true;
    }
}

五、异常处理

5.1 消费失败处理

consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        try {
            processOrderMessage(msg);
        } catch (BusinessException e) {
            // 业务异常,记录日志,跳过
            log.error("业务异常:orderId={}", getOrderId(msg), e);
            continue;
        } catch (Exception e) {
            // 系统异常,返回重试
            log.error("系统异常:orderId={}", getOrderId(msg), e);
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
    return ConsumeOrderlyStatus.SUCCESS;
});

5.2 重试机制

# 顺序消息重试配置
maxReconsumeTimes=16  # 最大重试次数
suspendCurrentQueueTimeMillis=1000  # 挂起时间 1 秒

重试间隔

第 1 次:1 秒
第 2 次:5 秒
第 3 次:10 秒
第 4 次:30 秒
第 5 次:1 分钟
...
第 16 次:2 小时

5.3 死信处理

/**
 * 死信消息处理
 */
@Component
public class DeadLetterHandler {
    
    @RocketMQMessageListener(
        topic = "%DLQ%order-consumer-group",
        consumerGroup = "dead-letter-consumer-group"
    )
    public class DeadLetterListener implements RocketMQListener<MessageExt> {
        
        @Override
        public void onMessage(MessageExt msg) {
            // 1. 记录死信消息
            deadLetterMapper.insert(new DeadLetter(msg));
            
            // 2. 发送告警
            alertService.send("订单消息进入死信队列:orderId=" + getOrderId(msg));
            
            // 3. 人工处理或补偿
            // ...
        }
    }
}

六、性能优化

6.1 队列数量配置

场景队列数并发消费者QPS
低并发444000
中并发161616000
高并发643264000

6.2 批量消费

consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    // 批量处理同一订单的消息
    Map<Long, List<MessageExt>> orderMessages = new HashMap<>();
    
    for (MessageExt msg : msgs) {
        Long orderId = getOrderId(msg);
        orderMessages.computeIfAbsent(orderId, k -> new ArrayList<>()).add(msg);
    }
    
    // 按订单批量处理
    for (Map.Entry<Long, List<MessageExt>> entry : orderMessages.entrySet()) {
        processBatch(entry.getKey(), entry.getValue());
    }
    
    return ConsumeOrderlyStatus.SUCCESS;
});

6.3 异步处理

// 顺序消息 + 异步处理
public class AsyncOrderProcessor {
    private final ExecutorService executor = 
        Executors.newSingleThreadExecutor();
    
    public void processOrderMessage(MessageExt msg) {
        OrderMessage orderMsg = parse(msg);
        
        // 提交到单线程队列,保证顺序
        executor.submit(() -> {
            try {
                businessService.process(orderMsg);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}

七、监控与告警

7.1 监控指标

指标说明告警阈值
顺序消息 TPS每秒处理的消息数-
消费延迟消息堆积数量> 1000
消费失败率失败消息比例> 1%
死信数量死信队列消息数> 10

7.2 监控实现

@Component
public class OrderMessageMetrics {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    private final AtomicLong processCount = new AtomicLong();
    private final AtomicLong failCount = new AtomicLong();
    private final AtomicLong delayMillis = new AtomicLong();
    
    @PostConstruct
    public void init() {
        meterRegistry.gauge("order.message.process.count", processCount);
        meterRegistry.gauge("order.message.fail.count", failCount);
        meterRegistry.gauge("order.message.delay.millis", delayMillis);
    }
    
    public void recordProcess(long delay) {
        processCount.incrementAndGet();
        delayMillis.set(delay);
    }
    
    public void recordFail() {
        failCount.incrementAndGet();
    }
    
    public double getFailRate() {
        long total = processCount.get() + failCount.get();
        return total > 0 ? (double) failCount.get() / total : 0;
    }
}

八、最佳实践

8.1 设计建议

建议说明
优先分区顺序性能更好,满足大部分场景
合理设计队列数根据并发量调整
实现幂等性防止重复消费
状态机管理保证状态转换正确

8.2 代码模板

/**
 * 顺序消息处理模板
 */
public abstract class AbstractOrderlyListener implements MessageListenerOrderly {
    
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, 
                                               ConsumeOrderlyContext context) {
        for (MessageExt msg : msgs) {
            try {
                // 1. 解析消息
                Object message = parseMessage(msg);
                
                // 2. 业务处理
                processBusiness(message);
                
                // 3. 记录成功
                recordSuccess(msg);
            } catch (BusinessException e) {
                // 业务异常,记录日志
                log.error("业务异常", e);
                recordFail(msg);
            } catch (Exception e) {
                // 系统异常,重试
                log.error("系统异常", e);
                recordFail(msg);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }
    
    protected abstract Object parseMessage(MessageExt msg);
    protected abstract void processBusiness(Object message);
    protected abstract void recordSuccess(MessageExt msg);
    protected abstract void recordFail(MessageExt msg);
}

总结

RocketMQ 顺序消息的核心要点:

  1. 顺序类型:全局顺序(单队列)、分区顺序(Key 哈希)
  2. 实现原理:队列选择器 + 顺序消费监听器
  3. 并发控制:分布式锁、数据库锁保证顺序
  4. 异常处理:重试机制、死信队列
  5. 性能优化:合理队列数、批量消费、异步处理

核心要点

参考资料


分享这篇文章到:

上一篇文章
Skill 系统设计与实现
下一篇文章
多 Agent 协作框架实战