RocketMQ 的延迟消息和定时消息是其重要特性,广泛应用于延迟处理、定时任务等场景。本文将深入探讨延迟消息的实现原理和实战应用。
一、延迟消息基础
1.1 什么是延迟消息?
延迟消息是指消息发送后不会立即被消费,而是在指定时间后才可被消费:
sequenceDiagram
participant P as Producer
participant MQ as RocketMQ
participant C as Consumer
P->>MQ: 发送延迟消息(延迟 10 秒)
MQ-->>P: 返回成功
note over MQ: 存储消息<br/>等待 10 秒
MQ->>C: 10 秒后投递消息
C-->>MQ: 消费确认
1.2 应用场景
| 场景 | 说明 | 延迟时间 |
|---|---|---|
| 订单超时取消 | 下单后 30 分钟未支付自动取消 | 30 分钟 |
| 延时通知 | 会议开始前 10 分钟提醒 | 自定义 |
| 定时任务 | 每天凌晨 2 点执行任务 | 固定时间 |
| 重试机制 | 消费失败后延迟重试 | 指数退避 |
| 数据同步 | 数据变更后延迟同步 | 几秒~几分钟 |
1.3 与其他方案对比
| 方案 | 延迟精度 | 可靠性 | 性能 | 复杂度 |
|---|---|---|---|---|
| RocketMQ | 秒级 | 高 | 高 | 低 |
| Redis ZSet | 毫秒级 | 中 | 中 | 中 |
| RabbitMQ TTL+DLX | 毫秒级 | 中 | 中 | 高 |
| Kafka 时间轮 | 秒级 | 高 | 高 | 高 |
二、实现原理
2.1 延迟级别
RocketMQ 支持 18 个延迟级别:
// 延迟级别定义
public enum DelayLevel {
LEVEL_1(1, "1s"),
LEVEL_2(2, "5s"),
LEVEL_3(3, "10s"),
LEVEL_4(4, "30s"),
LEVEL_5(5, "1m"),
LEVEL_6(6, "2m"),
LEVEL_7(7, "3m"),
LEVEL_8(8, "4m"),
LEVEL_9(9, "5m"),
LEVEL_10(10, "6m"),
LEVEL_11(11, "7m"),
LEVEL_12(12, "8m"),
LEVEL_13(13, "9m"),
LEVEL_14(14, "10m"),
LEVEL_15(15, "20m"),
LEVEL_16(16, "30m"),
LEVEL_17(17, "1h"),
LEVEL_18(18, "2h");
private final int level;
private final String desc;
}
2.2 存储结构
graph TB
subgraph 发送阶段
P[Producer] -->|发送 | SCHEDULE_TOPIC
end
subgraph Broker
ST[SCHEDULE_TOPIC_XX<br/>延迟队列]
QT[Queue Offset Table<br/>偏移量表]
DT[Delivery Table<br/>投递表]
end
SCHEDULE_TOPIC --> ST
ST --> QT
QT --> DT
subgraph 投递阶段
DT -->|到期 | RT[Real Topic<br/>真实队列]
RT --> C[Consumer]
end
2.3 处理流程
sequenceDiagram
participant P as Producer
participant B as Broker
participant S as ScheduleService
participant R as RealTopic
P->>B: 发送延迟消息<br/>delayLevel=4
B->>B: 转换为 SCHEDULE_TOPIC_4
B-->>P: 返回成功
loop 每秒检查
S->>S: 检查到期消息
end
S->>S: 消息到期?
S->>R: 还原真实 Topic
R->>C: 投递给消费者
2.4 源码分析
// Broker 端处理延迟消息
public class ScheduleMessageService {
// 延迟级别映射表
private static final Map<Integer, String> DELAY_TOPIC_MAP = new HashMap<>();
static {
for (int i = 1; i <= 18; i++) {
DELAY_TOPIC_MAP.put(i, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC + "_" + i);
}
}
/**
* 准备延迟消息
*/
public MessageExt prepareMessage(MessageExt msg) {
int delayLevel = msg.getDelayTimeLevel();
// 验证延迟级别
if (delayLevel > 18) {
delayLevel = 18;
}
// 保存原始 Topic
msg.putProperty(MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
// 保存原始 Queue ID
msg.putProperty(MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
// 转换为延迟 Topic
msg.setTopic(DELAY_TOPIC_MAP.get(delayLevel));
return msg;
}
/**
* 投递到期消息
*/
public void deliverDelayedMessage(int delayLevel, long offset) {
// 1. 从延迟队列读取消息
MessageExt msg = consumeQueue.getMessage(offset);
// 2. 检查是否到期
if (msg.getStoreTimestamp() + getDelayMillis(delayLevel) > System.currentTimeMillis()) {
return; // 未到期
}
// 3. 还原真实 Topic
String realTopic = msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
int realQueueId = Integer.parseInt(msg.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID));
msg.setTopic(realTopic);
msg.setQueueId(realQueueId);
// 4. 写入真实队列
commitRealTopic(msg);
}
}
三、代码实现
3.1 发送延迟消息
@Service
public class DelayMessageProducer {
@Autowired
private DefaultMQProducer producer;
/**
* 发送延迟消息
*/
public SendResult sendDelayMessage(String topic, String body, int delayLevel) {
Message msg = new Message(topic, body.getBytes());
// 设置延迟级别
msg.setDelayTimeLevel(delayLevel);
// 发送消息
SendResult result = producer.send(msg);
log.info("延迟消息发送成功:topic={}, delayLevel={}, msgId={}",
topic, delayLevel, result.getMsgId());
return result;
}
/**
* 发送订单超时取消消息
*/
public void sendOrderTimeoutMessage(Long orderId, long timeoutMinutes) {
OrderTimeoutMessage msg = new OrderTimeoutMessage();
msg.setOrderId(orderId);
msg.setTimeoutTime(System.currentTimeMillis() + timeoutMinutes * 60 * 1000);
Message message = new Message(
"order-timeout-topic",
"timeout",
orderId.toString(),
JSON.toJSONString(msg).getBytes()
);
// 根据超时时间选择延迟级别
int delayLevel = calculateDelayLevel(timeoutMinutes);
message.setDelayTimeLevel(delayLevel);
producer.send(message);
}
private int calculateDelayLevel(long minutes) {
if (minutes <= 1) return 1; // 1s
if (minutes <= 5) return 2; // 5s
if (minutes <= 10) return 3; // 10s
if (minutes <= 30) return 4; // 30s
if (minutes <= 60) return 5; // 1m
if (minutes <= 120) return 6; // 2m
if (minutes <= 180) return 7; // 3m
if (minutes <= 240) return 8; // 4m
if (minutes <= 300) return 9; // 5m
if (minutes <= 360) return 10; // 6m
if (minutes <= 420) return 11; // 7m
if (minutes <= 480) return 12; // 8m
if (minutes <= 540) return 13; // 9m
if (minutes <= 600) return 14; // 10m
if (minutes <= 1200) return 15; // 20m
if (minutes <= 1800) return 16; // 30m
if (minutes <= 3600) return 17; // 1h
return 18; // 2h
}
}
3.2 消费延迟消息
@Service
public class DelayMessageConsumer {
@PostConstruct
public void init() throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay-consumer-group");
consumer.setNamesrvAddr("localhost:9876");
// 订阅延迟消息 Topic
consumer.subscribe("order-timeout-topic", "timeout");
// 注册消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
// 解析消息
OrderTimeoutMessage timeoutMsg = JSON.parseObject(
new String(msg.getBody()), OrderTimeoutMessage.class);
log.info("收到超时消息:orderId={}, timeoutTime={}",
timeoutMsg.getOrderId(), timeoutMsg.getTimeoutTime());
// 处理订单超时
handleOrderTimeout(timeoutMsg.getOrderId());
} catch (Exception e) {
log.error("处理超时消息失败:orderId={}",
timeoutMsg.getOrderId(), e);
// 返回重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
log.info("延迟消息消费者启动成功");
}
private void handleOrderTimeout(Long orderId) {
// 1. 查询订单状态
Order order = orderService.queryById(orderId);
// 2. 检查是否需要取消
if (order != null && order.getStatus() == OrderStatus.UNPAID) {
// 3. 取消订单
orderService.cancelOrder(orderId, "超时未支付");
// 4. 发送通知
notificationService.sendCancelNotification(order);
log.info("订单已取消:orderId={}", orderId);
}
}
}
四、定时消息
4.1 实现方式
方式一:计算延迟级别
public void sendScheduledMessage(String topic, String body, long scheduledTime) {
long delayMillis = scheduledTime - System.currentTimeMillis();
int delayLevel = calculateDelayLevel(delayMillis);
Message msg = new Message(topic, body.getBytes());
msg.setDelayTimeLevel(delayLevel);
producer.send(msg);
}
方式二:使用定时任务
@Component
public class ScheduledMessageTask {
@Autowired
private DefaultMQProducer producer;
@Autowired
private ScheduledTaskMapper taskMapper;
/**
* 定时发送消息
*/
@Scheduled(cron = "0 */1 * * * ?") // 每分钟执行
public void sendScheduledMessages() {
// 1. 查询待发送的定时任务
List<ScheduledTask> tasks = taskMapper.selectDueTasks(System.currentTimeMillis());
for (ScheduledTask task : tasks) {
try {
// 2. 发送消息
Message msg = new Message(
task.getTopic(),
task.getTag(),
task.getBody().getBytes()
);
producer.send(msg);
// 3. 更新任务状态
task.setStatus("SENT");
taskMapper.update(task);
log.info("定时消息发送成功:taskId={}", task.getId());
} catch (Exception e) {
log.error("定时消息发送失败:taskId={}", task.getId(), e);
}
}
}
}
4.2 数据库方案
-- 定时任务表
CREATE TABLE scheduled_task (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
topic VARCHAR(128) NOT NULL,
tag VARCHAR(64),
body TEXT NOT NULL,
execute_time DATETIME NOT NULL,
status VARCHAR(16) DEFAULT 'PENDING',
retry_count INT DEFAULT 0,
created_time DATETIME DEFAULT CURRENT_TIMESTAMP,
INDEX idx_execute_time (execute_time),
INDEX idx_status (status)
);
@Service
public class DatabaseScheduledTaskService {
@Autowired
private ScheduledTaskMapper taskMapper;
@Autowired
private DefaultMQProducer producer;
/**
* 添加定时任务
*/
@Transactional
public void addTask(ScheduledTask task) {
taskMapper.insert(task);
}
/**
* 定时扫描并发送
*/
@Scheduled(fixedRate = 5000) // 5 秒一次
public void scanAndSend() {
// 1. 批量查询到期任务
PageHelper.startPage(1, 100);
List<ScheduledTask> tasks = taskMapper.selectDueTasks(System.currentTimeMillis());
for (ScheduledTask task : tasks) {
try {
// 2. 发送消息
Message msg = new Message(
task.getTopic(),
task.getTag(),
task.getBody().getBytes()
);
producer.send(msg);
// 3. 标记已完成
task.setStatus("COMPLETED");
taskMapper.update(task);
} catch (Exception e) {
// 4. 失败重试
task.setRetryCount(task.getRetryCount() + 1);
if (task.getRetryCount() >= 3) {
task.setStatus("FAILED");
}
taskMapper.update(task);
}
}
}
}
五、最佳实践
5.1 延迟级别选择
| 业务场景 | 延迟时间 | 推荐级别 |
|---|---|---|
| 订单超时取消 | 30 分钟 | Level 16 |
| 支付提醒 | 15 分钟 | Level 15 |
| 会议提醒 | 10 分钟 | Level 14 |
| 数据同步 | 5 分钟 | Level 9 |
| 重试机制 | 指数退避 | 动态选择 |
5.2 幂等性保证
@Service
public class IdempotentDelayConsumer {
@Autowired
private StringRedisTemplate redisTemplate;
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId();
// 1. 检查是否已处理
Boolean processed = redisTemplate.opsForValue()
.setIfAbsent("delay:processed:" + msgId, "1", 24, TimeUnit.HOURS);
if (Boolean.FALSE.equals(processed)) {
log.info("消息已处理,跳过:msgId={}", msgId);
continue;
}
try {
// 2. 处理业务
processBusiness(msg);
} catch (Exception e) {
log.error("处理失败:msgId={}", msgId, e);
// 删除幂等标记,允许重试
redisTemplate.delete("delay:processed:" + msgId);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
5.3 监控告警
@Component
public class DelayMessageMetrics {
@Autowired
private MeterRegistry meterRegistry;
private final AtomicLong sendCount = new AtomicLong();
private final AtomicLong consumeCount = new AtomicLong();
private final AtomicLong delayMillis = new AtomicLong();
@PostConstruct
public void init() {
meterRegistry.gauge("delay.message.send.count", sendCount);
meterRegistry.gauge("delay.message.consume.count", consumeCount);
meterRegistry.gauge("delay.message.avg.delay.millis", delayMillis);
}
public void recordSend() {
sendCount.incrementAndGet();
}
public void recordConsume(long actualDelay) {
consumeCount.incrementAndGet();
delayMillis.set(actualDelay);
}
}
六、常见问题排查
6.1 延迟不准确
原因:
- 延迟级别粒度粗
- Broker 负载高
解决:
// 使用数据库方案实现精确延迟
public void sendExactDelayMessage(long exactDelayMillis) {
long executeTime = System.currentTimeMillis() + exactDelayMillis;
ScheduledTask task = new ScheduledTask();
task.setTopic("order-topic");
task.setBody("...");
task.setExecuteTime(executeTime);
taskMapper.insert(task);
}
6.2 消息堆积
原因:
- 消费者处理慢
- 延迟消息量过大
解决:
# 增加消费者数量
# 增加消费线程
consumer.setConsumeThreadMax(100);
# 优化处理逻辑
6.3 消息丢失
原因:
- Broker 故障
- 未持久化
解决:
// 同步发送
SendResult result = producer.send(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
// 重试或记录
}
// 开启事务消息
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
总结
RocketMQ 延迟消息的核心要点:
- 延迟级别:18 个固定级别,从 1 秒到 2 小时
- 实现原理:SCHEDULE_TOPIC + 定时投递
- 使用方式:setDelayTimeLevel()
- 定时消息:计算延迟级别或数据库方案
- 最佳实践:幂等性、监控、告警
核心要点:
- 理解 18 个延迟级别及其对应时间
- 根据业务场景选择合适的延迟级别
- 实现消费者幂等性防止重复
- 监控延迟消息的发送和消费情况
参考资料
- RocketMQ 延迟消息官方文档
- RocketMQ 源码
- 《RocketMQ 技术内幕》第 8 章