Skip to content
清晨的一缕阳光
返回

RocketMQ 生产实践案例精选

本文汇总了 RocketMQ 在生产环境中的典型应用案例,涵盖电商、金融、物流等多个场景,分享最佳实践和经验教训。

一、电商场景

1.1 订单系统

业务背景

电商平台日均订单量:100 万+
峰值 TPS:1 万+
订单状态流转:创建→支付→发货→完成

架构设计

graph TB
    subgraph 订单服务
        OS[订单创建]
    end
    
    subgraph RocketMQ
        T1[order-topic]
        T2[order-event-topic]
    end
    
    subgraph 下游系统
        IS[库存系统]
        PS[支付系统]
        LS[物流系统]
        AS[分析系统]
    end
    
    OS --> T1
    T1 --> T2
    T2 --> IS
    T2 --> PS
    T2 --> LS
    T2 --> AS

实现方案

@Service
public class OrderService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 创建订单
     */
    @Transactional
    public Order createOrder(OrderDTO dto) {
        // 1. 创建订单
        Order order = new Order();
        order.setId(UUID.randomUUID().toString());
        order.setUserId(dto.getUserId());
        order.setAmount(dto.getAmount());
        order.setStatus(OrderStatus.CREATED);
        
        orderRepository.save(order);
        
        // 2. 发送订单创建事件(事务消息)
        Message msg = MessageBuilder.withPayload(order)
            .setHeader(MessageConst.PROPERTY_TRANSACTION_ID, UUID.randomUUID().toString())
            .build();
        
        rocketMQTemplate.sendMessageInTransaction("order-event-topic", msg, order);
        
        return order;
    }
}

// 库存扣减消费者
@Component
@RocketMQMessageListener(
    topic = "order-event-topic",
    consumerGroup = "inventory-consumer",
    selectorExpression = "type = 'ORDER_CREATED'"
)
public class InventoryConsumer implements RocketMQListener<OrderEvent> {
    
    @Autowired
    private InventoryService inventoryService;
    
    @Override
    public void onMessage(OrderEvent event) {
        // 扣减库存
        inventoryService.deductStock(event.getOrderId(), event.getItems());
    }
}

最佳实践

1. 使用事务消息保证订单创建和事件发送的原子性
2. 按订单 ID 设置 Key,保证同一订单消息顺序
3. 配置合理的重试机制,处理库存不足等异常
4. 实现幂等性,防止重复扣减库存
5. 监控订单处理延迟,及时发现异常

1.2 秒杀系统

业务背景

秒杀活动:限量 1000 件商品
峰值 QPS:10 万+
要求:不超卖、不重复购买

架构设计

graph TB
    subgraph 用户请求
        UR[秒杀请求]
    end
    
    subgraph 网关层
        GW[限流网关]
    end
    
    subgraph 缓存层
        RC[Redis 缓存]
    end
    
    subgraph 消息队列
        MQ[RocketMQ]
    end
    
    subgraph 数据库
        DB[MySQL]
    end
    
    UR --> GW
    GW --> RC
    RC -->|预扣减 | MQ
    MQ -->|异步处理 | DB

实现方案

@Service
public class SeckillService {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 秒杀下单
     */
    public SeckillResult seckill(Long userId, Long itemId) {
        // 1. 限流检查
        if (!rateLimiter.tryAcquire()) {
            return SeckillResult.fail("请求过于频繁");
        }
        
        // 2. 库存预扣减(Redis Lua 脚本)
        String script = 
            "local stock = redis.call('get', KEYS[1]) " +
            "if tonumber(stock) > 0 then " +
            "  redis.call('decr', KEYS[1]) " +
            "  return 1 " +
            "else " +
            "  return 0 " +
            "end";
        
        Long result = redisTemplate.execute(
            new DefaultRedisScript<>(script, Long.class),
            Collections.singletonList("seckill:stock:" + itemId)
        );
        
        if (result == 0) {
            return SeckillResult.fail("库存不足");
        }
        
        // 3. 发送秒杀消息(削峰填谷)
        SeckillOrder order = new SeckillOrder();
        order.setUserId(userId);
        order.setItemId(itemId);
        order.setCreateTime(System.currentTimeMillis());
        
        rocketMQTemplate.sendOneWay("seckill-topic", order);
        
        return SeckillResult.success("排队中,请稍后查看结果");
    }
}

