RocketMQ 容量规划是保障集群稳定运行的关键。本文将深入探讨容量评估、性能基准、扩容方案、性能优化等实战技巧。
一、容量评估
1.1 容量指标
| 指标 | 说明 | 计算公式 |
|---|---|---|
| TPS | 每秒消息数 | 消息总量 / 时间 |
| 带宽 | 网络带宽占用 | TPS × 消息大小 |
| 存储 | 磁盘存储需求 | TPS × 消息大小 × 保留时间 |
| 内存 | JVM 堆内存 | 连接数 × 缓存大小 |
1.2 容量评估模型
示例场景:
业务需求:
- 日均消息量:1 亿条
- 平均消息大小:1KB
- 峰值 TPS:5000
- 消息保留:7 天
容量计算:
- 日均带宽:1 亿 × 1KB / 86400 = 1.16MB/s = 9.3Mbps
- 峰值带宽:5000 × 1KB = 5MB/s = 40Mbps
- 存储需求:1 亿 × 1KB × 7 天 = 700GB
- 考虑副本(2 副本):700GB × 2 = 1.4TB
1.3 容量评估工具
#!/bin/bash
# 容量评估脚本
# 输入参数
AVG_MSG_SIZE=${1:-1024} # 平均消息大小(字节)
DAILY_MSG_COUNT=${2:-100000000} # 日均消息量
PEAK_TPS=${3:-5000} # 峰值 TPS
RETENTION_DAYS=${4:-7} # 保留天数
REPLICATION_FACTOR=${5:-2} # 副本数
echo "=== RocketMQ 容量评估 ==="
echo "平均消息大小:$AVG_MSG_SIZE 字节"
echo "日均消息量:$DAILY_MSG_COUNT 条"
echo "峰值 TPS: $PEAK_TPS"
echo "保留时间:$RETENTION_DAYS 天"
echo "副本数:$REPLICATION_FACTOR"
# 计算带宽
DAILY_BANDWIDTH=$((DAILY_MSG_COUNT * AVG_MSG_SIZE / 86400))
PEAK_BANDWIDTH=$((PEAK_TPS * AVG_MSG_SIZE))
echo -e "\n=== 带宽需求 ==="
echo "日均带宽:$DAILY_BANDWIDTH B/s = $(echo "scale=2; $DAILY_BANDWIDTH * 8 / 1000000" | bc) Mbps"
echo "峰值带宽:$PEAK_BANDWIDTH B/s = $(echo "scale=2; $PEAK_BANDWIDTH * 8 / 1000000" | bc) Mbps"
# 计算存储
STORAGE=$((DAILY_MSG_COUNT * AVG_MSG_SIZE * RETENTION_DAYS / 1024 / 1024 / 1024))
TOTAL_STORAGE=$((STORAGE * REPLICATION_FACTOR))
echo -e "\n=== 存储需求 ==="
echo "单副本存储:$STORAGE GB"
echo "总存储($REPLICATION_FACTOR 副本):$TOTAL_STORAGE GB"
# 推荐配置
echo -e "\n=== 推荐配置 ==="
if [ $PEAK_TPS -lt 1000 ]; then
echo "Broker 数量:2"
echo "每 Broker CPU: 4 核"
echo "每 Broker 内存:8GB"
elif [ $PEAK_TPS -lt 5000 ]; then
echo "Broker 数量:3"
echo "每 Broker CPU: 8 核"
echo "每 Broker 内存:16GB"
else
echo "Broker 数量:5+"
echo "每 Broker CPU: 16 核"
echo "每 Broker 内存:32GB"
fi
二、性能基准
2.1 基准测试
测试脚本:
#!/bin/bash
# 性能基准测试脚本
NAMESRV="ns1:9876"
TOPIC="benchmark-topic"
DURATION=60 # 测试时长(秒)
echo "=== RocketMQ 性能基准测试 ==="
# 1. 创建测试 Topic
mqadmin updateTopic -n $NAMESRV \
-c DefaultCluster \
-t $TOPIC \
-p 8 -r 8
# 2. Producer 测试
echo -e "\n=== Producer 测试 ==="
sh $ROCKETMQ_HOME/bin/benchmarkproducer.sh \
-n $NAMESRV \
-t $TOPIC \
-g benchmark-producer \
-m 1024 \
-s $DURATION
# 3. Consumer 测试
echo -e "\n=== Consumer 测试 ==="
sh $ROCKETMQ_HOME/bin/benchmarkconsumer.sh \
-n $NAMESRV \
-t $TOPIC \
-g benchmark-consumer \
-s $DURATION
# 4. 清理
mqadmin deleteTopic -n $NAMESRV -t $TOPIC -c DefaultCluster
echo -e "\n=== 测试完成 ==="
2.2 性能指标
典型性能数据:
单 Broker 性能(8 核 16GB):
- 生产 TPS:3 万 -5 万
- 消费 TPS:5 万 -8 万
- 平均延迟:< 10ms
- P99 延迟:< 50ms
集群性能(3 Broker):
- 生产 TPS:10 万 -15 万
- 消费 TPS:15 万 -24 万
2.3 性能监控
#!/bin/bash
# 性能监控脚本
echo "=== RocketMQ 性能监控 ==="
# 1. Broker TPS
echo -e "\n=== Broker TPS ==="
for broker in broker-1 broker-2 broker-3; do
put_tps=$(mqadmin brokerStatus -n ns1:9876 -b $broker:10911 2>/dev/null | \
grep "putTps" | awk '{print $2}')
get_tps=$(mqadmin brokerStatus -n ns1:9876 -b $broker:10911 2>/dev/null | \
grep "getTps" | awk '{print $2}')
echo "$broker: Put=$put_tps, Get=$get_tps"
done
# 2. 延迟统计
echo -e "\n=== 延迟统计 ==="
for broker in broker-1 broker-2 broker-3; do
avg_latency=$(mqadmin brokerStatus -n ns1:9876 -b $broker:10911 2>/dev/null | \
grep "putMessageAverageLatency" | awk '{print $2}')
echo "$broker: 平均延迟=${avg_latency}ms"
done
# 3. 堆积情况
echo -e "\n=== 消费堆积 ==="
mqadmin consumerProgress -n ns1:9876 | awk 'NR>3 {sum+=$5} END {print "总堆积量:" sum}'
三、扩容方案
3.1 水平扩容
添加 Broker:
#!/bin/bash
# 水平扩容脚本
NEW_BROKER="broker-4"
CLUSTER="DefaultCluster"
echo "=== 水平扩容 ==="
# 1. 准备新服务器
echo "准备新服务器..."
ssh $NEW_BROKER "yum install -y java-11-openjdk"
ssh $NEW_BROKER "wget https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip"
ssh $NEW_BROKER "unzip rocketmq-all-5.0.0-bin-release.zip"
ssh $NEW_BROKER "mv rocketmq-5.0.0 /opt/rocketmq"
# 2. 配置 Broker
echo "配置 Broker..."
cat > /tmp/broker.conf << EOF
brokerClusterName=$CLUSTER
brokerName=$NEW_BROKER
brokerId=0
namesrvAddr=ns1:9876;ns2:9876
listenPort=10911
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
EOF
scp /tmp/broker.conf $NEW_BROKER:/opt/rocketmq/conf/
# 3. 启动 Broker
echo "启动 Broker..."
ssh $NEW_BROKER "nohup /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf &"
# 4. 验证
echo "验证 Broker..."
sleep 30
mqadmin brokerStatus -n ns1:9876 -b $NEW_BROKER:10911
echo "=== 扩容完成 ==="
3.2 分区扩容
增加 Queue 数量:
#!/bin/bash
# 分区扩容脚本
NAMESRV="ns1:9876"
TOPIC="order-topic"
OLD_QUEUE=8
NEW_QUEUE=16
echo "=== 分区扩容 ==="
echo "Topic: $TOPIC"
echo "Queue: $OLD_QUEUE -> $NEW_QUEUE"
# 1. 查看当前配置
echo -e "\n当前配置:"
mqadmin updateTopic -n $NAMESRV -t $TOPIC
# 2. 更新 Queue 数量
echo -e "\n更新 Queue 数量..."
mqadmin updateTopic -n $NAMESRV \
-c DefaultCluster \
-t $TOPIC \
-p $NEW_QUEUE -r $NEW_QUEUE
# 3. 验证
echo -e "\n验证配置:"
mqadmin updateTopic -n $NAMESRV -t $TOPIC
echo "=== 扩容完成 ==="
3.3 扩容验证
#!/bin/bash
# 扩容验证脚本
echo "=== 扩容验证 ==="
# 1. 检查 Broker 数量
broker_count=$(mqadmin clusterList -n ns1:9876 | grep -c "broker-id")
echo "Broker 数量:$broker_count"
# 2. 检查分区分布
echo -e "\n分区分布:"
mqadmin updateTopic -n ns1:9876 -t order-topic | grep -o "Broker Name:.*" | sort | uniq -c
# 3. 检查数据平衡
echo -e "\n数据平衡:"
for broker in broker-1 broker-2 broker-3 broker-4; do
size=$(ssh $broker "du -sh /data/rocketmq/store 2>/dev/null | awk '{print \$1}'")
echo "$broker: $size"
done
# 4. 性能测试
echo -e "\n性能测试:"
sh $ROCKETMQ_HOME/bin/benchmarkproducer.sh \
-n ns1:9876 \
-t order-topic \
-g test-producer \
-m 1024 \
-s 60
echo "=== 验证完成 ==="
四、性能优化
4.1 Broker 优化
JVM 优化:
# runbroker.sh
# 堆内存
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"
# G1 GC
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=20"
JAVA_OPT="${JAVA_OPT} -XX:InitiatingHeapOccupancyPercent=35"
# 堆外内存
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=16g"
# GC 日志
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=/var/log/rocketmq/gc.log:time,level,tags"
存储优化:
# broker.conf
# 异步刷盘
flushDiskType=ASYNC_FLUSH
flushCommitLogLeastPages=4
flushCommitLogThoroughInterval=200
# 提交配置
commitCommitLogLeastPages=4
commitCommitLogThoroughInterval=200
# 日志配置
logFlushInterval=1000
logFlushStartThreshold=0.95
4.2 网络优化
# broker.conf
# 网络线程
listenPort=10911
serverSocketSndBufSize=65536
serverSocketRcvBufSize=65536
# 心跳配置
sendHeartbeatTimeoutMillis=1000
clientChannelMaxIdleTimeSeconds=120
4.3 客户端优化
Producer 优化:
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("ns1:9876");
// 发送超时
producer.setSendMsgTimeout(5000);
// 重试次数
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryTimesWhenSendAsyncFailed(3);
// 批量发送
// 应用层实现批量
producer.start();
Consumer 优化:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("ns1:9876");
// 消费线程
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 拉取配置
consumer.setPullBatchSize(64);
consumer.setPullThresholdForQueue(200);
consumer.setPullThresholdSizeForQueue(100);
consumer.start();
五、最佳实践
5.1 容量规划建议
容量规划:
1. 预留 30% 容量余量
2. 考虑业务增长因素
3. 定期评估容量使用
4. 建立容量预警机制
5.2 性能优化建议
性能优化:
1. 使用 SSD 磁盘
2. 配置异步刷盘
3. 优化 JVM 参数
4. 合理设置队列数量
5. 批量发送/消费
5.3 检查清单
容量检查:
- [ ] 评估业务需求
- [ ] 计算存储需求
- [ ] 计算带宽需求
- [ ] 规划 Broker 数量
- [ ] 预留容量余量
性能检查:
- [ ] 基准测试
- [ ] JVM 优化
- [ ] 存储优化
- [ ] 网络优化
- [ ] 客户端优化
总结
RocketMQ 容量规划与性能优化的核心要点:
- 容量评估:容量指标、评估模型、评估工具
- 性能基准:基准测试、性能指标、性能监控
- 扩容方案:水平扩容、分区扩容、扩容验证
- 性能优化:Broker 优化、网络优化、客户端优化
- 最佳实践:容量规划、性能优化、检查清单
核心要点:
- 建立容量评估模型
- 定期性能基准测试
- 预留充足容量余量
- 持续性能优化
- 建立监控预警
参考资料
- RocketMQ 运维官方文档
- RocketMQ 性能最佳实践
- 《RocketMQ 技术内幕》第 11 章