Skip to content
清晨的一缕阳光
返回

RocketMQ 延迟消息与定时消息详解

RocketMQ 的延迟消息和定时消息是其重要特性,广泛应用于延迟处理、定时任务等场景。本文将深入探讨延迟消息的实现原理和实战应用。

一、延迟消息基础

1.1 什么是延迟消息?

延迟消息是指消息发送后不会立即被消费,而是在指定时间后才可被消费:

sequenceDiagram
    participant P as Producer
    participant MQ as RocketMQ
    participant C as Consumer
    
    P->>MQ: 发送延迟消息(延迟 10 秒)
    MQ-->>P: 返回成功
    note over MQ: 存储消息<br/>等待 10 秒
    MQ->>C: 10 秒后投递消息
    C-->>MQ: 消费确认

1.2 应用场景

场景说明延迟时间
订单超时取消下单后 30 分钟未支付自动取消30 分钟
延时通知会议开始前 10 分钟提醒自定义
定时任务每天凌晨 2 点执行任务固定时间
重试机制消费失败后延迟重试指数退避
数据同步数据变更后延迟同步几秒~几分钟

1.3 与其他方案对比

方案延迟精度可靠性性能复杂度
RocketMQ秒级
Redis ZSet毫秒级
RabbitMQ TTL+DLX毫秒级
Kafka 时间轮秒级

二、实现原理

2.1 延迟级别

RocketMQ 支持 18 个延迟级别:

// 延迟级别定义
public enum DelayLevel {
    LEVEL_1(1, "1s"),
    LEVEL_2(2, "5s"),
    LEVEL_3(3, "10s"),
    LEVEL_4(4, "30s"),
    LEVEL_5(5, "1m"),
    LEVEL_6(6, "2m"),
    LEVEL_7(7, "3m"),
    LEVEL_8(8, "4m"),
    LEVEL_9(9, "5m"),
    LEVEL_10(10, "6m"),
    LEVEL_11(11, "7m"),
    LEVEL_12(12, "8m"),
    LEVEL_13(13, "9m"),
    LEVEL_14(14, "10m"),
    LEVEL_15(15, "20m"),
    LEVEL_16(16, "30m"),
    LEVEL_17(17, "1h"),
    LEVEL_18(18, "2h");
    
    private final int level;
    private final String desc;
}

2.2 存储结构

graph TB
    subgraph 发送阶段
        P[Producer] -->|发送 | SCHEDULE_TOPIC
    end
    
    subgraph Broker
        ST[SCHEDULE_TOPIC_XX<br/>延迟队列]
        QT[Queue Offset Table<br/>偏移量表]
        DT[Delivery Table<br/>投递表]
    end
    
    SCHEDULE_TOPIC --> ST
    ST --> QT
    QT --> DT
    
    subgraph 投递阶段
        DT -->|到期 | RT[Real Topic<br/>真实队列]
        RT --> C[Consumer]
    end

2.3 处理流程

sequenceDiagram
    participant P as Producer
    participant B as Broker
    participant S as ScheduleService
    participant R as RealTopic
    
    P->>B: 发送延迟消息<br/>delayLevel=4
    B->>B: 转换为 SCHEDULE_TOPIC_4
    B-->>P: 返回成功
    
    loop 每秒检查
        S->>S: 检查到期消息
    end
    
    S->>S: 消息到期?
    S->>R: 还原真实 Topic
    R->>C: 投递给消费者

2.4 源码分析

// Broker 端处理延迟消息
public class ScheduleMessageService {
    
    // 延迟级别映射表
    private static final Map<Integer, String> DELAY_TOPIC_MAP = new HashMap<>();
    
    static {
        for (int i = 1; i <= 18; i++) {
            DELAY_TOPIC_MAP.put(i, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC + "_" + i);
        }
    }
    
