Skip to content
清晨的一缕阳光
返回

RocketMQ 消息回溯与重置实战

RocketMQ 支持消息回溯功能,允许消费者重新消费历史消息。本文将深入探讨消息回溯的实现原理和实战应用。

一、消息回溯基础

1.1 什么是消息回溯?

定义

消息回溯 = 重新消费历史消息

场景:
- 系统故障后重放消息
- 新系统上线需要历史数据
- 数据修复需要重新处理

1.2 回溯原理

graph TB
    subgraph 正常消费
        C1[Consumer] -->|拉取 | Q1[Queue]
        Q1 -->|当前 Offset | O1[Offset=1000]
    end
    
    subgraph 回溯消费
        C2[Consumer] -->|重置 Offset | Q2[Queue]
        Q2 -->|回溯到 | O2[Offset=500]
        O2 -->|重新消费 | M1[消息 500-1000]
    end

1.3 回溯方式

方式说明适用场景
按时间重置重置到指定时间点重放某段时间的消息
按偏移量重置重置到指定 Offset精确控制位置
从开头消费重置到消息开头全量重放
从当前位置从当前 Offset 继续恢复消费

二、消息查询

2.1 按 MessageId 查询

# 命令行查询
mqadmin queryMsgById -n ns1:9876 -i msgId

# 输出
MessageId: 0A3A5F3B00002A9F0000000000000123
Topic: order-topic
Tags: pay
Keys: order_123
StoreTime: 2026-07-15 10:00:00
Offset: 1234

Java 查询

public MessageExt queryByMsgId(String msgId) throws Exception {
    DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
    adminExt.setNamesrvAddr("ns1:9876");
    adminExt.start();
    
    try {
        return adminExt.viewMessage(msgId);
    } finally {
        adminExt.shutdown();
    }
}

2.2 按 Key 查询

# 命令行查询
mqadmin queryMsgByKey -n ns1:9876 -t order-topic -k order_123

# 输出多条匹配消息
MessageId: ...
MessageId: ...

Java 查询

public List<MessageExt> queryByKey(String topic, String key) throws Exception {
    DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
    adminExt.setNamesrvAddr("ns1:9876");
    adminExt.start();
    
    try {
        QueryResult result = adminExt.queryMessage(topic, key, 32, 0, System.currentTimeMillis());
        return result.getMessageList();
    } finally {
        adminExt.shutdown();
    }
}

2.3 按时间查询

# 按时间戳查询
mqadmin queryMsgByOffset -n ns1:9876 -t order-topic -q 0 -o 1657857600000

# 时间戳转换
# 2026-07-15 10:00:00 = 1657857600000

Java 查询

public long queryOffsetByTime(String topic, int queueId, long timestamp) throws Exception {
    DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
    adminExt.setNamesrvAddr("ns1:9876");
    adminExt.start();
    
    try {
        QueryOffsetResult result = adminExt.queryOffset(topic, queueId, timestamp);
        return result.getOffset();
    } finally {
        adminExt.shutdown();
    }
}

三、偏移量重置

3.1 按时间重置

# 重置到指定时间
mqadmin resetOffsetByTime -n ns1:9876 \
  -t order-topic \
  -g order-consumer-group \
  -s 2026-07-15#10:00:00 \
  -f true

# 参数说明
# -t: Topic
# -g: Consumer Group
# -s: 时间(格式:yyyy-MM-dd#HH:mm:ss)
# -f: 强制重置

Java 重置

public void resetOffsetByTime(String topic, String group, long timestamp) throws Exception {
    DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
    adminExt.setNamesrvAddr("ns1:9876");
    adminExt.start();
    
    try {
        adminExt.resetOffsetByTimestamp(topic, group, timestamp, false);
    } finally {
        adminExt.shutdown();
    }
}

3.2 按偏移量重置

# 重置到指定 Offset
mqadmin resetOffsetByOffset -n ns1:9876 \
  -t order-topic \
  -g order-consumer-group \
  -o 1000 \
  -f true

Java 重置

public void resetOffsetByOffset(String topic, String group, long offset) throws Exception {
    DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
    adminExt.setNamesrvAddr("ns1:9876");
    adminExt.start();
    
    try {
        // 获取 Topic 路由
        TopicRouteData route = adminExt.examineTopicRouteInfo(topic);
        
        // 遍历所有 Queue
        for (MessageQueue mq : route.getMessageQueueList()) {
            adminExt.resetOffset(mq, group, offset);
        }
    } finally {
        adminExt.shutdown();
    }
}

3.3 从开头重置

# 重置到最小 Offset
mqadmin resetOffsetByTime -n ns1:9876 \
  -t order-topic \
  -g order-consumer-group \
  -s 0 \
  -f true

四、消息重放

4.1 临时消费者重放

public class MessageReplayService {
    
