Skip to content
清晨的一缕阳光
返回

RocketMQ 性能调优实战指南

RocketMQ 性能调优是保障消息系统高吞吐、低延迟的关键。本文将从 Producer、Consumer、Broker 三个维度,深入探讨 RocketMQ 性能调优的实战技巧。

一、性能指标

1.1 核心指标

指标说明目标值
TPS每秒处理消息数> 10 万 TPS
延迟消息发送/消费延迟< 10ms
可用性系统可用时间比例> 99.99%
消息堆积未消费消息数量< 1000

1.2 监控指标

graph TB
    subgraph Producer 指标
        P1[sendRT]
        P2[sendTPS]
        P3[sendFailed]
    end
    
    subgraph Broker 指标
        B1[putTPS]
        B2[getTPS]
        B3[dispatchBehind]
    end
    
    subgraph Consumer 指标
        C1[consumeRT]
        C2[consumeTPS]
        C3[consumeFailed]
    end
    
    P1 --> B1
    B2 --> C1
    C3 --> P3

二、Producer 优化

2.1 发送方式优化

// 1. 异步发送(推荐)
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        log.info("发送成功:msgId={}", sendResult.getMsgId());
    }
    
    @Override
    public void onException(Throwable e) {
        log.error("发送失败", e);
    }
});

// 2. 单向发送(日志场景)
producer.sendOneway(msg);

// 3. 批量发送
List<Message> messages = Arrays.asList(
    new Message("topic", "body1".getBytes()),
    new Message("topic", "body2".getBytes()),
    new Message("topic", "body3".getBytes())
);
producer.send(messages);

性能对比

发送方式TPS延迟可靠性
同步发送1 万5ms
异步发送5 万2ms
单向发送10 万1ms
批量发送8 万3ms

2.2 配置优化

Properties props = new Properties();

// 1. 增加超时时间
props.put("sendMsgTimeout", "5000");  // 5 秒

// 2. 调整重试次数
props.put("retryTimesWhenSendFailed", "3");
props.put("retryTimesWhenSendAsyncFailed", "3");

// 3. 关闭 VIP 通道(避免网络问题)
producer.setVipChannelEnabled(false);

// 4. 调整批次大小(内部配置)
// sendMessageThreadPoolNums=4

2.3 消息设计优化

// 1. 消息体大小控制
// 推荐:< 4KB
// 最大:4MB(默认)

// 2. 消息压缩
public Message createCompressedMessage(String topic, String body) {
    byte[] compressed = compress(body);  // GZIP/LZ4
    Message msg = new Message(topic, compressed);
    msg.putUserProperty("compress", "true");
    return msg;
}

// 3. 合理设计 Tag 和 Key
// Tag:用于过滤,不超过 20 个
// Key:用于查询,保持唯一性
msg.setTags("order-create");
msg.setKeys("order_123");

三、Consumer 优化

3.1 消费模式优化

// 1. 并发消费(推荐)
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        process(msg);
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

// 2. 批量消费
consumer.registerMessageListener((msgs, context) -> {
    // 批量处理
    List<String> batch = new ArrayList<>();
    for (MessageExt msg : msgs) {
        batch.add(new String(msg.getBody()));
    }
    processBatch(batch);
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

// 3. 顺序消费(必要时)
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        process(msg);
    }
    return ConsumeOrderlyStatus.SUCCESS;
});

3.2 并发配置

// 消费线程配置
consumer.setConsumeThreadMin(20);  // 最小线程数
consumer.setConsumeThreadMax(64);  // 最大线程数

// 拉取配置
consumer.setPullBatchSize(32);     // 每次拉取 32 条
consumer.setPullInterval(0);       // 不延迟拉取

// 流控配置
consumer.setPullThresholdForQueue(200);      // 队列消息数阈值
consumer.setPullThresholdSizeForQueue(100);  // 队列大小阈值 (MB)

3.3 位移提交优化

// 1. 自动提交(默认)
consumer.setAutoCommit(true);
consumer.setAutoCommitInterval(5000);  // 5 秒

