本文汇总了 RocketMQ 在生产环境中的典型应用案例,涵盖电商、金融、物流等多个场景,分享最佳实践和经验教训。
一、电商场景
1.1 订单系统
业务背景:
电商平台日均订单量:100 万+
峰值 TPS:1 万+
订单状态流转:创建→支付→发货→完成
架构设计:
graph TB
subgraph 订单服务
OS[订单创建]
end
subgraph RocketMQ
T1[order-topic]
T2[order-event-topic]
end
subgraph 下游系统
IS[库存系统]
PS[支付系统]
LS[物流系统]
AS[分析系统]
end
OS --> T1
T1 --> T2
T2 --> IS
T2 --> PS
T2 --> LS
T2 --> AS
实现方案:
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 创建订单
*/
@Transactional
public Order createOrder(OrderDTO dto) {
// 1. 创建订单
Order order = new Order();
order.setId(UUID.randomUUID().toString());
order.setUserId(dto.getUserId());
order.setAmount(dto.getAmount());
order.setStatus(OrderStatus.CREATED);
orderRepository.save(order);
// 2. 发送订单创建事件(事务消息)
Message msg = MessageBuilder.withPayload(order)
.setHeader(MessageConst.PROPERTY_TRANSACTION_ID, UUID.randomUUID().toString())
.build();
rocketMQTemplate.sendMessageInTransaction("order-event-topic", msg, order);
return order;
}
}
// 库存扣减消费者
@Component
@RocketMQMessageListener(
topic = "order-event-topic",
consumerGroup = "inventory-consumer",
selectorExpression = "type = 'ORDER_CREATED'"
)
public class InventoryConsumer implements RocketMQListener<OrderEvent> {
@Autowired
private InventoryService inventoryService;
@Override
public void onMessage(OrderEvent event) {
// 扣减库存
inventoryService.deductStock(event.getOrderId(), event.getItems());
}
}
最佳实践:
1. 使用事务消息保证订单创建和事件发送的原子性
2. 按订单 ID 设置 Key,保证同一订单消息顺序
3. 配置合理的重试机制,处理库存不足等异常
4. 实现幂等性,防止重复扣减库存
5. 监控订单处理延迟,及时发现异常
1.2 秒杀系统
业务背景:
秒杀活动:限量 1000 件商品
峰值 QPS:10 万+
要求:不超卖、不重复购买
架构设计:
graph TB
subgraph 用户请求
UR[秒杀请求]
end
subgraph 网关层
GW[限流网关]
end
subgraph 缓存层
RC[Redis 缓存]
end
subgraph 消息队列
MQ[RocketMQ]
end
subgraph 数据库
DB[MySQL]
end
UR --> GW
GW --> RC
RC -->|预扣减 | MQ
MQ -->|异步处理 | DB
实现方案:
@Service
public class SeckillService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 秒杀下单
*/
public SeckillResult seckill(Long userId, Long itemId) {
// 1. 限流检查
if (!rateLimiter.tryAcquire()) {
return SeckillResult.fail("请求过于频繁");
}
// 2. 库存预扣减(Redis Lua 脚本)
String script =
"local stock = redis.call('get', KEYS[1]) " +
"if tonumber(stock) > 0 then " +
" redis.call('decr', KEYS[1]) " +
" return 1 " +
"else " +
" return 0 " +
"end";
Long result = redisTemplate.execute(
new DefaultRedisScript<>(script, Long.class),
Collections.singletonList("seckill:stock:" + itemId)
);
if (result == 0) {
return SeckillResult.fail("库存不足");
}
// 3. 发送秒杀消息(削峰填谷)
SeckillOrder order = new SeckillOrder();
order.setUserId(userId);
order.setItemId(itemId);
order.setCreateTime(System.currentTimeMillis());
rocketMQTemplate.sendOneWay("seckill-topic", order);
return SeckillResult.success("排队中,请稍后查看结果");
}
}
// 异步处理消费者
@Component
@RocketMQMessageListener(
topic = "seckill-topic",
consumerGroup = "seckill-processor",
consumeThreadMax = 100
)
public class SeckillProcessor implements RocketMQListener<SeckillOrder> {
@Autowired
private OrderService orderService;
@Override
public void onMessage(SeckillOrder order) {
// 创建订单
orderService.createSeckillOrder(order);
}
}
最佳实践:
1. Redis 预扣减库存,快速响应
2. RocketMQ 削峰填谷,保护数据库
3. 增加消费者并发度,加快处理速度
4. 实现幂等性,防止重复下单
5. 配置死信队列,处理失败订单
二、金融场景
2.1 支付系统
业务背景:
支付交易:日均 50 万笔
可靠性要求:金融级
一致性要求:最终一致
架构设计:
graph TB
subgraph 支付网关
PG[支付请求]
end
subgraph 支付服务
PS[支付处理]
end
subgraph RocketMQ
T1[payment-topic]
T2[notify-topic]
end
subgraph 下游系统
BS[银行系统]
NS[通知系统]
AS[账务系统]
end
PG --> PS
PS --> T1
T1 --> BS
BS --> T2
T2 --> NS
T2 --> AS
实现方案:
@Service
public class PaymentService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private PaymentRepository paymentRepository;
/**
* 处理支付
*/
@Transactional
public PaymentResult processPayment(PaymentRequest request) {
// 1. 创建支付记录
Payment payment = new Payment();
payment.setId(UUID.randomUUID().toString());
payment.setOrderId(request.getOrderId());
payment.setAmount(request.getAmount());
payment.setStatus(PaymentStatus.PROCESSING);
paymentRepository.save(payment);
// 2. 发送支付请求到银行(事务消息)
Message msg = MessageBuilder.withPayload(request)
.setHeader(MessageConst.PROPERTY_TRANSACTION_ID, payment.getId())
.build();
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"payment-topic",
msg,
payment
);
if (result.getSendStatus() != SendStatus.SEND_OK) {
throw new BusinessException("发送支付请求失败");
}
return PaymentResult.processing(payment.getId());
}
}
// 银行回调消费者
@Component
@RocketMQMessageListener(
topic = "payment-callback-topic",
consumerGroup = "payment-callback-consumer"
)
public class PaymentCallbackConsumer implements RocketMQListener<PaymentCallback> {
@Autowired
private PaymentService paymentService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
@Transactional
public void onMessage(PaymentCallback callback) {
// 1. 更新支付状态
Payment payment = paymentService.updatePaymentStatus(
callback.getPaymentId(),
callback.getStatus()
);
// 2. 发送支付结果通知
rocketMQTemplate.convertAndSend("notify-topic",
PaymentNotify.builder()
.paymentId(payment.getId())
.orderId(payment.getOrderId())
.status(payment.getStatus())
.build());
}
}
最佳实践:
1. 使用事务消息保证支付记录和消息发送的一致性
2. 实现幂等性,防止重复处理回调
3. 配置合理的重试机制,处理网络异常
4. 记录完整的审计日志
5. 实时监控支付成功率,及时发现异常
2.2 对账系统
业务背景:
对账频率:每日一次
数据量:百万级交易
要求:准确、完整
实现方案:
@Service
public class ReconciliationService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发起对账
*/
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨 2 点
public void startReconciliation() {
// 1. 获取昨日交易
List<Transaction> transactions = getYesterdayTransactions();
// 2. 分批发送对账消息
int batchSize = 1000;
for (int i = 0; i < transactions.size(); i += batchSize) {
int end = Math.min(i + batchSize, transactions.size());
List<Transaction> batch = transactions.subList(i, end);
rocketMQTemplate.convertAndSend("reconciliation-topic",
ReconciliationBatch.builder()
.batchId(UUID.randomUUID().toString())
.transactions(batch)
.build());
}
}
}
// 对账处理消费者
@Component
@RocketMQMessageListener(
topic = "reconciliation-topic",
consumerGroup = "reconciliation-consumer",
consumeThreadMax = 10
)
public class ReconciliationConsumer implements RocketMQListener<ReconciliationBatch> {
@Autowired
private BankChannel bankChannel;
@Override
public void onMessage(ReconciliationBatch batch) {
// 1. 获取银行对账单
BankStatement bankStatement = bankChannel.getStatement(
batch.getDate(),
batch.getBatchId()
);
// 2. 逐笔核对
List<ReconciliationResult> results = new ArrayList<>();
for (Transaction tx : batch.getTransactions()) {
ReconciliationResult result = reconcile(tx, bankStatement);
results.add(result);
}
// 3. 发送对账结果
rocketMQTemplate.convertAndSend("reconciliation-result-topic",
ReconciliationReport.builder()
.batchId(batch.getBatchId())
.results(results)
.build());
}
private ReconciliationResult reconcile(Transaction tx, BankStatement statement) {
// 核对逻辑
// ...
return result;
}
}
最佳实践:
1. 批量处理,提高效率
2. 限制并发度,避免对银行系统造成压力
3. 记录详细对账日志,便于排查
4. 对账失败自动重试
5. 对账结果及时通知
三、物流场景
3.1 轨迹追踪
业务背景:
物流订单:日均 200 万单
轨迹更新:实时
查询要求:低延迟
实现方案:
@Service
public class LogisticsService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 更新物流轨迹
*/
public void updateTrack(TrackUpdate update) {
// 1. 保存到数据库
trackRepository.save(update);
// 2. 发送轨迹更新事件
rocketMQTemplate.convertAndSend("track-update-topic",
TrackEvent.builder()
.orderId(update.getOrderId())
.location(update.getLocation())
.status(update.getStatus())
.timestamp(update.getTimestamp())
.build());
}
}
// 轨迹推送消费者
@Component
@RocketMQMessageListener(
topic = "track-update-topic",
consumerGroup = "track-push-consumer"
)
public class TrackPushConsumer implements RocketMQListener<TrackEvent> {
@Autowired
private WebSocketService webSocketService;
@Autowired
private SmsService smsService;
@Override
public void onMessage(TrackEvent event) {
// 1. 推送 WebSocket 通知
webSocketService.sendToUser(event.getOrderId(), event);
// 2. 关键节点发送短信
if (isCriticalStatus(event.getStatus())) {
smsService.sendTrackSms(event.getOrderId(), event.getStatus());
}
}
}
最佳实践:
1. 轨迹更新使用单向发送,提高吞吐量
2. 关键节点发送短信通知
3. WebSocket 实时推送轨迹
4. 实现轨迹查询缓存
5. 监控轨迹更新延迟
四、最佳实践总结
4.1 架构设计
架构原则:
1. 使用事务消息保证数据一致性
2. 合理使用 Tag 进行消息过滤
3. 按业务划分 Topic,隔离不同业务
4. 配置合理的重试机制
5. 实现完善的监控告警
4.2 开发规范
开发规范:
1. 消息体尽量精简,避免过大
2. 设置合理的 Key,便于查询和去重
3. 实现消费者幂等性
4. 处理异常时返回 RECONSUME_LATER
5. 记录完整的日志
4.3 运维建议
运维建议:
1. 定期监控消息堆积情况
2. 定期检查死信队列
3. 定期清理过期消息
4. 定期进行容量评估
5. 定期进行故障演练
4.4 性能优化
性能优化:
1. 批量发送/消费
2. 增加并发度
3. 优化消息体大小
4. 合理使用延迟消息
5. 配置合适的刷盘策略
总结
RocketMQ 生产实践的核心要点:
- 电商场景:订单系统、秒杀系统
- 金融场景:支付系统、对账系统
- 物流场景:轨迹追踪
- 最佳实践:架构设计、开发规范、运维建议
核心要点:
- 根据业务场景选择合适的消息模式
- 使用事务消息保证数据一致性
- 实现消费者幂等性
- 建立完善的监控告警体系
- 定期运维和优化
参考资料
- RocketMQ 官方文档
- RocketMQ 最佳实践
- 《RocketMQ 技术内幕》第 12 章