前言
RocketMQ 是阿里开源的分布式消息中间件,具有高吞吐、高可用、低延迟等特点。Spring Boot 通过 rocketmq-spring-boot-starter 可以方便地集成 RocketMQ。
快速开始
1. 添加依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
2. 基础配置
rocketmq:
name-server: localhost:9876
producer:
group: demo-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
3. 发送消息
@Service
@RequiredArgsConstructor
public class MessageService {
private final RocketMQTemplate rocketMQTemplate;
/**
* 发送普通消息
*/
public void sendNormalMessage() {
rocketMQTemplate.convertAndSend(
"demo-topic",
"Hello RocketMQ"
);
}
/**
* 发送对象消息
*/
public void sendObjectMessage(UserDTO user) {
rocketMQTemplate.convertAndSend(
"demo-topic",
user
);
}
/**
* 发送带标签的消息
*/
public void sendWithTag() {
rocketMQTemplate.convertAndSend(
"demo-topic:tag1",
"Message with tag"
);
}
/**
* 发送同步消息
*/
public void sendSync() {
SendResult result = rocketMQTemplate.syncSend(
"demo-topic",
"Sync message"
);
log.info("发送结果:{}", result.getSendStatus());
}
/**
* 发送异步消息
*/
public void sendAsync() {
rocketMQTemplate.asyncSend(
"demo-topic",
"Async message",
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送成功:{}", sendResult);
}
@Override
public void onException(Throwable e) {
log.error("发送失败", e);
}
}
);
}
/**
* 发送单向消息
*/
public void sendOneWay() {
rocketMQTemplate.sendOneWay(
"demo-topic",
"One way message"
);
}
}
4. 消费消息
@Component
@RocketMQMessageListener(
topic = "demo-topic",
consumerGroup = "demo-consumer-group",
selectorExpression = "*",
messageModel = MessageModel.CLUSTERING
)
public class DemoConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("收到消息:{}", message);
// 业务处理
processMessage(message);
}
private void processMessage(String message) {
// 业务逻辑
}
}
5. 消费对象消息
@Component
@RocketMQMessageListener(
topic = "demo-topic",
consumerGroup = "demo-consumer-group-object"
)
public class UserConsumer implements RocketMQListener<UserDTO> {
@Override
public void onMessage(UserDTO user) {
log.info("收到用户消息:{}", user);
// 处理用户数据
userService.process(user);
}
}
高级特性
1. 顺序消息
@Service
@RequiredArgsConstructor
public class OrderMessageService {
private final RocketMQTemplate rocketMQTemplate;
/**
* 发送顺序消息
*/
public void sendOrderMessage(Order order) {
// 使用订单 ID 作为哈希键,保证同一订单的消息顺序
rocketMQTemplate.syncSendOrderly(
"order-topic",
order,
String.valueOf(order.getId())
);
}
}
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
consumeMode = ConsumeMode.ORDERLY
)
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("处理订单:{}", order.getId());
// 按顺序处理订单
processOrder(order);
}
}
2. 延迟消息
@Service
@RequiredArgsConstructor
public class DelayMessageService {
private final RocketMQTemplate rocketMQTemplate;
/**
* 发送延迟消息
* delayLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
*/
public void sendDelayMessage(Order order) {
// 延迟 30 分钟检查订单支付状态
rocketMQTemplate.syncSend(
"order-delay-topic",
MessageBuilder.withPayload(order)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 16) // 30m
.build(),
3000
);
}
}
@Component
@RocketMQMessageListener(
topic = "order-delay-topic",
consumerGroup = "order-delay-consumer-group"
)
public class OrderDelayConsumer implements RocketMQListener<Order> {
@Autowired
private OrderService orderService;
@Override
public void onMessage(Order order) {
// 检查订单是否支付
if (!orderService.isPaid(order.getId())) {
// 取消订单
orderService.cancelOrder(order.getId());
}
}
}
3. 事务消息
@Service
@RequiredArgsConstructor
public class TransactionMessageService {
private final RocketMQTemplate rocketMQTemplate;
/**
* 发送事务消息
*/
@Transactional
public void sendTransactionMessage(Order order) {
// 1. 发送半消息
SendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-transaction-topic",
MessageBuilder.withPayload(order).build(),
order
);
log.info("事务消息发送结果:{}", result.getSendStatus());
}
}
@Component
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(
Message message,
Object arg
) {
try {
Order order = (Order) arg;
// 执行本地事务:创建订单
orderService.createOrder(order);
// 提交事务
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("执行本地事务失败", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/**
* 事务回查
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
// RocketMQ 未收到确认时,会回调此方法检查事务状态
String messageId = message.getHeaders().get("id").toString();
boolean isCommitted = orderService.isOrderCreated(messageId);
return isCommitted ?
RocketMQLocalTransactionState.COMMIT :
RocketMQLocalTransactionState.ROLLBACK;
}
}
4. 批量消息
@Service
@RequiredArgsConstructor
public class BatchMessageService {
private final RocketMQTemplate rocketMQTemplate;
/**
* 发送批量消息
*/
public void sendBatchMessages(List<Order> orders) {
List<Message> messages = orders.stream()
.map(order -> MessageBuilder.withPayload(order)
.setHeader(MessageConst.PROPERTY_KEYS, String.valueOf(order.getId()))
.build()
)
.collect(Collectors.toList());
rocketMQTemplate.syncSend("batch-topic", messages);
}
}
5. 消息过滤
@Component
@RocketMQMessageListener(
topic = "filter-topic",
consumerGroup = "filter-consumer-group",
selectorExpression = "tag1 || tag2" // SQL92 过滤
)
public class FilterConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
log.info("收到过滤后的消息:{}", message);
}
}
@Service
@RequiredArgsConstructor
public class FilterMessageService {
private final RocketMQTemplate rocketMQTemplate;
public void sendWithFilter() {
Message<String> message = MessageBuilder.withPayload("test").build();
message.setHeader("type", "vip");
rocketMQTemplate.send(
"filter-topic:tag1",
message
);
}
}
最佳实践
1. 消息幂等性
@Component
@RocketMQMessageListener(
topic = "idempotent-topic",
consumerGroup = "idempotent-consumer-group"
)
public class IdempotentConsumer implements RocketMQListener<Order> {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@Override
public void onMessage(Order order) {
String key = "order:processed:" + order.getId();
// 使用 Redis 实现幂等性
Boolean isNew = redisTemplate.opsForValue()
.setIfAbsent(key, "processed", 24, TimeUnit.HOURS);
if (Boolean.FALSE.equals(isNew)) {
log.warn("消息已处理,跳过:{}", order.getId());
return;
}
// 处理业务
processOrder(order);
}
}
2. 消息重试
rocketmq:
consumer:
retry-times-when-consume-failed: 16 # 重试次数
suspend-current-queue-time-millis: 1000 # 重试间隔
@Component
@RocketMQMessageListener(
topic = "retry-topic",
consumerGroup = "retry-consumer-group",
maxReconsumeTimes = 5 // 最大重试次数
)
public class RetryConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
try {
// 业务处理
processMessage(message);
} catch (Exception e) {
log.error("处理失败,将重试", e);
throw new RuntimeException(e); // 抛出异常触发重试
}
}
}
3. 死信队列
@Component
@RocketMQMessageListener(
topic = "dlq-topic",
consumerGroup = "dlq-consumer-group"
)
public class DLQConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 处理死信队列中的消息
log.warn("处理死信消息:{}", message);
// 记录日志、发送告警等
alertService.sendAlert("死信消息:" + message);
}
}
4. 消息追踪
@Component
@RocketMQMessageListener(
topic = "trace-topic",
consumerGroup = "trace-consumer-group"
)
public class TraceConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
// 记录消息处理日志
log.info("收到消息,TraceId: {}", MDC.get("traceId"));
processMessage(message);
}
}
5. 监控告警
@Component
public class RocketMQMonitor {
@Autowired
private MeterRegistry meterRegistry;
/**
* 记录消息发送指标
*/
public void recordSend(String topic, boolean success, long cost) {
meterRegistry.counter("rocketmq.message.send",
"topic", topic,
"status", success ? "success" : "failed"
).increment();
meterRegistry.timer("rocketmq.message.send.time",
"topic", topic
).record(cost, TimeUnit.MILLISECONDS);
}
/**
* 记录消息消费指标
*/
public void recordConsume(String topic, boolean success, long cost) {
meterRegistry.counter("rocketmq.message.consume",
"topic", topic,
"status", success ? "success" : "failed"
).increment();
}
}
总结
RocketMQ 集成要点:
- ✅ 基础使用 - 发送、消费消息
- ✅ 顺序消息 - 保证消息顺序
- ✅ 延迟消息 - 定时任务
- ✅ 事务消息 - 分布式事务
- ✅ 最佳实践 - 幂等性、重试、死信队列
RocketMQ 是企业级消息中间件的优秀选择。