// 异步处理消费者
@Component
@RocketMQMessageListener(
    topic = "seckill-topic",
    consumerGroup = "seckill-processor",
    consumeThreadMax = 100
)
public class SeckillProcessor implements RocketMQListener<SeckillOrder> {
    
    @Autowired
    private OrderService orderService;
    
    @Override
    public void onMessage(SeckillOrder order) {
        // 创建订单
        orderService.createSeckillOrder(order);
    }
}

最佳实践

1. Redis 预扣减库存,快速响应
2. RocketMQ 削峰填谷,保护数据库
3. 增加消费者并发度,加快处理速度
4. 实现幂等性,防止重复下单
5. 配置死信队列,处理失败订单

二、金融场景

2.1 支付系统

业务背景

支付交易:日均 50 万笔
可靠性要求:金融级
一致性要求:最终一致

架构设计

graph TB
    subgraph 支付网关
        PG[支付请求]
    end
    
    subgraph 支付服务
        PS[支付处理]
    end
    
    subgraph RocketMQ
        T1[payment-topic]
        T2[notify-topic]
    end
    
    subgraph 下游系统
        BS[银行系统]
        NS[通知系统]
        AS[账务系统]
    end
    
    PG --> PS
    PS --> T1
    T1 --> BS
    BS --> T2
    T2 --> NS
    T2 --> AS

实现方案

@Service
public class PaymentService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Autowired
    private PaymentRepository paymentRepository;
    
    /**
     * 处理支付
     */
    @Transactional
    public PaymentResult processPayment(PaymentRequest request) {
        // 1. 创建支付记录
        Payment payment = new Payment();
        payment.setId(UUID.randomUUID().toString());
        payment.setOrderId(request.getOrderId());
        payment.setAmount(request.getAmount());
        payment.setStatus(PaymentStatus.PROCESSING);
        
        paymentRepository.save(payment);
        
        // 2. 发送支付请求到银行(事务消息)
        Message msg = MessageBuilder.withPayload(request)
            .setHeader(MessageConst.PROPERTY_TRANSACTION_ID, payment.getId())
            .build();
        
        TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
            "payment-topic", 
            msg, 
            payment
        );
        
        if (result.getSendStatus() != SendStatus.SEND_OK) {
            throw new BusinessException("发送支付请求失败");
        }
        
        return PaymentResult.processing(payment.getId());
    }
}

// 银行回调消费者
@Component
@RocketMQMessageListener(
    topic = "payment-callback-topic",
    consumerGroup = "payment-callback-consumer"
)
public class PaymentCallbackConsumer implements RocketMQListener<PaymentCallback> {
    
    @Autowired
    private PaymentService paymentService;
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    @Override
    @Transactional
    public void onMessage(PaymentCallback callback) {
        // 1. 更新支付状态
        Payment payment = paymentService.updatePaymentStatus(
            callback.getPaymentId(), 
            callback.getStatus()
        );
        
        // 2. 发送支付结果通知
        rocketMQTemplate.convertAndSend("notify-topic", 
            PaymentNotify.builder()
                .paymentId(payment.getId())
                .orderId(payment.getOrderId())
                .status(payment.getStatus())
                .build());
    }
}

最佳实践

1. 使用事务消息保证支付记录和消息发送的一致性
2. 实现幂等性,防止重复处理回调
3. 配置合理的重试机制,处理网络异常
4. 记录完整的审计日志
5. 实时监控支付成功率,及时发现异常

2.2 对账系统

业务背景

对账频率:每日一次
数据量:百万级交易
要求:准确、完整

实现方案

@Service
public class ReconciliationService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 发起对账
     */
    @Scheduled(cron = "0 0 2 * * ?")  // 每天凌晨 2 点
    public void startReconciliation() {
        // 1. 获取昨日交易
        List<Transaction> transactions = getYesterdayTransactions();
        
        // 2. 分批发送对账消息
        int batchSize = 1000;
        for (int i = 0; i < transactions.size(); i += batchSize) {
            int end = Math.min(i + batchSize, transactions.size());
            List<Transaction> batch = transactions.subList(i, end);
            
            rocketMQTemplate.convertAndSend("reconciliation-topic", 
                ReconciliationBatch.builder()
                    .batchId(UUID.randomUUID().toString())
                    .transactions(batch)
                    .build());
        }
    }
}