    /**
     * 重放指定时间段的消息
     */
    public void replayMessages(String topic, String group, 
                               long startTime, long endTime) throws Exception {
        // 1. 创建临时消费者
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("replay-group");
        consumer.setNamesrvAddr("ns1:9876");
        consumer.start();
        
        // 2. 获取 Queue 列表
        Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(topic);
        
        // 3. 遍历每个 Queue
        for (MessageQueue mq : queues) {
            // 4. 查询起始 Offset
            long startOffset = consumer.searchOffset(mq, startTime);
            long endOffset = consumer.searchOffset(mq, endTime);
            
            // 5. 拉取并处理消息
            for (long offset = startOffset; offset < endOffset; offset += 100) {
                PullResult result = consumer.pull(mq, "*", offset, 100);
                
                for (MessageExt msg : result.getMsgFoundList()) {
                    processMessage(msg);
                }
            }
        }
        
        consumer.shutdown();
    }
    
    private void processMessage(MessageExt msg) {
        // 重放处理逻辑
        System.out.println("重放消息:" + msg.getMsgId());
    }
}

4.2 新消费者组重放

#!/bin/bash
# 创建新消费者组进行重放

OLD_GROUP="order-consumer-group"
NEW_GROUP="order-consumer-group-replay"
TOPIC="order-topic"
REPLAY_TIME="2026-07-15#10:00:00"

echo "=== 消息重放 ==="

# 1. 创建新消费者组
echo "创建新消费者组:$NEW_GROUP"

# 2. 重置偏移量到指定时间
echo "重置偏移量到:$REPLAY_TIME"
mqadmin resetOffsetByTime -n ns1:9876 \
  -t $TOPIC \
  -g $NEW_GROUP \
  -s $REPLAY_TIME \
  -f true

# 3. 启动新消费者
echo "启动新消费者..."
java -jar consumer.jar \
  --namesrv=ns1:9876 \
  --topic=$TOPIC \
  --group=$NEW_GROUP

echo "重放完成"

4.3 消息备份后重放

public class MessageBackupReplay {
    
    private final DefaultMQPullConsumer pullConsumer;
    private final DefaultMQProducer producer;
    
    public MessageBackupReplay() throws Exception {
        // 拉取消费者
        pullConsumer = new DefaultMQPullConsumer("backup-consumer");
        pullConsumer.setNamesrvAddr("ns1:9876");
        pullConsumer.start();
        
        // 生产者(用于备份)
        producer = new DefaultMQProducer("backup-producer");
        producer.setNamesrvAddr("ns1:9876");
        producer.start();
    }
    
    /**
     * 备份消息
     */
    public void backupMessages(String topic, long startTime, long endTime, 
                               String backupTopic) throws Exception {
        Set<MessageQueue> queues = pullConsumer.fetchSubscribeMessageQueues(topic);
        
        for (MessageQueue mq : queues) {
            long startOffset = pullConsumer.searchOffset(mq, startTime);
            long endOffset = pullConsumer.searchOffset(mq, endTime);
            
            for (long offset = startOffset; offset < endOffset; offset += 100) {
                PullResult result = pullConsumer.pull(mq, "*", offset, 100);
                
                // 备份到备份 Topic
                List<Message> messages = new ArrayList<>();
                for (MessageExt msg : result.getMsgFoundList()) {
                    Message backupMsg = new Message(
                        backupTopic,
                        msg.getTags(),
                        msg.getKeys(),
                        msg.getBody()
                    );
                    // 复制属性
                    backupMsg.putUserProperty("original_topic", topic);
                    backupMsg.putUserProperty("original_msgId", msg.getMsgId());
                    messages.add(backupMsg);
                }
                
                if (!messages.isEmpty()) {
                    producer.send(messages);
                }
            }
        }
    }
    
    /**
     * 从备份重放
     */
    public void replayFromBackup(String backupTopic, String targetTopic) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("replay-consumer");
        consumer.setNamesrvAddr("ns1:9876");
        consumer.subscribe(backupTopic, "*");
        