    /**
     * 准备延迟消息
     */
    public MessageExt prepareMessage(MessageExt msg) {
        int delayLevel = msg.getDelayTimeLevel();
        
        // 验证延迟级别
        if (delayLevel > 18) {
            delayLevel = 18;
        }
        
        // 保存原始 Topic
        msg.putProperty(MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        
        // 保存原始 Queue ID
        msg.putProperty(MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        
        // 转换为延迟 Topic
        msg.setTopic(DELAY_TOPIC_MAP.get(delayLevel));
        
        return msg;
    }
    
    /**
     * 投递到期消息
     */
    public void deliverDelayedMessage(int delayLevel, long offset) {
        // 1. 从延迟队列读取消息
        MessageExt msg = consumeQueue.getMessage(offset);
        
        // 2. 检查是否到期
        if (msg.getStoreTimestamp() + getDelayMillis(delayLevel) > System.currentTimeMillis()) {
            return;  // 未到期
        }
        
        // 3. 还原真实 Topic
        String realTopic = msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
        int realQueueId = Integer.parseInt(msg.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID));
        
        msg.setTopic(realTopic);
        msg.setQueueId(realQueueId);
        
        // 4. 写入真实队列
        commitRealTopic(msg);
    }
}

三、代码实现

3.1 发送延迟消息

@Service
public class DelayMessageProducer {
    
    @Autowired
    private DefaultMQProducer producer;
    
    /**
     * 发送延迟消息
     */
    public SendResult sendDelayMessage(String topic, String body, int delayLevel) {
        Message msg = new Message(topic, body.getBytes());
        
        // 设置延迟级别
        msg.setDelayTimeLevel(delayLevel);
        
        // 发送消息
        SendResult result = producer.send(msg);
        
        log.info("延迟消息发送成功:topic={}, delayLevel={}, msgId={}", 
            topic, delayLevel, result.getMsgId());
        
        return result;
    }
    
    /**
     * 发送订单超时取消消息
     */
    public void sendOrderTimeoutMessage(Long orderId, long timeoutMinutes) {
        OrderTimeoutMessage msg = new OrderTimeoutMessage();
        msg.setOrderId(orderId);
        msg.setTimeoutTime(System.currentTimeMillis() + timeoutMinutes * 60 * 1000);
        
        Message message = new Message(
            "order-timeout-topic",
            "timeout",
            orderId.toString(),
            JSON.toJSONString(msg).getBytes()
        );
        
        // 根据超时时间选择延迟级别
        int delayLevel = calculateDelayLevel(timeoutMinutes);
        message.setDelayTimeLevel(delayLevel);
        
        producer.send(message);
    }
    
    private int calculateDelayLevel(long minutes) {
        if (minutes <= 1) return 1;      // 1s
        if (minutes <= 5) return 2;      // 5s
        if (minutes <= 10) return 3;     // 10s
        if (minutes <= 30) return 4;     // 30s
        if (minutes <= 60) return 5;     // 1m
        if (minutes <= 120) return 6;    // 2m
        if (minutes <= 180) return 7;    // 3m
        if (minutes <= 240) return 8;    // 4m
        if (minutes <= 300) return 9;    // 5m
        if (minutes <= 360) return 10;   // 6m
        if (minutes <= 420) return 11;   // 7m
        if (minutes <= 480) return 12;   // 8m
        if (minutes <= 540) return 13;   // 9m
        if (minutes <= 600) return 14;   // 10m
        if (minutes <= 1200) return 15;  // 20m
        if (minutes <= 1800) return 16;  // 30m
        if (minutes <= 3600) return 17;  // 1h
        return 18;                        // 2h
    }
}

3.2 消费延迟消息

@Service
public class DelayMessageConsumer {
    
    @PostConstruct
    public void init() throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay-consumer-group");
        consumer.setNamesrvAddr("localhost:9876");
        
        // 订阅延迟消息 Topic
        consumer.subscribe("order-timeout-topic", "timeout");
        
