RocketMQ 集成
RocketMQ 简介
核心特性
高吞吐:
- 支持百万级 TPS
- 低延迟消息传输
- 高可用性保障
可靠性:
- 消息持久化
- 多副本复制
- 故障自动转移
丰富功能:
- 事务消息
- 顺序消息
- 延迟消息
- 消息过滤
- 消息回溯
架构设计
┌─────────────┐ ┌─────────────┐
│ Producer │ │ Consumer │
│ (生产者) │ │ (消费者) │
└──────┬──────┘ └──────▲──────┘
│ │
│ 发送消息 │ 消费消息
▼ │
┌─────────────────────────────────┐
│ NameServer │
│ (名字服务,无状态) │
└─────────────────────────────────┘
│ │
▼ │
┌─────────────────────────────────┐
│ Broker │
│ (消息存储和转发) │
│ ┌─────────┐ ┌─────────┐ │
│ │ Master │ │ Slave │ │
│ └─────────┘ └─────────┘ │
└─────────────────────────────────┘
核心概念
主题(Topic):
- 消息的一级分类
- 消息订阅的基本单位
标签(Tag):
- 消息的二级分类
- 用于消息过滤
消息队列(MessageQueue):
- Topic 的分区
- 支持并行消费
生产组(ProducerGroup):
- 生产者分组
- 用于事务消息和顺序消息
消费组(ConsumerGroup):
- 消费者分组
- 实现负载均衡和故障转移
快速开始
1. 部署 RocketMQ
Docker 部署:
version: '3.8'
services:
namesrv:
image: apache/rocketmq:4.9.4
container_name: rmqnamesrv
ports:
- 9876:9876
command: sh mqnamesrv
environment:
- JAVA_OPT_EXT=-server -Xms512m -Xmx512m
broker:
image: apache/rocketmq:4.9.4
container_name: rmqbroker
ports:
- 10911:10911
- 10909:10909
depends_on:
- namesrv
environment:
- NAMESRV_ADDR=namesrv:9876
- JAVA_OPT_EXT=-server -Xms1g -Xmx1g
command: sh mqbroker -c /home/rocketmq/conf/2m-2s-sync/broker-a.properties
volumes:
- ./broker.conf:/home/rocketmq/conf/2m-2s-sync/broker-a.conf
2. 添加依赖
<dependencies>
<!-- RocketMQ Spring Boot Starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>
</dependencies>
3. 基础配置
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: order-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
consumer:
listeners: order-consumer-listener
4. 发送消息
同步发送:
@Service
public class OrderProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public SendResult sendOrderCreated(Order order) {
String topic = "TOPIC_ORDER";
String tag = "CREATE";
String key = order.getId().toString();
// 发送消息
SendResult result = rocketMQTemplate.syncSend(
topic + ":" + tag,
order,
3000 // 超时时间
);
log.info("订单消息发送成功:{}", result);
return result;
}
}
异步发送:
@Service
public class AsyncOrderProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendOrderCreated(Order order) {
String topic = "TOPIC_ORDER";
String tag = "CREATE";
rocketMQTemplate.asyncSend(
topic + ":" + tag,
order,
new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功:{}", sendResult);
}
@Override
public void onError(Throwable e) {
log.error("消息发送失败", e);
}
},
3000
);
}
}
单向发送:
@Service
public class OneWayProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendLog(Log log) {
// 不关心发送结果,只发送一次
rocketMQTemplate.sendOneWay("TOPIC_LOG", log);
}
}
5. 消费消息
基础消费者:
@Component
@RocketMQMessageListener(
topic = "TOPIC_ORDER",
consumerGroup = "order-consumer-group",
selectorExpression = "*", // 订阅所有标签
messageModel = MessageModel.CLUSTERING // 集群消费
)
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到订单消息:{}", order);
// 处理业务逻辑
processOrder(order);
}
private void processOrder(Order order) {
// 业务处理
}
}
消息过滤:
@Component
@RocketMQMessageListener(
topic = "TOPIC_ORDER",
consumerGroup = "order-consumer-group",
selectorExpression = "CREATE" // 只订阅 CREATE 标签
)
public class OrderCreateConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 处理订单创建
}
}
SQL 过滤:
@Component
@RocketMQMessageListener(
topic = "TOPIC_ORDER",
consumerGroup = "order-consumer-group",
selectorType = SelectorType.SQL92,
selectorExpression = "amount > 100 and status = 'PAID'"
)
public class OrderFilterConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 处理符合条件的订单
}
}
高级特性
1. 事务消息
事务消息配置:
@Component
@RocketMQTransactionListener(txProducerGroup = "order-tx-group")
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;
@Override
public RocketMQLocalTransactionState executeLocalTransaction(
Message msg, Object arg) {
try {
Order order = JSON.parseObject(
new String(msg.getPayload()),
Order.class
);
// 执行本地事务
orderService.createOrder(order);
// 提交事务
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("执行本地事务失败", e);
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地事务状态
Order order = JSON.parseObject(
new String(msg.getPayload()),
Order.class
);
boolean exists = orderService.exists(order.getId());
if (exists) {
return RocketMQLocalTransactionState.COMMIT;
} else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
发送事务消息:
@Service
public class OrderTransactionProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Transactional
public void sendOrderMessage(Order order) {
Message<String> message = MessageBuilder.withPayload(
JSON.toJSONString(order)
).build();
// 发送事务消息
rocketMQTemplate.sendMessageInTransaction(
"order-tx-group",
"TOPIC_ORDER:CREATE",
message,
null
);
}
}
2. 顺序消息
顺序消息发送:
@Service
public class OrderSequenceProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendOrderSequence(Order order) {
// 使用订单 ID 作为 shardingKey,保证同一订单的消息顺序
String shardingKey = order.getId().toString();
rocketMQTemplate.syncSendOrderly(
"TOPIC_ORDER_SEQUENCE",
order,
shardingKey
);
}
}
顺序消息消费:
@Component
@RocketMQMessageListener(
topic = "TOPIC_ORDER_SEQUENCE",
consumerGroup = "order-sequence-consumer-group",
consumeMode = ConsumeMode.ORDERLY // 顺序消费
)
public class OrderSequenceConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 按顺序处理订单消息
processOrder(order);
}
}
3. 延迟消息
发送延迟消息:
@Service
public class DelayProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendCancelOrder(Long orderId) {
// 延迟级别:1-18,对应不同的延迟时间
// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
int delayLevel = 4; // 30 秒后
Message<String> message = MessageBuilder.withPayload(
orderId.toString()
).build();
rocketMQTemplate.syncSend(
"TOPIC_ORDER_DELAY",
message,
3000,
delayLevel
);
}
}
消费延迟消息:
@Component
@RocketMQMessageListener(
topic = "TOPIC_ORDER_DELAY",
consumerGroup = "order-delay-consumer-group"
)
public class DelayConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String orderId) {
// 取消订单
orderService.cancelOrder(Long.parseLong(orderId));
}
}
4. 批量消息
发送批量消息:
@Service
public class BatchProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendBatchOrders(List<Order> orders) {
// 批量发送(不超过 4MB)
List<Message<Order>> messages = orders.stream()
.map(order -> MessageBuilder.withPayload(order).build())
.collect(Collectors.toList());
rocketMQTemplate.syncSend("TOPIC_ORDER_BATCH", messages);
}
}
5. 消息回溯
配置回溯:
@Component
@RocketMQMessageListener(
topic = "TOPIC_ORDER",
consumerGroup = "order-consumer-group",
consumeTimestamp = "20260410100000" // 回溯到指定时间
)
public class OrderBacktrackConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 消费历史消息
}
}
消息可靠性
1. 消息不丢失
生产者保证:
// 同步发送,等待 Broker 确认
SendResult result = rocketMQTemplate.syncSend(topic, message);
if (result.getSendStatus() == SendStatus.SEND_OK) {
// 发送成功
} else {
// 发送失败,重试或记录
}
// 失败重试
rocketMQTemplate.syncSend(
topic,
message,
3000, // 超时时间
3 // 重试次数
);
Broker 保证:
# broker.conf
# 同步刷盘
flushDiskType=SYNC_FLUSH
# 同步复制
brokerRole=SYNC_MASTER
# 刷盘落盘间隔
flushInterval=1
消费者保证:
@Component
@RocketMQMessageListener(
topic = "TOPIC_ORDER",
consumerGroup = "order-consumer-group",
ackTimeout = 3000 // 确认超时
)
public class ReliableConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
try {
// 业务处理
processOrder(order);
// 自动 ack
} catch (Exception e) {
log.error("消费失败", e);
// 抛出异常,消息会重新投递
throw e;
}
}
}
2. 消息重复消费
幂等性处理:
@Component
public class IdempotentConsumer implements RocketMQListener<Order> {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Override
public void onMessage(Order order) {
String key = "order:processed:" + order.getId();
// 使用 Redis 实现幂等
Boolean isNew = redisTemplate.opsForValue()
.setIfAbsent(key, "1", 24, TimeUnit.HOURS);
if (Boolean.FALSE.equals(isNew)) {
log.warn("订单已处理,跳过:{}", order.getId());
return;
}
// 处理业务
processOrder(order);
}
}
数据库唯一约束:
CREATE TABLE `order` (
`id` BIGINT PRIMARY KEY,
`order_no` VARCHAR(64) UNIQUE NOT NULL, -- 唯一约束
...
);
3. 消息积压处理
临时扩容:
// 部署多个消费者实例
// 增加消费线程数
@Component
@RocketMQMessageListener(
topic = "TOPIC_ORDER",
consumerGroup = "order-consumer-group",
consumeThreadMax = 64 // 增加消费线程
)
public class OrderConsumer implements RocketMQListener<Order> {
// ...
}
批量消费:
@Component
@RocketMQMessageListener(
topic = "TOPIC_ORDER",
consumerGroup = "order-consumer-group",
consumeMode = ConsumeMode.CONCURRENTLY
)
public class BatchConsumer implements RocketMQMessageListenerConcurrently<Order> {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context
) {
// 批量处理消息
for (MessageExt msg : msgs) {
processMessage(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
监控告警
1. 监控指标
生产者指标:
- 发送 TPS
- 发送延迟
- 发送失败率
消费者指标:
- 消费 TPS
- 消费延迟
- 消费失败率
- 消息积压量
Broker 指标:
- 存储量
- 写入 TPS
- 读取 TPS
2. RocketMQ Dashboard
部署 Dashboard:
docker run -d \
-p 8080:8080 \
-e "JAVA_OPTS=-Drocketmq.config.namesrvAddr=127.0.0.1:9876" \
--name rocketmq-dashboard \
apache/rocketmq-dashboard:latest
监控功能:
- Topic 管理
- Consumer 管理
- 消息查询
- 监控图表
3. 告警配置
# Prometheus 告警规则
groups:
- name: rocketmq
rules:
- alert: RocketMQConsumerLag
expr: rocketmq_consumer_lag > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "RocketMQ 消费积压"
description: "消费积压超过 10000 条"
- alert: RocketMQSendFailed
expr: rate(rocketmq_send_failed[5m]) > 0.1
for: 2m
labels:
severity: critical
annotations:
summary: "RocketMQ 发送失败率高"
description: "发送失败率超过 10%"
最佳实践
1. Topic 设计
按业务划分:
TOPIC_ORDER - 订单相关
TOPIC_PAYMENT - 支付相关
TOPIC_INVENTORY - 库存相关
TOPIC_LOGISTICS - 物流相关
按重要性划分:
TOPIC_CRITICAL - 核心业务
TOPIC_NORMAL - 普通业务
TOPIC_LOG - 日志类
2. 消息设计
消息体大小:
- 控制在 4KB 以内
- 大字段使用对象存储
消息格式:
{
"id": "123456",
"type": "ORDER_CREATE",
"timestamp": 1680000000000,
"data": {
"orderId": "123456",
"userId": "789",
"amount": 100.00
}
}
3. 消费优化
批量消费:
// 批量处理,减少数据库交互
List<Order> batch = new ArrayList<>();
for (MessageExt msg : msgs) {
batch.add(parseOrder(msg));
if (batch.size() >= 100) {
batchProcess(batch);
batch.clear();
}
}
异步处理:
@Override
public void onMessage(Order order) {
// 快速 ack,异步处理
CompletableFuture.runAsync(() -> {
processOrder(order);
});
}
4. 故障处理
死信队列:
// 消息重试多次失败后进入死信队列
// %DLQ%consumerGroup 是死信队列名称
@Component
@RocketMQMessageListener(
topic = "%DLQ%order-consumer-group",
consumerGroup = "dlq-consumer-group"
)
public class DeadLetterConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 记录死信消息,人工处理
log.error("死信消息:{}", order);
}
}
重试策略:
// 配置重试次数
@Component
@RocketMQMessageListener(
topic = "TOPIC_ORDER",
consumerGroup = "order-consumer-group",
maxReconsumeTimes = 5 // 最多重试 5 次
)
public class OrderConsumer implements RocketMQListener<Order> {
// ...
}
总结
RocketMQ 是高性能、高可靠的消息中间件,支持事务消息、顺序消息、延迟消息等丰富特性。
与 Spring Cloud 集成后,可以构建高可靠的异步通信架构,实现服务解耦、流量削峰、数据同步等功能。
在生产环境中,需要做好消息可靠性保障、监控告警和故障处理,确保消息系统的稳定运行。