RocketMQ 提供了 Spring Boot Starter,简化了 RocketMQ 应用的开发。本文将深入探讨 Spring Boot 集成 RocketMQ 的配置、使用和最佳实践。
一、基础配置
1.1 依赖配置
Maven 依赖:
<dependencies>
<!-- RocketMQ Spring Boot Starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
</dependencies>
1.2 应用配置
application.yml:
rocketmq:
name-server: 192.168.1.100:9876;192.168.1.101:9876
producer:
group: my-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
max-message-size: 4194304
compress-message-body-threshold: 1048576
consumer:
listeners:
- topic: order-topic
group: order-consumer-group
二、Producer 开发
2.1 基础 Producer
@Service
public class RocketMQProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 同步发送
*/
public void sendSync(String topic, Object payload) {
SendResult result = rocketMQTemplate.syncSend(topic, payload);
log.info("发送成功:msgId={}", result.getMsgId());
}
/**
* 异步发送
*/
public void sendAsync(String topic, Object payload) {
rocketMQTemplate.asyncSend(topic, payload, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送成功:msgId={}", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("发送失败", e);
}
});
}
/**
* 单向发送
*/
public void sendOneway(String topic, Object payload) {
rocketMQTemplate.sendOneWay(topic, payload);
}
/**
* 发送延迟消息
*/
public void sendDelay(String topic, Object payload, int delayLevel) {
rocketMQTemplate.syncSend(topic, payload, 3000, delayLevel);
}
}
2.2 自定义 Producer
@Configuration
public class RocketMQProducerConfig {
@Bean
public DefaultMQProducer defaultMQProducer() throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("custom-producer");
producer.setNamesrvAddr("ns1:9876;ns2:9876");
producer.setSendMsgTimeout(5000);
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryTimesWhenSendAsyncFailed(3);
producer.setVipChannelEnabled(false);
producer.start();
return producer;
}
}
2.3 批量发送
@Service
public class BatchProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 批量发送
*/
public void sendBatch(String topic, List<Object> payloads) {
List<Message> messages = payloads.stream()
.map(payload -> {
Message msg = new Message(topic, JSON.toJSONString(payload).getBytes());
return msg;
})
.collect(Collectors.toList());
SendResult result = rocketMQTemplate.getProducer().send(messages);
log.info("批量发送成功:size={}", messages.size());
}
}
三、Consumer 开发
3.1 基础 Consumer
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到订单:{}", order);
processOrder(order);
}
private void processOrder(Order order) {
// 处理订单
}
}
3.2 并发消费
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
consumeThreadMax = 64,
messageModel = MessageModel.CLUSTERING
)
public class ConcurrentOrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 并发处理
}
}
3.3 顺序消费
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
consumeMode = ConsumeMode.ORDERLY
)
public class OrderlyOrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 顺序处理
}
}
3.4 事务消息 Consumer
@Component
@RocketMQMessageListener(
topic = "transaction-topic",
consumerGroup = "transaction-consumer-group"
)
public class TransactionConsumer implements RocketMQListener<Order> {
@Autowired
private OrderService orderService;
@Override
public void onMessage(Order order) {
// 处理事务消息
orderService.processTransaction(order);
}
}
四、事务消息
4.1 事务监听器
@Component
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 1. 解析消息
Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
// 2. 执行本地事务
orderService.createOrder(order);
// 3. 返回提交
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("执行本地事务失败", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 1. 解析消息
Order order = JSON.parseObject(new String(msg.getBody()), Order.class);
try {
// 2. 查询本地事务状态
Order dbOrder = orderService.queryOrder(order.getId());
if (dbOrder != null && dbOrder.getStatus() == OrderStatus.CREATED) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
} catch (Exception e) {
log.error("回查事务状态失败", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
4.2 事务 Producer
@Service
public class TransactionProducerService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 发送事务消息
*/
public void sendTransactionMessage(String topic, Order order) {
Message msg = MessageBuilder.withPayload(order)
.setHeader(MessageConst.PROPERTY_TRANSACTION_ID, UUID.randomUUID().toString())
.build();
SendResult result = rocketMQTemplate.sendMessageInTransaction(
topic,
msg,
order // 业务参数
);
switch (result.getSendStatus()) {
case SEND_OK:
log.info("事务消息发送成功");
break;
case SEND_MESSAGE_ILLEGAL:
log.error("消息非法");
break;
default:
log.error("发送失败:{}", result.getSendStatus());
}
}
}
五、实战案例
5.1 订单处理
@Service
public class OrderService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Autowired
private OrderRepository orderRepository;
/**
* 创建订单
*/
@Transactional
public Order createOrder(OrderDTO dto) {
// 1. 创建订单
Order order = new Order();
order.setId(UUID.randomUUID().toString());
order.setUserId(dto.getUserId());
order.setAmount(dto.getAmount());
order.setStatus(OrderStatus.CREATED);
orderRepository.save(order);
// 2. 发送订单创建事件
rocketMQTemplate.convertAndSend("order-events",
OrderEvent.builder()
.type("ORDER_CREATED")
.orderId(order.getId())
.build());
return order;
}
}
// 订单事件消费者
@Component
@RocketMQMessageListener(
topic = "order-events",
consumerGroup = "order-event-consumer"
)
public class OrderEventConsumer implements RocketMQListener<OrderEvent> {
@Override
public void onMessage(OrderEvent event) {
switch (event.getType()) {
case "ORDER_CREATED":
handleOrderCreated(event.getOrderId());
break;
case "ORDER_PAID":
handleOrderPaid(event.getOrderId());
break;
case "ORDER_SHIPPED":
handleOrderShipped(event.getOrderId());
break;
}
}
}
5.2 日志收集
@Service
public class LogCollector {
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 收集日志
*/
public void collectLog(LogEntry log) {
rocketMQTemplate.convertAndSend("logs-topic", log);
}
}
// 日志拦截器
@Component
public class LogInterceptor implements HandlerInterceptor {
@Autowired
private LogCollector logCollector;
@Override
public void afterCompletion(HttpServletRequest request,
HttpServletResponse response,
Object handler, Exception ex) {
LogEntry log = LogEntry.builder()
.service("api-gateway")
.method(request.getMethod())
.url(request.getRequestURI())
.status(response.getStatus())
.timestamp(System.currentTimeMillis())
.build();
// 异步发送
CompletableFuture.runAsync(() -> logCollector.collectLog(log));
}
}
5.3 配置中心
@Component
public class ConfigCenter {
private final Map<String, Object> configs = new ConcurrentHashMap<>();
@RocketMQMessageListener(
topic = "config-center",
consumerGroup = "config-subscriber"
)
public class ConfigListener implements RocketMQListener<ConfigUpdateEvent> {
@Override
public void onMessage(ConfigUpdateEvent event) {
log.info("配置更新:key={}, value={}", event.getKey(), event.getValue());
configs.put(event.getKey(), event.getValue());
}
}
public Object getConfig(String key) {
return configs.get(key);
}
}
六、错误处理
6.1 重试配置
@Configuration
public class RocketMQRetryConfig {
@Bean
public DefaultMQPushConsumer defaultMQPushConsumer() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer");
consumer.setNamesrvAddr("ns1:9876");
// 重试配置
consumer.setMaxReconsumeTimes(3);
consumer.setConsumeTimeout(15, TimeUnit.MINUTES);
return consumer;
}
}
6.2 死信队列
@Component
@RocketMQMessageListener(
topic = "%DLQ%order-consumer-group",
consumerGroup = "dead-letter-consumer-group"
)
public class DeadLetterConsumer implements RocketMQListener<MessageExt> {
@Override
public void onMessage(MessageExt msg) {
// 处理死信消息
log.error("死信消息:msgId={}", msg.getMsgId());
// 保存失败消息
saveFailedMessage(msg);
// 发送告警
sendAlert("死信消息:msgId=" + msg.getMsgId());
}
}
6.3 异常处理
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
try {
processOrder(order);
} catch (BusinessException e) {
// 业务异常,记录日志
log.error("业务异常:orderId={}", order.getId(), e);
saveFailedMessage(order, e);
} catch (Exception e) {
// 系统异常,抛出重试
log.error("系统异常:orderId={}", order.getId(), e);
throw e;
}
}
}
七、监控运维
7.1 健康检查
@Component
public class RocketMQHealthIndicator implements HealthIndicator {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public Health health() {
try {
// 发送测试消息
rocketMQTemplate.syncSend("health-check", "test");
return Health.up().build();
} catch (Exception e) {
return Health.down(e).build();
}
}
}
7.2 监控指标
@Component
public class RocketMQMetrics {
private final MeterRegistry meterRegistry;
public RocketMQMetrics(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
// 注册指标
meterRegistry.gauge("rocketmq.producer.send.count", sendCount);
meterRegistry.gauge("rocketmq.consumer.consume.count", consumeCount);
meterRegistry.gauge("rocketmq.consumer.failed.count", failedCount);
}
private final AtomicLong sendCount = new AtomicLong();
private final AtomicLong consumeCount = new AtomicLong();
private final AtomicLong failedCount = new AtomicLong();
}
八、最佳实践
8.1 配置建议
# 生产环境配置
rocketmq:
name-server: ns1:9876;ns2:9876
producer:
group: prod-producer-group
send-message-timeout: 5000
retry-times-when-send-failed: 3
retry-times-when-send-async-failed: 3
max-message-size: 4194304
vip-channel-enabled: false
consumer:
listeners:
- topic: order-topic
group: order-consumer-group
consume-thread-max: 64
message-model: CLUSTERING
8.2 代码规范
// ✅ 推荐
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
process(order);
}
}
// ❌ 不推荐
@Component
@RocketMQMessageListener(topic = "order-topic") // 缺少 consumerGroup
public class OrderConsumer {
// ...
}
8.3 检查清单
开发检查:
- [ ] 配置 NameServer 地址
- [ ] 配置 Producer Group
- [ ] 配置 Consumer Group
- [ ] 实现错误处理
- [ ] 配置重试机制
运维检查:
- [ ] 监控发送成功率
- [ ] 监控消费滞后
- [ ] 配置告警规则
- [ ] 定期备份配置
总结
Spring Boot 集成 RocketMQ 的核心要点:
- 基础配置:依赖、YAML 配置、Bean 配置
- Producer 开发:RocketMQTemplate、自定义配置、批量发送
- Consumer 开发:@RocketMQMessageListener、并发消费、顺序消费
- 事务消息:事务监听器、事务 Producer
- 错误处理:重试配置、死信队列、异常处理
- 监控运维:健康检查、指标监控
核心要点:
- 使用 RocketMQ Spring Boot Starter 简化开发
- 合理配置错误处理和重试
- 实现事务保证数据一致
- 建立完善的监控体系
参考资料
- RocketMQ Spring 官方文档
- RocketMQ 官方文档
- 《RocketMQ 技术内幕》第 9 章