        consumer.registerMessageListener((msgs, context) -> {
            for (MessageExt backupMsg : msgs) {
                // 恢复原始 Topic
                String originalTopic = backupMsg.getUserProperty("original_topic");
                
                // 发送到目标 Topic
                Message msg = new Message(
                    targetTopic,
                    backupMsg.getTags(),
                    backupMsg.getKeys(),
                    backupMsg.getBody()
                );
                
                try {
                    producer.send(msg);
                } catch (Exception e) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        
        consumer.start();
    }
}

五、实战案例

5.1 系统故障后重放

#!/bin/bash
# 系统故障后消息重放脚本

NAMESRV="ns1:9876"
TOPIC="order-topic"
GROUP="order-consumer-group"
FAULT_START="2026-07-15#10:00:00"
FAULT_END="2026-07-15#12:00:00"

echo "=== 系统故障后消息重放 ==="

# 1. 查看当前消费进度
echo "当前消费进度:"
mqadmin consumerProgress -n $NAMESRV -g $GROUP

# 2. 重置偏移量到故障前
echo "重置偏移量到:$FAULT_START"
mqadmin resetOffsetByTime -n $NAMESRV \
  -t $TOPIC \
  -g $GROUP \
  -s $FAULT_START \
  -f true

# 3. 重启消费者
echo "重启消费者..."
systemctl restart order-consumer

# 4. 监控消费进度
echo "监控消费进度..."
watch -n 5 "mqadmin consumerProgress -n $NAMESRV -g $GROUP"

5.2 新系统上线数据迁移

public class DataMigrationService {
    
    /**
     * 新系统上线,重放历史数据
     */
    public void migrateOldData(String oldTopic, String newGroup, 
                               LocalDate startDate, LocalDate endDate) throws Exception {
        
        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("migration-consumer");
        pullConsumer.setNamesrvAddr("ns1:9876");
        pullConsumer.start();
        
        Set<MessageQueue> queues = pullConsumer.fetchSubscribeMessageQueues(oldTopic);
        
        for (MessageQueue mq : queues) {
            // 按天迁移
            for (LocalDate date = startDate; !date.isAfter(endDate); date = date.plusDays(1)) {
                long startTime = date.atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
                long endTime = date.plusDays(1).atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
                
                long startOffset = pullConsumer.searchOffset(mq, startTime);
                long endOffset = pullConsumer.searchOffset(mq, endTime);
                
                System.out.printf("迁移日期:%s, Offset: %d - %d%n", date, startOffset, endOffset);
                
                // 拉取并处理
                for (long offset = startOffset; offset < endOffset; offset += 100) {
                    PullResult result = pullConsumer.pull(mq, "*", offset, 100);
                    
                    for (MessageExt msg : result.getMsgFoundList()) {
                        // 转换并存储到新系统
                        migrateMessage(msg);
                    }
                }
            }
        }
        
        pullConsumer.shutdown();
    }
    
    private void migrateMessage(MessageExt msg) {
        // 数据转换逻辑
    }
}

5.3 数据修复重放

public class DataFixReplayService {
    
    /**
     * 数据修复后重放特定消息
     */
    public void replayFixedMessages(String topic, String group, 
                                    List<String> msgIds) throws Exception {
        DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
        adminExt.setNamesrvAddr("ns1:9876");
        adminExt.start();
        
        try {
            for (String msgId : msgIds) {
                // 1. 查询消息
                MessageExt msg = adminExt.viewMessage(msgId);
                
                // 2. 重新发送
                Message newMsg = new Message(
                    msg.getTopic(),
                    msg.getTags(),
                    msg.getKeys(),
                    msg.getBody()
                );
                
                // 复制属性
                for (Map.Entry<String, String> entry : msg.getProperties().entrySet()) {
                    if (!MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX.equals(entry.getKey())) {
                        newMsg.putUserProperty(entry.getKey(), entry.getValue());
                    }
                }
                
                // 3. 发送到原 Topic
                DefaultMQProducer producer = new DefaultMQProducer("fix-producer");
                producer.setNamesrvAddr("ns1:9876");
                producer.start();
                
                producer.send(newMsg);
                producer.shutdown();
                
                System.out.println("重放消息:" + msgId);
            }
        } finally {
            adminExt.shutdown();
        }
    }
}

六、注意事项

6.1 消息保留时间

# Broker 配置
fileReservedTime=72  # 消息保留 72 小时

# 超过保留时间的消息无法回溯

6.2 重复消费风险

// 重放前通知下游系统
// 确保下游系统支持幂等

// 重放后检查数据一致性
public void verifyDataConsistency() {
    // 比对重放前后数据
}

6.3 性能影响

# 重放时限制拉取速度
mqadmin resetOffsetByTime -n ns1:9876 \
  -t order-topic \
  -g order-consumer-group \
  -s 2026-07-15#10:00:00 \
  --speed 50  # 每秒 50 条

七、最佳实践

7.1 重放检查清单

重放前检查:
- [ ] 确认重放时间范围
- [ ] 通知下游系统
- [ ] 备份当前状态
- [ ] 准备回滚方案

重放中监控:
- [ ] 消费进度
- [ ] 错误率
- [ ] 系统负载

重放后验证:
- [ ] 数据一致性
- [ ] 业务功能
- [ ] 监控指标

7.2 重放策略

场景策略说明
系统故障按时间重置重放故障期间消息
新系统上线从开头消费全量历史数据
数据修复按 MessageId精确重放特定消息
测试验证新消费者组不影响生产

总结

RocketMQ 消息回溯的核心要点:

  1. 消息查询:按 MessageId、Key、时间查询
  2. 偏移量重置:按时间、按 Offset、从开头
  3. 消息重放:临时消费者、新消费者组、备份重放
  4. 实战应用:故障恢复、数据迁移、数据修复
  5. 注意事项:保留时间、重复消费、性能影响

核心要点

参考资料


分享这篇文章到:

上一篇文章
AI 应用成本控制实战
下一篇文章
Redis 集群扩容与缩容实战