Skip to content
清晨的一缕阳光
返回

RocketMQ 生产问题排查案例集

本文汇总了 RocketMQ 生产环境中的典型问题排查案例,包括消息丢失、重复、堆积、性能下降等问题的分析和解决方案。

一、消息丢失案例

案例 1:异步刷盘导致消息丢失

现象

排查过程

# 1. 检查 Broker 配置
cat broker.conf | grep flushDiskType
# 发现:flushDiskType=ASYNC_FLUSH

# 2. 检查 Broker 日志
tail -f /var/log/rocketmq/broker.log | grep "shutdown"
# 发现:Broker 异常宕机,未正常刷盘

# 3. 分析时间线
T1: Producer 发送消息
T2: Broker 接收并返回成功
T3: 消息在内存中,未刷盘
T4: Broker 宕机
结果:消息丢失

原因分析

异步刷盘流程:
Producer -> Broker -> 返回成功 -> 异步刷盘 -> 宕机

                                数据丢失

解决方案

# 修改为同步刷盘
flushDiskType=SYNC_FLUSH

# 或增加副本
brokerRole=SYNC_MASTER
min.insync.replicas=2

效果:消息丢失问题彻底解决


案例 2:Producer 未处理异常

现象

排查过程

// 原代码
SendResult result = producer.send(msg);
// 未检查 SendStatus

// 修改为
SendResult result = producer.send(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
    log.error("发送失败:{}", result);
    handleFailedMessage(msg, result);
}

解决方案

public class ReliableProducer {
    public boolean sendReliable(Message msg) {
        int maxRetries = 3;
        for (int i = 0; i < maxRetries; i++) {
            try {
                SendResult result = producer.send(msg);
                if (result.getSendStatus() == SendStatus.SEND_OK) {
                    return true;
                }
            } catch (Exception e) {
                log.warn("发送失败,第 {} 次重试", i + 1, e);
            }
        }
        // 记录失败消息
        saveFailedMessage(msg);
        return false;
    }
}

二、消息重复案例

案例 3:网络重试导致重复

现象

排查过程

# 1. 检查 Producer 日志
grep "retry" /var/log/producer/app.log
# 发现:大量重试记录

# 2. 检查网络
ping broker-1
# 发现:网络偶尔超时

# 3. 分析时间线
T1: Producer 发送消息
T2: Broker 接收并写入
T3: 响应超时(网络问题)
T4: Producer 重试发送
T5: Broker 再次写入
结果:消息重复

解决方案

// 1. 业务层去重
@RedissonLock(key = "#msgId")
public void processMessage(String msgId, Message msg) {
    // 检查是否已处理
    if (isProcessed(msgId)) {
        return;
    }
    
    // 处理业务
    processBusiness(msg);
    
    // 标记已处理
    markAsProcessed(msgId);
}

// 2. 数据库唯一键
CREATE TABLE orders (
    order_id VARCHAR(64) PRIMARY KEY,
    ...
);

案例 4:消费重复提交

现象

排查过程

// 原代码
consumer.setAutoCommit(true);  // 自动提交