        // 注册消息监听器
        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            for (MessageExt msg : msgs) {
                try {
                    // 解析消息
                    OrderTimeoutMessage timeoutMsg = JSON.parseObject(
                        new String(msg.getBody()), OrderTimeoutMessage.class);
                    
                    log.info("收到超时消息:orderId={}, timeoutTime={}", 
                        timeoutMsg.getOrderId(), timeoutMsg.getTimeoutTime());
                    
                    // 处理订单超时
                    handleOrderTimeout(timeoutMsg.getOrderId());
                    
                } catch (Exception e) {
                    log.error("处理超时消息失败:orderId={}", 
                        timeoutMsg.getOrderId(), e);
                    
                    // 返回重试
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        consumer.start();
        log.info("延迟消息消费者启动成功");
    }
    
    private void handleOrderTimeout(Long orderId) {
        // 1. 查询订单状态
        Order order = orderService.queryById(orderId);
        
        // 2. 检查是否需要取消
        if (order != null && order.getStatus() == OrderStatus.UNPAID) {
            // 3. 取消订单
            orderService.cancelOrder(orderId, "超时未支付");
            
            // 4. 发送通知
            notificationService.sendCancelNotification(order);
            
            log.info("订单已取消:orderId={}", orderId);
        }
    }
}

四、定时消息

4.1 实现方式

方式一:计算延迟级别

public void sendScheduledMessage(String topic, String body, long scheduledTime) {
    long delayMillis = scheduledTime - System.currentTimeMillis();
    int delayLevel = calculateDelayLevel(delayMillis);
    
    Message msg = new Message(topic, body.getBytes());
    msg.setDelayTimeLevel(delayLevel);
    
    producer.send(msg);
}

方式二:使用定时任务

@Component
public class ScheduledMessageTask {
    
    @Autowired
    private DefaultMQProducer producer;
    
    @Autowired
    private ScheduledTaskMapper taskMapper;
    
    /**
     * 定时发送消息
     */
    @Scheduled(cron = "0 */1 * * * ?")  // 每分钟执行
    public void sendScheduledMessages() {
        // 1. 查询待发送的定时任务
        List<ScheduledTask> tasks = taskMapper.selectDueTasks(System.currentTimeMillis());
        
        for (ScheduledTask task : tasks) {
            try {
                // 2. 发送消息
                Message msg = new Message(
                    task.getTopic(),
                    task.getTag(),
                    task.getBody().getBytes()
                );
                
                producer.send(msg);
                
                // 3. 更新任务状态
                task.setStatus("SENT");
                taskMapper.update(task);
                
                log.info("定时消息发送成功:taskId={}", task.getId());
            } catch (Exception e) {
                log.error("定时消息发送失败:taskId={}", task.getId(), e);
            }
        }
    }
}

4.2 数据库方案

-- 定时任务表
CREATE TABLE scheduled_task (
    id BIGINT PRIMARY KEY AUTO_INCREMENT,
    topic VARCHAR(128) NOT NULL,
    tag VARCHAR(64),
    body TEXT NOT NULL,
    execute_time DATETIME NOT NULL,
    status VARCHAR(16) DEFAULT 'PENDING',
    retry_count INT DEFAULT 0,
    created_time DATETIME DEFAULT CURRENT_TIMESTAMP,
    
    INDEX idx_execute_time (execute_time),
    INDEX idx_status (status)
);
@Service
public class DatabaseScheduledTaskService {
    
    @Autowired
    private ScheduledTaskMapper taskMapper;
    
    @Autowired
    private DefaultMQProducer producer;
    
    /**
     * 添加定时任务
     */
    @Transactional
    public void addTask(ScheduledTask task) {
        taskMapper.insert(task);
    }
    
