本文汇总了 RocketMQ 生产环境中的典型问题排查案例,包括消息丢失、重复、堆积、性能下降等问题的分析和解决方案。
一、消息丢失案例
案例 1:异步刷盘导致消息丢失
现象:
- Broker 宕机后部分消息丢失
- 消费者未收到部分消息
排查过程:
# 1. 检查 Broker 配置
cat broker.conf | grep flushDiskType
# 发现:flushDiskType=ASYNC_FLUSH
# 2. 检查 Broker 日志
tail -f /var/log/rocketmq/broker.log | grep "shutdown"
# 发现:Broker 异常宕机,未正常刷盘
# 3. 分析时间线
T1: Producer 发送消息
T2: Broker 接收并返回成功
T3: 消息在内存中,未刷盘
T4: Broker 宕机
结果:消息丢失
原因分析:
异步刷盘流程:
Producer -> Broker -> 返回成功 -> 异步刷盘 -> 宕机
↓
数据丢失
解决方案:
# 修改为同步刷盘
flushDiskType=SYNC_FLUSH
# 或增加副本
brokerRole=SYNC_MASTER
min.insync.replicas=2
效果:消息丢失问题彻底解决
案例 2:Producer 未处理异常
现象:
- 消息发送失败但无告警
- 业务数据不一致
排查过程:
// 原代码
SendResult result = producer.send(msg);
// 未检查 SendStatus
// 修改为
SendResult result = producer.send(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
log.error("发送失败:{}", result);
handleFailedMessage(msg, result);
}
解决方案:
public class ReliableProducer {
public boolean sendReliable(Message msg) {
int maxRetries = 3;
for (int i = 0; i < maxRetries; i++) {
try {
SendResult result = producer.send(msg);
if (result.getSendStatus() == SendStatus.SEND_OK) {
return true;
}
} catch (Exception e) {
log.warn("发送失败,第 {} 次重试", i + 1, e);
}
}
// 记录失败消息
saveFailedMessage(msg);
return false;
}
}
二、消息重复案例
案例 3:网络重试导致重复
现象:
- 消费者收到重复消息
- 业务数据重复
排查过程:
# 1. 检查 Producer 日志
grep "retry" /var/log/producer/app.log
# 发现:大量重试记录
# 2. 检查网络
ping broker-1
# 发现:网络偶尔超时
# 3. 分析时间线
T1: Producer 发送消息
T2: Broker 接收并写入
T3: 响应超时(网络问题)
T4: Producer 重试发送
T5: Broker 再次写入
结果:消息重复
解决方案:
// 1. 业务层去重
@RedissonLock(key = "#msgId")
public void processMessage(String msgId, Message msg) {
// 检查是否已处理
if (isProcessed(msgId)) {
return;
}
// 处理业务
processBusiness(msg);
// 标记已处理
markAsProcessed(msgId);
}
// 2. 数据库唯一键
CREATE TABLE orders (
order_id VARCHAR(64) PRIMARY KEY,
...
);
案例 4:消费重复提交
现象:
- 消费组 Rebalance 后出现重复消息
- 自动提交位移
排查过程:
// 原代码
consumer.setAutoCommit(true); // 自动提交
// 问题场景
T1: Consumer A 拉取消息(offset=100-150)
T2: 处理消息(offset=100-120)
T3: 自动提交位移(offset=150)
T4: Rebalance 触发
T5: Consumer B 接管(从 offset=150 开始)
T6: offset=120-150 的消息未处理但已提交
结果:消息丢失或重复
解决方案:
// 1. 手动提交
consumer.setAutoCommit(false);
// 2. 处理完成后提交
while (true) {
List<MessageExt> msgs = consumer.poll();
for (MessageExt msg : msgs) {
process(msg); // 处理业务
}
consumer.commitSync(); // 同步提交
}
// 3. 或使用事务
@Transactional
public void consumeMessage(List<MessageExt> msgs) {
for (MessageExt msg : msgs) {
process(msg);
}
consumer.commitSync();
}
三、消息堆积案例
案例 5:消费逻辑慢导致堆积
现象:
- Consumer Lag 持续增长
- 消费 TPS 远低于生产 TPS
排查过程:
# 1. 查看消费组状态
mqadmin consumerProgress -n ns1:9876 -g order-consumer-group
# 输出:
# Group: order-consumer-group
# Topic: order-topic
# Diff: 80000 # 堆积 8 万条
# 2. 检查消费逻辑耗时
# 添加监控
processTime.record(System.currentTimeMillis() - startTime);
# 发现:平均处理时间 500ms
# 3. 分析代码
// 发现:每条消息都调用外部 HTTP 接口
for (MessageExt msg : messages) {
httpClient.post(...); // 同步调用,耗时 500ms
}
解决方案:
// 1. 批量处理外部调用
List<Order> orders = new ArrayList<>();
for (MessageExt msg : messages) {
orders.add(parse(msg));
}
// 批量调用
orderService.batchProcess(orders);
// 2. 异步处理
ExecutorService executor = Executors.newFixedThreadPool(10);
for (MessageExt msg : messages) {
executor.submit(() -> process(msg));
}
// 3. 增加消费者数量
// 从 2 个增加到 8 个
效果:
- 消费 TPS:200 → 2000
- Diff:80000 → 0(2 小时内)
案例 6:Queue 数量不足
现象:
- 消费者数量增加后,消费速度未提升
- 部分消费者空闲
排查过程:
# 1. 查看 Topic 配置
mqadmin updateTopic -n ns1:9876 -t order-topic
# 输出:readQueueNums=4
# 2. 查看消费者数量
# 当前:8 个消费者
# 3. 分析
# 4 个 Queue 最多支持 4 个消费者并发消费
# 多余的 4 个消费者空闲
解决方案:
# 增加 Queue 数量
mqadmin updateTopic -n ns1:9876 -t order-topic -c DefaultCluster -p 16 -r 16
# 验证
mqadmin updateTopic -n ns1:9876 -t order-topic
# readQueueNums=16
注意:
- Queue 数量只能增加,不能减少
- 可能影响消息顺序性
- 需要提前规划
四、性能下降案例
案例 7:磁盘 IO 瓶颈
现象:
- Broker TPS 突然下降
- 请求延迟升高
排查过程:
# 1. 检查磁盘 IO
iostat -x 1 5
# 输出:%util 接近 100%
# 2. 检查磁盘空间
df -h /data/rocketmq/store
# 输出:使用率 95%
# 3. 检查 GC
jstat -gcutil <pid> 1000
# 输出:Full GC 频繁
# 4. 分析日志
tail -f /var/log/rocketmq/broker.log
# 发现:大量 "Slowest request" 警告
解决方案:
# 1. 清理过期文件
# 修改保留策略
fileReservedTime=48 # 从 72 改为 48 小时
# 2. 迁移数据
# 将旧数据迁移到其他磁盘
# 3. 增加磁盘
# 挂载新磁盘,扩展 storePathCommitLog
# 4. 优化刷盘
flushCommitLogThoroughInterval=500 # 减少刷盘频率
案例 8:网络带宽不足
现象:
- 晚高峰期间性能下降
- 网络流量接近带宽上限
排查过程:
# 1. 检查网络流量
iftop -P -n -i eth0
# 输出:900Mbps / 1Gbps
# 2. 检查消息大小
# 平均消息大小:10KB
# 峰值 TPS:10 万
# 带宽需求:10KB * 10 万 = 1GB/s
# 3. 分析
# 带宽不足,导致请求排队
解决方案:
// 1. 消息压缩
MessageCompressor.compress(msg);
// 压缩后:10KB → 3KB
// 2. 增加带宽
# 升级网络:1Gbps → 10Gbps
// 3. 优化批量
consumer.setPullBatchSize(64); // 增大批量
五、Broker 故障案例
案例 9:Master 宕机切换
现象:
- Broker Master 突然宕机
- 生产消费短暂中断
排查过程:
# 1. 检查 Broker 状态
ps -ef | grep BrokerStartup
# 发现:Broker-1 进程不存在
# 2. 检查日志
tail -f /var/log/rocketmq/broker.log
# 发现:OOM 错误
# 3. 检查堆配置
cat runbroker.sh | grep HEAP
# 输出:-Xmx2G -Xms2G
解决方案:
# 1. 调整堆内存
export JAVA_OPT="-Xmx8G -Xms8G"
# 2. 优化 GC
export JAVA_OPT="$JAVA_OPT -XX:+UseG1GC -XX:MaxGCPauseMillis=20"
# 3. 重启 Broker
systemctl restart rocketmq-broker
# 4. 验证
mqadmin brokerStatus -n ns1:9876 -b broker-1:10911
恢复过程:
T0: Broker Master 宕机
T0+10s: Slave 检测到 Master 下线
T0+30s: Producer 切换到 Slave(只读)
T0+120s: Master 重启完成
T0+180s: 集群恢复正常
案例 10:NameServer 故障
现象:
- Producer/Consumer 无法连接
- 路由信息获取失败
排查过程:
# 1. 检查 NameServer 状态
ps -ef | grep NamesrvStartup
# 发现:NameServer 进程不存在
# 2. 检查客户端配置
cat client.properties | grep namesrvAddr
# 发现:只配置了一个 NameServer
# 3. 分析
# 单 NameServer 故障,客户端无法获取路由
解决方案:
// 1. 配置多个 NameServer
producer.setNamesrvAddr("ns1:9876;ns2:9876;ns3:9876");
// 2. 部署至少 2 个 NameServer
# 启动 NameServer 1
nohup mqnamesrv &
# 启动 NameServer 2
nohup mqnamesrv &
// 3. 客户端自动重试
producer.setRetryTimesWhenSendFailed(3);
六、排查工具
常用命令
# Topic 管理
mqadmin updateTopic -n ns1:9876 -t my-topic -c DefaultCluster -p 8 -r 8
mqadmin deleteTopic -n ns1:9876 -t my-topic -c DefaultCluster
# 消费组管理
mqadmin consumerProgress -n ns1:9876 -g my-group
mqadmin resetOffsetByTime -n ns1:9876 -t my-topic -g my-group -s 1609459200000
# 消息查询
mqadmin queryMsgById -n ns1:9876 -i msgId
mqadmin queryMsgByOffset -n ns1:9876 -t my-topic -o 1000
# Broker 状态
mqadmin brokerStatus -n ns1:9876 -b broker-1:10911
mqadmin clusterList -n ns1:9876
# 名称服务器
mqadmin namesrvStatus -n ns1:9876
监控脚本
#!/bin/bash
# RocketMQ 健康检查脚本
check_rocketmq_health() {
local namesrv=$1
# 检查 NameServer 连接
for ns in $(echo $namesrv | tr ';' ' '); do
if ! nc -z ${ns%:*} ${ns#*:} &>/dev/null; then
echo "CRITICAL: NameServer $ns 无法连接"
return 2
fi
done
# 检查 Broker 状态
broker_count=$(mqadmin clusterList -n $namesrv | grep -c "broker-id")
if [ "$broker_count" -lt 2 ]; then
echo "CRITICAL: Broker 数量不足"
return 2
fi
# 检查磁盘使用率
disk_ratio=$(mqadmin brokerStatus -n $namesrv -b broker-1:10911 | \
grep "commitLogDiskRatio" | awk -F: '{print $2}' | tr -d ' ')
if [ "${disk_ratio%.*}" -gt 80 ]; then
echo "WARNING: CommitLog 磁盘使用率过高:$disk_ratio%"
return 1
fi
# 检查消费堆积
lag=$(mqadmin consumerProgress -n $namesrv | awk '{sum+=$5} END {print sum}')
if [ "$lag" -gt 10000 ]; then
echo "WARNING: 消费堆积:$lag"
return 1
fi
echo "OK: RocketMQ 集群健康"
return 0
}
# 使用
check_rocketmq_health "ns1:9876;ns2:9876"
总结
RocketMQ 生产问题排查的核心要点:
- 消息丢失:同步刷盘、副本复制、重试机制
- 消息重复:幂等性、手动提交、去重逻辑
- 消息堆积:增加消费者、优化逻辑、增加 Queue
- 性能下降:磁盘 IO、网络带宽、GC 优化
- Broker 故障:主从切换、NameServer 高可用
核心要点:
- 建立完善的监控告警体系
- 掌握常用排查命令和工具
- 积累典型问题案例库
- 定期演练故障处理流程
参考资料
- RocketMQ Troubleshooting 官方文档
- RocketMQ 运维最佳实践
- 《RocketMQ 技术内幕》第 12 章