RocketMQ 支持消息回溯功能,允许消费者重新消费历史消息。本文将深入探讨消息回溯的实现原理和实战应用。
一、消息回溯基础
1.1 什么是消息回溯?
定义:
消息回溯 = 重新消费历史消息
场景:
- 系统故障后重放消息
- 新系统上线需要历史数据
- 数据修复需要重新处理
1.2 回溯原理
graph TB
subgraph 正常消费
C1[Consumer] -->|拉取 | Q1[Queue]
Q1 -->|当前 Offset | O1[Offset=1000]
end
subgraph 回溯消费
C2[Consumer] -->|重置 Offset | Q2[Queue]
Q2 -->|回溯到 | O2[Offset=500]
O2 -->|重新消费 | M1[消息 500-1000]
end
1.3 回溯方式
| 方式 | 说明 | 适用场景 |
|---|---|---|
| 按时间重置 | 重置到指定时间点 | 重放某段时间的消息 |
| 按偏移量重置 | 重置到指定 Offset | 精确控制位置 |
| 从开头消费 | 重置到消息开头 | 全量重放 |
| 从当前位置 | 从当前 Offset 继续 | 恢复消费 |
二、消息查询
2.1 按 MessageId 查询
# 命令行查询
mqadmin queryMsgById -n ns1:9876 -i msgId
# 输出
MessageId: 0A3A5F3B00002A9F0000000000000123
Topic: order-topic
Tags: pay
Keys: order_123
StoreTime: 2026-07-15 10:00:00
Offset: 1234
Java 查询:
public MessageExt queryByMsgId(String msgId) throws Exception {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
adminExt.setNamesrvAddr("ns1:9876");
adminExt.start();
try {
return adminExt.viewMessage(msgId);
} finally {
adminExt.shutdown();
}
}
2.2 按 Key 查询
# 命令行查询
mqadmin queryMsgByKey -n ns1:9876 -t order-topic -k order_123
# 输出多条匹配消息
MessageId: ...
MessageId: ...
Java 查询:
public List<MessageExt> queryByKey(String topic, String key) throws Exception {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
adminExt.setNamesrvAddr("ns1:9876");
adminExt.start();
try {
QueryResult result = adminExt.queryMessage(topic, key, 32, 0, System.currentTimeMillis());
return result.getMessageList();
} finally {
adminExt.shutdown();
}
}
2.3 按时间查询
# 按时间戳查询
mqadmin queryMsgByOffset -n ns1:9876 -t order-topic -q 0 -o 1657857600000
# 时间戳转换
# 2026-07-15 10:00:00 = 1657857600000
Java 查询:
public long queryOffsetByTime(String topic, int queueId, long timestamp) throws Exception {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
adminExt.setNamesrvAddr("ns1:9876");
adminExt.start();
try {
QueryOffsetResult result = adminExt.queryOffset(topic, queueId, timestamp);
return result.getOffset();
} finally {
adminExt.shutdown();
}
}
三、偏移量重置
3.1 按时间重置
# 重置到指定时间
mqadmin resetOffsetByTime -n ns1:9876 \
-t order-topic \
-g order-consumer-group \
-s 2026-07-15#10:00:00 \
-f true
# 参数说明
# -t: Topic
# -g: Consumer Group
# -s: 时间(格式:yyyy-MM-dd#HH:mm:ss)
# -f: 强制重置
Java 重置:
public void resetOffsetByTime(String topic, String group, long timestamp) throws Exception {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
adminExt.setNamesrvAddr("ns1:9876");
adminExt.start();
try {
adminExt.resetOffsetByTimestamp(topic, group, timestamp, false);
} finally {
adminExt.shutdown();
}
}
3.2 按偏移量重置
# 重置到指定 Offset
mqadmin resetOffsetByOffset -n ns1:9876 \
-t order-topic \
-g order-consumer-group \
-o 1000 \
-f true
Java 重置:
public void resetOffsetByOffset(String topic, String group, long offset) throws Exception {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
adminExt.setNamesrvAddr("ns1:9876");
adminExt.start();
try {
// 获取 Topic 路由
TopicRouteData route = adminExt.examineTopicRouteInfo(topic);
// 遍历所有 Queue
for (MessageQueue mq : route.getMessageQueueList()) {
adminExt.resetOffset(mq, group, offset);
}
} finally {
adminExt.shutdown();
}
}
3.3 从开头重置
# 重置到最小 Offset
mqadmin resetOffsetByTime -n ns1:9876 \
-t order-topic \
-g order-consumer-group \
-s 0 \
-f true
四、消息重放
4.1 临时消费者重放
public class MessageReplayService {
/**
* 重放指定时间段的消息
*/
public void replayMessages(String topic, String group,
long startTime, long endTime) throws Exception {
// 1. 创建临时消费者
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("replay-group");
consumer.setNamesrvAddr("ns1:9876");
consumer.start();
// 2. 获取 Queue 列表
Set<MessageQueue> queues = consumer.fetchSubscribeMessageQueues(topic);
// 3. 遍历每个 Queue
for (MessageQueue mq : queues) {
// 4. 查询起始 Offset
long startOffset = consumer.searchOffset(mq, startTime);
long endOffset = consumer.searchOffset(mq, endTime);
// 5. 拉取并处理消息
for (long offset = startOffset; offset < endOffset; offset += 100) {
PullResult result = consumer.pull(mq, "*", offset, 100);
for (MessageExt msg : result.getMsgFoundList()) {
processMessage(msg);
}
}
}
consumer.shutdown();
}
private void processMessage(MessageExt msg) {
// 重放处理逻辑
System.out.println("重放消息:" + msg.getMsgId());
}
}
4.2 新消费者组重放
#!/bin/bash
# 创建新消费者组进行重放
OLD_GROUP="order-consumer-group"
NEW_GROUP="order-consumer-group-replay"
TOPIC="order-topic"
REPLAY_TIME="2026-07-15#10:00:00"
echo "=== 消息重放 ==="
# 1. 创建新消费者组
echo "创建新消费者组:$NEW_GROUP"
# 2. 重置偏移量到指定时间
echo "重置偏移量到:$REPLAY_TIME"
mqadmin resetOffsetByTime -n ns1:9876 \
-t $TOPIC \
-g $NEW_GROUP \
-s $REPLAY_TIME \
-f true
# 3. 启动新消费者
echo "启动新消费者..."
java -jar consumer.jar \
--namesrv=ns1:9876 \
--topic=$TOPIC \
--group=$NEW_GROUP
echo "重放完成"
4.3 消息备份后重放
public class MessageBackupReplay {
private final DefaultMQPullConsumer pullConsumer;
private final DefaultMQProducer producer;
public MessageBackupReplay() throws Exception {
// 拉取消费者
pullConsumer = new DefaultMQPullConsumer("backup-consumer");
pullConsumer.setNamesrvAddr("ns1:9876");
pullConsumer.start();
// 生产者(用于备份)
producer = new DefaultMQProducer("backup-producer");
producer.setNamesrvAddr("ns1:9876");
producer.start();
}
/**
* 备份消息
*/
public void backupMessages(String topic, long startTime, long endTime,
String backupTopic) throws Exception {
Set<MessageQueue> queues = pullConsumer.fetchSubscribeMessageQueues(topic);
for (MessageQueue mq : queues) {
long startOffset = pullConsumer.searchOffset(mq, startTime);
long endOffset = pullConsumer.searchOffset(mq, endTime);
for (long offset = startOffset; offset < endOffset; offset += 100) {
PullResult result = pullConsumer.pull(mq, "*", offset, 100);
// 备份到备份 Topic
List<Message> messages = new ArrayList<>();
for (MessageExt msg : result.getMsgFoundList()) {
Message backupMsg = new Message(
backupTopic,
msg.getTags(),
msg.getKeys(),
msg.getBody()
);
// 复制属性
backupMsg.putUserProperty("original_topic", topic);
backupMsg.putUserProperty("original_msgId", msg.getMsgId());
messages.add(backupMsg);
}
if (!messages.isEmpty()) {
producer.send(messages);
}
}
}
}
/**
* 从备份重放
*/
public void replayFromBackup(String backupTopic, String targetTopic) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("replay-consumer");
consumer.setNamesrvAddr("ns1:9876");
consumer.subscribe(backupTopic, "*");
consumer.registerMessageListener((msgs, context) -> {
for (MessageExt backupMsg : msgs) {
// 恢复原始 Topic
String originalTopic = backupMsg.getUserProperty("original_topic");
// 发送到目标 Topic
Message msg = new Message(
targetTopic,
backupMsg.getTags(),
backupMsg.getKeys(),
backupMsg.getBody()
);
try {
producer.send(msg);
} catch (Exception e) {
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
}
五、实战案例
5.1 系统故障后重放
#!/bin/bash
# 系统故障后消息重放脚本
NAMESRV="ns1:9876"
TOPIC="order-topic"
GROUP="order-consumer-group"
FAULT_START="2026-07-15#10:00:00"
FAULT_END="2026-07-15#12:00:00"
echo "=== 系统故障后消息重放 ==="
# 1. 查看当前消费进度
echo "当前消费进度:"
mqadmin consumerProgress -n $NAMESRV -g $GROUP
# 2. 重置偏移量到故障前
echo "重置偏移量到:$FAULT_START"
mqadmin resetOffsetByTime -n $NAMESRV \
-t $TOPIC \
-g $GROUP \
-s $FAULT_START \
-f true
# 3. 重启消费者
echo "重启消费者..."
systemctl restart order-consumer
# 4. 监控消费进度
echo "监控消费进度..."
watch -n 5 "mqadmin consumerProgress -n $NAMESRV -g $GROUP"
5.2 新系统上线数据迁移
public class DataMigrationService {
/**
* 新系统上线,重放历史数据
*/
public void migrateOldData(String oldTopic, String newGroup,
LocalDate startDate, LocalDate endDate) throws Exception {
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("migration-consumer");
pullConsumer.setNamesrvAddr("ns1:9876");
pullConsumer.start();
Set<MessageQueue> queues = pullConsumer.fetchSubscribeMessageQueues(oldTopic);
for (MessageQueue mq : queues) {
// 按天迁移
for (LocalDate date = startDate; !date.isAfter(endDate); date = date.plusDays(1)) {
long startTime = date.atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
long endTime = date.plusDays(1).atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli();
long startOffset = pullConsumer.searchOffset(mq, startTime);
long endOffset = pullConsumer.searchOffset(mq, endTime);
System.out.printf("迁移日期:%s, Offset: %d - %d%n", date, startOffset, endOffset);
// 拉取并处理
for (long offset = startOffset; offset < endOffset; offset += 100) {
PullResult result = pullConsumer.pull(mq, "*", offset, 100);
for (MessageExt msg : result.getMsgFoundList()) {
// 转换并存储到新系统
migrateMessage(msg);
}
}
}
}
pullConsumer.shutdown();
}
private void migrateMessage(MessageExt msg) {
// 数据转换逻辑
}
}
5.3 数据修复重放
public class DataFixReplayService {
/**
* 数据修复后重放特定消息
*/
public void replayFixedMessages(String topic, String group,
List<String> msgIds) throws Exception {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt();
adminExt.setNamesrvAddr("ns1:9876");
adminExt.start();
try {
for (String msgId : msgIds) {
// 1. 查询消息
MessageExt msg = adminExt.viewMessage(msgId);
// 2. 重新发送
Message newMsg = new Message(
msg.getTopic(),
msg.getTags(),
msg.getKeys(),
msg.getBody()
);
// 复制属性
for (Map.Entry<String, String> entry : msg.getProperties().entrySet()) {
if (!MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX.equals(entry.getKey())) {
newMsg.putUserProperty(entry.getKey(), entry.getValue());
}
}
// 3. 发送到原 Topic
DefaultMQProducer producer = new DefaultMQProducer("fix-producer");
producer.setNamesrvAddr("ns1:9876");
producer.start();
producer.send(newMsg);
producer.shutdown();
System.out.println("重放消息:" + msgId);
}
} finally {
adminExt.shutdown();
}
}
}
六、注意事项
6.1 消息保留时间
# Broker 配置
fileReservedTime=72 # 消息保留 72 小时
# 超过保留时间的消息无法回溯
6.2 重复消费风险
// 重放前通知下游系统
// 确保下游系统支持幂等
// 重放后检查数据一致性
public void verifyDataConsistency() {
// 比对重放前后数据
}
6.3 性能影响
# 重放时限制拉取速度
mqadmin resetOffsetByTime -n ns1:9876 \
-t order-topic \
-g order-consumer-group \
-s 2026-07-15#10:00:00 \
--speed 50 # 每秒 50 条
七、最佳实践
7.1 重放检查清单
重放前检查:
- [ ] 确认重放时间范围
- [ ] 通知下游系统
- [ ] 备份当前状态
- [ ] 准备回滚方案
重放中监控:
- [ ] 消费进度
- [ ] 错误率
- [ ] 系统负载
重放后验证:
- [ ] 数据一致性
- [ ] 业务功能
- [ ] 监控指标
7.2 重放策略
| 场景 | 策略 | 说明 |
|---|---|---|
| 系统故障 | 按时间重置 | 重放故障期间消息 |
| 新系统上线 | 从开头消费 | 全量历史数据 |
| 数据修复 | 按 MessageId | 精确重放特定消息 |
| 测试验证 | 新消费者组 | 不影响生产 |
总结
RocketMQ 消息回溯的核心要点:
- 消息查询:按 MessageId、Key、时间查询
- 偏移量重置:按时间、按 Offset、从开头
- 消息重放:临时消费者、新消费者组、备份重放
- 实战应用:故障恢复、数据迁移、数据修复
- 注意事项:保留时间、重复消费、性能影响
核心要点:
- 掌握多种查询方式
- 根据场景选择重置方式
- 重放前做好备份和通知
- 重放后验证数据一致性
参考资料
- RocketMQ 消息回溯官方文档
- RocketMQ 运维工具
- 《RocketMQ 技术内幕》第 11 章