// 2. 手动提交
consumer.registerMessageListener((msgs, context) -> {
    try {
        for (MessageExt msg : msgs) {
            process(msg);
        }
        // 消费成功后提交
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        // 失败不提交,触发重试
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
});

// 3. 批量提交
int count = 0;
for (MessageExt msg : msgs) {
    process(msg);
    count++;
    if (count % 100 == 0) {
        context.setAckIndex(count - 1);  // 批量确认
    }
}

四、Broker 优化

4.1 JVM 优化

# runbroker.sh 配置

# 堆内存(根据服务器配置调整)
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"

# G1 GC
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=20"
JAVA_OPT="${JAVA_OPT} -XX:InitiatingHeapOccupancyPercent=35"

# 堆外内存(Direct Memory)
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=16g"

# 打印 GC 日志
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=logs/gc.log:time,level,tags"

4.2 存储优化

# store.properties 配置

# CommitLog 配置
flushDiskType=ASYNC_FLUSH              # 异步刷盘
flushCommitLogLeastPages=4             # 最少 4 页刷盘
flushCommitLogThoroughInterval=200     # 200ms 强制刷盘

# 文件配置
fileReservedTime=72                    # 文件保留 72 小时
deleteWhen=02                          # 凌晨 2 点删除
maxMessageSize=4194304                 # 最大 4MB

# 索引配置
maxHashSlotNum=5000000                 # 最大哈希槽数
maxIndexNum=30000000                   # 最大索引数

4.3 网络优化

# broker.conf 配置

# 网络线程
listenPort=10911
fastTransactionTopicMaxCachedNum=2048

# Socket 配置
serverSocketSndBufSize=65536
serverSocketRcvBufSize=65536

# 心跳配置
sendHeartbeatTimeoutMillis=1000
clientChannelMaxIdleTimeSeconds=120

4.4 副本优化

# 主从配置
brokerRole=ASYNC_MASTER    # 异步复制(性能优先)
# brokerRole=SYNC_MASTER   # 同步复制(可靠性优先)
brokerRole=SLAVE           # 从节点

flushDiskType=ASYNC_FLUSH  # 异步刷盘
flushSlaveTimeoutMillis=3000  # 同步从节点超时

五、NameServer 优化

5.1 配置优化

# runserver.sh 配置

# 堆内存(NameServer 轻量,不需要太大)
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"

# GC 配置
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=50"

# 无 GC 日志(减少 IO)
JAVA_OPT="${JAVA_OPT} -XX:-PrintGC"

5.2 集群部署

# 推荐部署
- 2-3 个 NameServer 节点
- 跨机房部署
- 无状态设计,无需高可用

# 客户端配置
namesrvAddr=ns1:9876;ns2:9876;ns3:9876

六、性能测试

6.1 基准测试工具

# Producer 测试
sh bin/benchmarkproducer.sh \
  -n "localhost:9876" \
  -t "test-topic" \
  -g "test-producer-group" \
  -m 1024 \        # 消息大小 1KB
  -s 1000 \        # 线程数
  -i 10000         # 每秒发送数

# Consumer 测试
sh bin/benchmarkconsumer.sh \
  -n "localhost:9876" \
  -t "test-topic" \
  -g "test-consumer-group"

6.2 测试结果分析

# Producer 测试结果
Send TPS:           95000 messages/sec
Send RT:            2.5 ms (average)
Success Rate:       99.99%

# Consumer 测试结果
Consume TPS:        88000 messages/sec
Consume RT:         3.2 ms (average)
Success Rate:       99.98%

七、调优实战

7.1 高吞吐场景

// Producer 配置(高吞吐)
DefaultMQProducer producer = new DefaultMQProducer("high-throughput-producer");
producer.setNamesrvAddr("localhost:9876");
producer.setVipChannelEnabled(false);  // 关闭 VIP
producer.setRetryTimesWhenSendFailed(0);  // 不重试
producer.setRetryTimesWhenSendAsyncFailed(0);
producer.start();

// 异步发送
producer.send(msg, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {}
    
    @Override
    public void onException(Throwable e) {}
});

