RocketMQ 性能调优是保障消息系统高吞吐、低延迟的关键。本文将从 Producer、Consumer、Broker 三个维度,深入探讨 RocketMQ 性能调优的实战技巧。
一、性能指标
1.1 核心指标
| 指标 | 说明 | 目标值 |
|---|---|---|
| TPS | 每秒处理消息数 | > 10 万 TPS |
| 延迟 | 消息发送/消费延迟 | < 10ms |
| 可用性 | 系统可用时间比例 | > 99.99% |
| 消息堆积 | 未消费消息数量 | < 1000 |
1.2 监控指标
graph TB
subgraph Producer 指标
P1[sendRT]
P2[sendTPS]
P3[sendFailed]
end
subgraph Broker 指标
B1[putTPS]
B2[getTPS]
B3[dispatchBehind]
end
subgraph Consumer 指标
C1[consumeRT]
C2[consumeTPS]
C3[consumeFailed]
end
P1 --> B1
B2 --> C1
C3 --> P3
二、Producer 优化
2.1 发送方式优化
// 1. 异步发送(推荐)
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("发送成功:msgId={}", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("发送失败", e);
}
});
// 2. 单向发送(日志场景)
producer.sendOneway(msg);
// 3. 批量发送
List<Message> messages = Arrays.asList(
new Message("topic", "body1".getBytes()),
new Message("topic", "body2".getBytes()),
new Message("topic", "body3".getBytes())
);
producer.send(messages);
性能对比:
| 发送方式 | TPS | 延迟 | 可靠性 |
|---|---|---|---|
| 同步发送 | 1 万 | 5ms | 高 |
| 异步发送 | 5 万 | 2ms | 中 |
| 单向发送 | 10 万 | 1ms | 低 |
| 批量发送 | 8 万 | 3ms | 中 |
2.2 配置优化
Properties props = new Properties();
// 1. 增加超时时间
props.put("sendMsgTimeout", "5000"); // 5 秒
// 2. 调整重试次数
props.put("retryTimesWhenSendFailed", "3");
props.put("retryTimesWhenSendAsyncFailed", "3");
// 3. 关闭 VIP 通道(避免网络问题)
producer.setVipChannelEnabled(false);
// 4. 调整批次大小(内部配置)
// sendMessageThreadPoolNums=4
2.3 消息设计优化
// 1. 消息体大小控制
// 推荐:< 4KB
// 最大:4MB(默认)
// 2. 消息压缩
public Message createCompressedMessage(String topic, String body) {
byte[] compressed = compress(body); // GZIP/LZ4
Message msg = new Message(topic, compressed);
msg.putUserProperty("compress", "true");
return msg;
}
// 3. 合理设计 Tag 和 Key
// Tag:用于过滤,不超过 20 个
// Key:用于查询,保持唯一性
msg.setTags("order-create");
msg.setKeys("order_123");
三、Consumer 优化
3.1 消费模式优化
// 1. 并发消费(推荐)
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
process(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 2. 批量消费
consumer.registerMessageListener((msgs, context) -> {
// 批量处理
List<String> batch = new ArrayList<>();
for (MessageExt msg : msgs) {
batch.add(new String(msg.getBody()));
}
processBatch(batch);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 3. 顺序消费(必要时)
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
for (MessageExt msg : msgs) {
process(msg);
}
return ConsumeOrderlyStatus.SUCCESS;
});
3.2 并发配置
// 消费线程配置
consumer.setConsumeThreadMin(20); // 最小线程数
consumer.setConsumeThreadMax(64); // 最大线程数
// 拉取配置
consumer.setPullBatchSize(32); // 每次拉取 32 条
consumer.setPullInterval(0); // 不延迟拉取
// 流控配置
consumer.setPullThresholdForQueue(200); // 队列消息数阈值
consumer.setPullThresholdSizeForQueue(100); // 队列大小阈值 (MB)
3.3 位移提交优化
// 1. 自动提交(默认)
consumer.setAutoCommit(true);
consumer.setAutoCommitInterval(5000); // 5 秒
// 2. 手动提交
consumer.registerMessageListener((msgs, context) -> {
try {
for (MessageExt msg : msgs) {
process(msg);
}
// 消费成功后提交
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 失败不提交,触发重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 3. 批量提交
int count = 0;
for (MessageExt msg : msgs) {
process(msg);
count++;
if (count % 100 == 0) {
context.setAckIndex(count - 1); // 批量确认
}
}
四、Broker 优化
4.1 JVM 优化
# runbroker.sh 配置
# 堆内存(根据服务器配置调整)
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
# G1 GC
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=20"
JAVA_OPT="${JAVA_OPT} -XX:InitiatingHeapOccupancyPercent=35"
# 堆外内存(Direct Memory)
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=16g"
# 打印 GC 日志
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=logs/gc.log:time,level,tags"
4.2 存储优化
# store.properties 配置
# CommitLog 配置
flushDiskType=ASYNC_FLUSH # 异步刷盘
flushCommitLogLeastPages=4 # 最少 4 页刷盘
flushCommitLogThoroughInterval=200 # 200ms 强制刷盘
# 文件配置
fileReservedTime=72 # 文件保留 72 小时
deleteWhen=02 # 凌晨 2 点删除
maxMessageSize=4194304 # 最大 4MB
# 索引配置
maxHashSlotNum=5000000 # 最大哈希槽数
maxIndexNum=30000000 # 最大索引数
4.3 网络优化
# broker.conf 配置
# 网络线程
listenPort=10911
fastTransactionTopicMaxCachedNum=2048
# Socket 配置
serverSocketSndBufSize=65536
serverSocketRcvBufSize=65536
# 心跳配置
sendHeartbeatTimeoutMillis=1000
clientChannelMaxIdleTimeSeconds=120
4.4 副本优化
# 主从配置
brokerRole=ASYNC_MASTER # 异步复制(性能优先)
# brokerRole=SYNC_MASTER # 同步复制(可靠性优先)
brokerRole=SLAVE # 从节点
flushDiskType=ASYNC_FLUSH # 异步刷盘
flushSlaveTimeoutMillis=3000 # 同步从节点超时
五、NameServer 优化
5.1 配置优化
# runserver.sh 配置
# 堆内存(NameServer 轻量,不需要太大)
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g"
# GC 配置
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=50"
# 无 GC 日志(减少 IO)
JAVA_OPT="${JAVA_OPT} -XX:-PrintGC"
5.2 集群部署
# 推荐部署
- 2-3 个 NameServer 节点
- 跨机房部署
- 无状态设计,无需高可用
# 客户端配置
namesrvAddr=ns1:9876;ns2:9876;ns3:9876
六、性能测试
6.1 基准测试工具
# Producer 测试
sh bin/benchmarkproducer.sh \
-n "localhost:9876" \
-t "test-topic" \
-g "test-producer-group" \
-m 1024 \ # 消息大小 1KB
-s 1000 \ # 线程数
-i 10000 # 每秒发送数
# Consumer 测试
sh bin/benchmarkconsumer.sh \
-n "localhost:9876" \
-t "test-topic" \
-g "test-consumer-group"
6.2 测试结果分析
# Producer 测试结果
Send TPS: 95000 messages/sec
Send RT: 2.5 ms (average)
Success Rate: 99.99%
# Consumer 测试结果
Consume TPS: 88000 messages/sec
Consume RT: 3.2 ms (average)
Success Rate: 99.98%
七、调优实战
7.1 高吞吐场景
// Producer 配置(高吞吐)
DefaultMQProducer producer = new DefaultMQProducer("high-throughput-producer");
producer.setNamesrvAddr("localhost:9876");
producer.setVipChannelEnabled(false); // 关闭 VIP
producer.setRetryTimesWhenSendFailed(0); // 不重试
producer.setRetryTimesWhenSendAsyncFailed(0);
producer.start();
// 异步发送
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {}
@Override
public void onException(Throwable e) {}
});
// Consumer 配置(高吞吐)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("high-throughput-consumer");
consumer.setConsumeThreadMin(40);
consumer.setConsumeThreadMax(100);
consumer.setPullBatchSize(64);
consumer.setPullThresholdForQueue(500);
7.2 低延迟场景
// Producer 配置(低延迟)
DefaultMQProducer producer = new DefaultMQProducer("low-latency-producer");
producer.setNamesrvAddr("localhost:9876");
producer.setSendMsgTimeout(3000); // 3 秒超时
producer.setRetryTimesWhenSendFailed(3);
producer.start();
// 同步发送(保证可靠性)
SendResult result = producer.send(msg);
// Consumer 配置(低延迟)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("low-latency-consumer");
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(32);
consumer.setPullBatchSize(16);
consumer.setPullInterval(0); // 不延迟
7.3 大消息场景
# Broker 配置
maxMessageSize=10485760 # 10MB
transferMsgByHeap=true # 通过堆传输
# Producer 配置
producer.setMaxMessageSize(10485760); # 10MB
# Consumer 配置
consumer.setPullBatchSize(10); # 减少批量
八、常见问题排查
8.1 TPS 下降
原因:
- 磁盘 IO 瓶颈
- GC 停顿
- 网络带宽不足
解决:
# 1. 使用 SSD 磁盘
# 2. 优化 GC 配置
-XX:+UseG1GC
-XX:MaxGCPauseMillis=20
# 3. 增加网络带宽
# 或减少消息大小
8.2 延迟过高
原因:
- 同步刷盘
- 同步复制
- 消费者处理慢
解决:
# 改为异步刷盘
flushDiskType=ASYNC_FLUSH
# 改为异步复制
brokerRole=ASYNC_MASTER
# 优化消费者
consumer.setConsumeThreadMax(64);
8.3 消息堆积
原因:
- 消费者处理慢
- 消费者故障
- Topic 分区不足
解决:
// 1. 增加消费者数量
// 2. 增加消费线程
consumer.setConsumeThreadMax(100);
// 3. 优化处理逻辑
// 异步处理、批量处理
// 4. 增加 Queue 数量
// 需要重新创建 Topic
九、监控告警
9.1 关键指标
| 指标 | 告警阈值 | 说明 |
|---|---|---|
consumeDelay | > 10000 | 消费延迟 |
brokerPutTPS | < 1000 | 写入 TPS 过低 |
brokerGetTPS | < 1000 | 消费 TPS 过低 |
sendFailed | > 1% | 发送失败率 |
consumeFailed | > 1% | 消费失败率 |
9.2 Prometheus 监控
# prometheus.yml
scrape_configs:
- job_name: 'rocketmq-broker'
static_configs:
- targets: ['broker-1:5557', 'broker-2:5557']
- job_name: 'rocketmq-namesrv'
static_configs:
- targets: ['namesrv-1:9878']
# alerting_rules.yml
groups:
- name: rocketmq
rules:
- alert: RocketMQConsumerLag
expr: rocketmq_consumer_lag > 10000
for: 5m
labels:
severity: warning
annotations:
summary: "消费滞后"
- alert: RocketMQSendFailed
expr: rate(rocketmq_send_failed[5m]) > 0.01
for: 5m
labels:
severity: critical
annotations:
summary: "发送失败率过高"
十、最佳实践
10.1 配置建议
| 场景 | 刷盘方式 | 复制方式 | 消费线程 |
|---|---|---|---|
| 日志收集 | 异步 | 异步 | 40-64 |
| 一般业务 | 异步 | 异步 | 20-32 |
| 金融交易 | 同步 | 同步 | 10-20 |
| 订单处理 | 异步 | 异步 | 20-32 |
10.2 运维建议
- 定期清理:删除过期消息,释放磁盘空间
- 监控告警:配置关键指标告警
- 容量规划:根据业务增长提前扩容
- 故障演练:定期进行故障切换演练
总结
RocketMQ 性能调优的核心要点:
- Producer 优化:异步发送、批量发送、消息压缩
- Consumer 优化:并发消费、拉取配置、位移提交
- Broker 优化:JVM、存储配置、网络优化
- 性能测试:基准测试、结果分析
- 监控告警:关键指标、告警规则
核心要点:
- 根据场景选择配置(高吞吐 vs 低延迟)
- 异步发送和消费是性能关键
- 合理配置 JVM 和存储参数
- 持续监控和调优
参考资料
- RocketMQ 性能调优官方文档
- RocketMQ 源码
- 《RocketMQ 技术内幕》第 10 章