// 对账处理消费者
@Component
@RocketMQMessageListener(
    topic = "reconciliation-topic",
    consumerGroup = "reconciliation-consumer",
    consumeThreadMax = 10
)
public class ReconciliationConsumer implements RocketMQListener<ReconciliationBatch> {
    
    @Autowired
    private BankChannel bankChannel;
    
    @Override
    public void onMessage(ReconciliationBatch batch) {
        // 1. 获取银行对账单
        BankStatement bankStatement = bankChannel.getStatement(
            batch.getDate(), 
            batch.getBatchId()
        );
        
        // 2. 逐笔核对
        List<ReconciliationResult> results = new ArrayList<>();
        for (Transaction tx : batch.getTransactions()) {
            ReconciliationResult result = reconcile(tx, bankStatement);
            results.add(result);
        }
        
        // 3. 发送对账结果
        rocketMQTemplate.convertAndSend("reconciliation-result-topic",
            ReconciliationReport.builder()
                .batchId(batch.getBatchId())
                .results(results)
                .build());
    }
    
    private ReconciliationResult reconcile(Transaction tx, BankStatement statement) {
        // 核对逻辑
        // ...
        return result;
    }
}

最佳实践

1. 批量处理,提高效率
2. 限制并发度,避免对银行系统造成压力
3. 记录详细对账日志,便于排查
4. 对账失败自动重试
5. 对账结果及时通知

三、物流场景

3.1 轨迹追踪

业务背景

物流订单:日均 200 万单
轨迹更新:实时
查询要求:低延迟

实现方案

@Service
public class LogisticsService {
    
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    
    /**
     * 更新物流轨迹
     */
    public void updateTrack(TrackUpdate update) {
        // 1. 保存到数据库
        trackRepository.save(update);
        
        // 2. 发送轨迹更新事件
        rocketMQTemplate.convertAndSend("track-update-topic",
            TrackEvent.builder()
                .orderId(update.getOrderId())
                .location(update.getLocation())
                .status(update.getStatus())
                .timestamp(update.getTimestamp())
                .build());
    }
}

// 轨迹推送消费者
@Component
@RocketMQMessageListener(
    topic = "track-update-topic",
    consumerGroup = "track-push-consumer"
)
public class TrackPushConsumer implements RocketMQListener<TrackEvent> {
    
    @Autowired
    private WebSocketService webSocketService;
    
    @Autowired
    private SmsService smsService;
    
    @Override
    public void onMessage(TrackEvent event) {
        // 1. 推送 WebSocket 通知
        webSocketService.sendToUser(event.getOrderId(), event);
        
        // 2. 关键节点发送短信
        if (isCriticalStatus(event.getStatus())) {
            smsService.sendTrackSms(event.getOrderId(), event.getStatus());
        }
    }
}

最佳实践

1. 轨迹更新使用单向发送,提高吞吐量
2. 关键节点发送短信通知
3. WebSocket 实时推送轨迹
4. 实现轨迹查询缓存
5. 监控轨迹更新延迟

四、最佳实践总结

4.1 架构设计

架构原则:
1. 使用事务消息保证数据一致性
2. 合理使用 Tag 进行消息过滤
3. 按业务划分 Topic,隔离不同业务
4. 配置合理的重试机制
5. 实现完善的监控告警

4.2 开发规范

开发规范:
1. 消息体尽量精简,避免过大
2. 设置合理的 Key,便于查询和去重
3. 实现消费者幂等性
4. 处理异常时返回 RECONSUME_LATER
5. 记录完整的日志

4.3 运维建议

运维建议:
1. 定期监控消息堆积情况
2. 定期检查死信队列
3. 定期清理过期消息
4. 定期进行容量评估
5. 定期进行故障演练

4.4 性能优化

性能优化:
1. 批量发送/消费
2. 增加并发度
3. 优化消息体大小
4. 合理使用延迟消息
5. 配置合适的刷盘策略

总结

RocketMQ 生产实践的核心要点:

  1. 电商场景:订单系统、秒杀系统
  2. 金融场景:支付系统、对账系统
  3. 物流场景:轨迹追踪
  4. 最佳实践:架构设计、开发规范、运维建议

核心要点

参考资料


分享这篇文章到:

上一篇文章
Kafka 生态工具全景图
下一篇文章
Redis 主从复制原理