// Consumer 配置(高吞吐)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("high-throughput-consumer");
consumer.setConsumeThreadMin(40);
consumer.setConsumeThreadMax(100);
consumer.setPullBatchSize(64);
consumer.setPullThresholdForQueue(500);

7.2 低延迟场景

// Producer 配置(低延迟)
DefaultMQProducer producer = new DefaultMQProducer("low-latency-producer");
producer.setNamesrvAddr("localhost:9876");
producer.setSendMsgTimeout(3000);  // 3 秒超时
producer.setRetryTimesWhenSendFailed(3);
producer.start();

// 同步发送(保证可靠性)
SendResult result = producer.send(msg);

// Consumer 配置(低延迟)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("low-latency-consumer");
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(32);
consumer.setPullBatchSize(16);
consumer.setPullInterval(0);  // 不延迟

7.3 大消息场景

# Broker 配置
maxMessageSize=10485760          # 10MB
transferMsgByHeap=true           # 通过堆传输

# Producer 配置
producer.setMaxMessageSize(10485760);  # 10MB

# Consumer 配置
consumer.setPullBatchSize(10);   # 减少批量

八、常见问题排查

8.1 TPS 下降

原因

解决

# 1. 使用 SSD 磁盘
# 2. 优化 GC 配置
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20

# 3. 增加网络带宽
# 或减少消息大小

8.2 延迟过高

原因

解决

# 改为异步刷盘
flushDiskType=ASYNC_FLUSH

# 改为异步复制
brokerRole=ASYNC_MASTER

# 优化消费者
consumer.setConsumeThreadMax(64);

8.3 消息堆积

原因

解决

// 1. 增加消费者数量
// 2. 增加消费线程
consumer.setConsumeThreadMax(100);

// 3. 优化处理逻辑
// 异步处理、批量处理

// 4. 增加 Queue 数量
// 需要重新创建 Topic

九、监控告警

9.1 关键指标

指标告警阈值说明
consumeDelay> 10000消费延迟
brokerPutTPS< 1000写入 TPS 过低
brokerGetTPS< 1000消费 TPS 过低
sendFailed> 1%发送失败率
consumeFailed> 1%消费失败率

9.2 Prometheus 监控

# prometheus.yml
scrape_configs:
  - job_name: 'rocketmq-broker'
    static_configs:
      - targets: ['broker-1:5557', 'broker-2:5557']
  
  - job_name: 'rocketmq-namesrv'
    static_configs:
      - targets: ['namesrv-1:9878']

# alerting_rules.yml
groups:
  - name: rocketmq
    rules:
      - alert: RocketMQConsumerLag
        expr: rocketmq_consumer_lag > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "消费滞后"
      
      - alert: RocketMQSendFailed
        expr: rate(rocketmq_send_failed[5m]) > 0.01
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "发送失败率过高"

十、最佳实践

10.1 配置建议

场景刷盘方式复制方式消费线程
日志收集异步异步40-64
一般业务异步异步20-32
金融交易同步同步10-20
订单处理异步异步20-32

10.2 运维建议

  1. 定期清理:删除过期消息,释放磁盘空间
  2. 监控告警:配置关键指标告警
  3. 容量规划:根据业务增长提前扩容
  4. 故障演练:定期进行故障切换演练

总结

RocketMQ 性能调优的核心要点:

  1. Producer 优化:异步发送、批量发送、消息压缩
  2. Consumer 优化:并发消费、拉取配置、位移提交
  3. Broker 优化:JVM、存储配置、网络优化
  4. 性能测试:基准测试、结果分析
  5. 监控告警:关键指标、告警规则

核心要点

参考资料


分享这篇文章到:

上一篇文章
RocketMQ 故障演练与应急预案实战
下一篇文章
AI 应用生产环境问题排查指南