    /**
     * 定时扫描并发送
     */
    @Scheduled(fixedRate = 5000)  // 5 秒一次
    public void scanAndSend() {
        // 1. 批量查询到期任务
        PageHelper.startPage(1, 100);
        List<ScheduledTask> tasks = taskMapper.selectDueTasks(System.currentTimeMillis());
        
        for (ScheduledTask task : tasks) {
            try {
                // 2. 发送消息
                Message msg = new Message(
                    task.getTopic(),
                    task.getTag(),
                    task.getBody().getBytes()
                );
                
                producer.send(msg);
                
                // 3. 标记已完成
                task.setStatus("COMPLETED");
                taskMapper.update(task);
                
            } catch (Exception e) {
                // 4. 失败重试
                task.setRetryCount(task.getRetryCount() + 1);
                if (task.getRetryCount() >= 3) {
                    task.setStatus("FAILED");
                }
                taskMapper.update(task);
            }
        }
    }
}

五、最佳实践

5.1 延迟级别选择

业务场景延迟时间推荐级别
订单超时取消30 分钟Level 16
支付提醒15 分钟Level 15
会议提醒10 分钟Level 14
数据同步5 分钟Level 9
重试机制指数退避动态选择

5.2 幂等性保证

@Service
public class IdempotentDelayConsumer {
    
    @Autowired
    private StringRedisTemplate redisTemplate;
    
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 
                                                    ConsumeConcurrentlyContext context) {
        for (MessageExt msg : msgs) {
            String msgId = msg.getMsgId();
            
            // 1. 检查是否已处理
            Boolean processed = redisTemplate.opsForValue()
                .setIfAbsent("delay:processed:" + msgId, "1", 24, TimeUnit.HOURS);
            
            if (Boolean.FALSE.equals(processed)) {
                log.info("消息已处理,跳过:msgId={}", msgId);
                continue;
            }
            
            try {
                // 2. 处理业务
                processBusiness(msg);
            } catch (Exception e) {
                log.error("处理失败:msgId={}", msgId, e);
                
                // 删除幂等标记,允许重试
                redisTemplate.delete("delay:processed:" + msgId);
                
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

5.3 监控告警

@Component
public class DelayMessageMetrics {
    
    @Autowired
    private MeterRegistry meterRegistry;
    
    private final AtomicLong sendCount = new AtomicLong();
    private final AtomicLong consumeCount = new AtomicLong();
    private final AtomicLong delayMillis = new AtomicLong();
    
    @PostConstruct
    public void init() {
        meterRegistry.gauge("delay.message.send.count", sendCount);
        meterRegistry.gauge("delay.message.consume.count", consumeCount);
        meterRegistry.gauge("delay.message.avg.delay.millis", delayMillis);
    }
    
    public void recordSend() {
        sendCount.incrementAndGet();
    }
    
    public void recordConsume(long actualDelay) {
        consumeCount.incrementAndGet();
        delayMillis.set(actualDelay);
    }
}

六、常见问题排查

6.1 延迟不准确

原因

解决

// 使用数据库方案实现精确延迟
public void sendExactDelayMessage(long exactDelayMillis) {
    long executeTime = System.currentTimeMillis() + exactDelayMillis;
    
    ScheduledTask task = new ScheduledTask();
    task.setTopic("order-topic");
    task.setBody("...");
    task.setExecuteTime(executeTime);
    
    taskMapper.insert(task);
}

6.2 消息堆积

原因

解决

# 增加消费者数量
# 增加消费线程
consumer.setConsumeThreadMax(100);

# 优化处理逻辑

6.3 消息丢失

原因

解决

// 同步发送
SendResult result = producer.send(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
    // 重试或记录
}

// 开启事务消息
TransactionSendResult result = producer.sendMessageInTransaction(msg, null);

总结

RocketMQ 延迟消息的核心要点:

  1. 延迟级别:18 个固定级别,从 1 秒到 2 小时
  2. 实现原理:SCHEDULE_TOPIC + 定时投递
  3. 使用方式:setDelayTimeLevel()
  4. 定时消息:计算延迟级别或数据库方案
  5. 最佳实践:幂等性、监控、告警

核心要点

参考资料


分享这篇文章到:

上一篇文章
Redis 集群扩容与缩容实战
下一篇文章
Redis 最佳实践总结