// 问题场景
T1: Consumer A 拉取消息(offset=100-150
T2: 处理消息(offset=100-120
T3: 自动提交位移(offset=150
T4: Rebalance 触发
T5: Consumer B 接管(从 offset=150 开始)
T6: offset=120-150 的消息未处理但已提交
结果:消息丢失或重复

解决方案

// 1. 手动提交
consumer.setAutoCommit(false);

// 2. 处理完成后提交
while (true) {
    List<MessageExt> msgs = consumer.poll();
    
    for (MessageExt msg : msgs) {
        process(msg);  // 处理业务
    }
    
    consumer.commitSync();  // 同步提交
}

// 3. 或使用事务
@Transactional
public void consumeMessage(List<MessageExt> msgs) {
    for (MessageExt msg : msgs) {
        process(msg);
    }
    consumer.commitSync();
}

三、消息堆积案例

案例 5:消费逻辑慢导致堆积

现象

排查过程

# 1. 查看消费组状态
mqadmin consumerProgress -n ns1:9876 -g order-consumer-group

# 输出:
# Group: order-consumer-group
# Topic: order-topic
# Diff: 80000  # 堆积 8 万条

# 2. 检查消费逻辑耗时
# 添加监控
processTime.record(System.currentTimeMillis() - startTime);
# 发现:平均处理时间 500ms

# 3. 分析代码
// 发现:每条消息都调用外部 HTTP 接口
for (MessageExt msg : messages) {
    httpClient.post(...);  // 同步调用,耗时 500ms
}

解决方案

// 1. 批量处理外部调用
List<Order> orders = new ArrayList<>();
for (MessageExt msg : messages) {
    orders.add(parse(msg));
}
// 批量调用
orderService.batchProcess(orders);

// 2. 异步处理
ExecutorService executor = Executors.newFixedThreadPool(10);
for (MessageExt msg : messages) {
    executor.submit(() -> process(msg));
}

// 3. 增加消费者数量
// 从 2 个增加到 8 个

效果


案例 6:Queue 数量不足

现象

排查过程

# 1. 查看 Topic 配置
mqadmin updateTopic -n ns1:9876 -t order-topic
# 输出:readQueueNums=4

# 2. 查看消费者数量
# 当前:8 个消费者

# 3. 分析
# 4 个 Queue 最多支持 4 个消费者并发消费
# 多余的 4 个消费者空闲

解决方案

# 增加 Queue 数量
mqadmin updateTopic -n ns1:9876 -t order-topic -c DefaultCluster -p 16 -r 16

# 验证
mqadmin updateTopic -n ns1:9876 -t order-topic
# readQueueNums=16

注意

四、性能下降案例

案例 7:磁盘 IO 瓶颈

现象

排查过程

# 1. 检查磁盘 IO
iostat -x 1 5
# 输出:%util 接近 100%

# 2. 检查磁盘空间
df -h /data/rocketmq/store
# 输出:使用率 95%

# 3. 检查 GC
jstat -gcutil <pid> 1000
# 输出:Full GC 频繁

# 4. 分析日志
tail -f /var/log/rocketmq/broker.log
# 发现:大量 "Slowest request" 警告

解决方案

# 1. 清理过期文件
# 修改保留策略
fileReservedTime=48  # 从 72 改为 48 小时

# 2. 迁移数据
# 将旧数据迁移到其他磁盘

# 3. 增加磁盘
# 挂载新磁盘,扩展 storePathCommitLog

# 4. 优化刷盘
flushCommitLogThoroughInterval=500  # 减少刷盘频率

案例 8:网络带宽不足

现象

排查过程

# 1. 检查网络流量
iftop -P -n -i eth0
# 输出:900Mbps / 1Gbps

# 2. 检查消息大小
# 平均消息大小:10KB
# 峰值 TPS:10 万
# 带宽需求:10KB * 10 万 = 1GB/s

# 3. 分析
# 带宽不足,导致请求排队

解决方案

// 1. 消息压缩
MessageCompressor.compress(msg);
// 压缩后:10KB → 3KB

// 2. 增加带宽
# 升级网络:1Gbps → 10Gbps

// 3. 优化批量
consumer.setPullBatchSize(64);  // 增大批量

五、Broker 故障案例

案例 9:Master 宕机切换

现象

排查过程

# 1. 检查 Broker 状态
ps -ef | grep BrokerStartup
# 发现:Broker-1 进程不存在

# 2. 检查日志
tail -f /var/log/rocketmq/broker.log
# 发现:OOM 错误

# 3. 检查堆配置
cat runbroker.sh | grep HEAP
# 输出:-Xmx2G -Xms2G

解决方案

# 1. 调整堆内存
export JAVA_OPT="-Xmx8G -Xms8G"

# 2. 优化 GC
export JAVA_OPT="$JAVA_OPT -XX:+UseG1GC -XX:MaxGCPauseMillis=20"

# 3. 重启 Broker
systemctl restart rocketmq-broker

# 4. 验证
mqadmin brokerStatus -n ns1:9876 -b broker-1:10911

恢复过程

T0: Broker Master 宕机
T0+10s: Slave 检测到 Master 下线
T0+30s: Producer 切换到 Slave(只读)
T0+120s: Master 重启完成
T0+180s: 集群恢复正常

案例 10:NameServer 故障

现象

排查过程

# 1. 检查 NameServer 状态
ps -ef | grep NamesrvStartup
# 发现:NameServer 进程不存在

# 2. 检查客户端配置
cat client.properties | grep namesrvAddr
# 发现:只配置了一个 NameServer

# 3. 分析
# 单 NameServer 故障,客户端无法获取路由

解决方案

// 1. 配置多个 NameServer
producer.setNamesrvAddr("ns1:9876;ns2:9876;ns3:9876");

// 2. 部署至少 2 个 NameServer
# 启动 NameServer 1
nohup mqnamesrv &

# 启动 NameServer 2
nohup mqnamesrv &

// 3. 客户端自动重试
producer.setRetryTimesWhenSendFailed(3);

六、排查工具

常用命令

# Topic 管理
mqadmin updateTopic -n ns1:9876 -t my-topic -c DefaultCluster -p 8 -r 8
mqadmin deleteTopic -n ns1:9876 -t my-topic -c DefaultCluster

# 消费组管理
mqadmin consumerProgress -n ns1:9876 -g my-group
mqadmin resetOffsetByTime -n ns1:9876 -t my-topic -g my-group -s 1609459200000

# 消息查询
mqadmin queryMsgById -n ns1:9876 -i msgId
mqadmin queryMsgByOffset -n ns1:9876 -t my-topic -o 1000

# Broker 状态
mqadmin brokerStatus -n ns1:9876 -b broker-1:10911
mqadmin clusterList -n ns1:9876

# 名称服务器
mqadmin namesrvStatus -n ns1:9876

监控脚本

#!/bin/bash
# RocketMQ 健康检查脚本

check_rocketmq_health() {
    local namesrv=$1
    
    # 检查 NameServer 连接
    for ns in $(echo $namesrv | tr ';' ' '); do
        if ! nc -z ${ns%:*} ${ns#*:} &>/dev/null; then
            echo "CRITICAL: NameServer $ns 无法连接"
            return 2
        fi
    done
    
    # 检查 Broker 状态
    broker_count=$(mqadmin clusterList -n $namesrv | grep -c "broker-id")
    if [ "$broker_count" -lt 2 ]; then
        echo "CRITICAL: Broker 数量不足"
        return 2
    fi
    
    # 检查磁盘使用率
    disk_ratio=$(mqadmin brokerStatus -n $namesrv -b broker-1:10911 | \
        grep "commitLogDiskRatio" | awk -F: '{print $2}' | tr -d ' ')
    
    if [ "${disk_ratio%.*}" -gt 80 ]; then
        echo "WARNING: CommitLog 磁盘使用率过高:$disk_ratio%"
        return 1
    fi
    
    # 检查消费堆积
    lag=$(mqadmin consumerProgress -n $namesrv | awk '{sum+=$5} END {print sum}')
    if [ "$lag" -gt 10000 ]; then
        echo "WARNING: 消费堆积:$lag"
        return 1
    fi
    
    echo "OK: RocketMQ 集群健康"
    return 0
}

# 使用
check_rocketmq_health "ns1:9876;ns2:9876"

总结

RocketMQ 生产问题排查的核心要点:

  1. 消息丢失:同步刷盘、副本复制、重试机制
  2. 消息重复:幂等性、手动提交、去重逻辑
  3. 消息堆积:增加消费者、优化逻辑、增加 Queue
  4. 性能下降:磁盘 IO、网络带宽、GC 优化
  5. Broker 故障:主从切换、NameServer 高可用

核心要点

参考资料


分享这篇文章到:

上一篇文章
Redis 主从复制原理
下一篇文章
RocketMQ 消费者组与订阅关系详解