Skip to content
清晨的一缕阳光
返回

RocketMQ 容量规划与性能优化实战

RocketMQ 容量规划是保障集群稳定运行的关键。本文将深入探讨容量评估、性能基准、扩容方案、性能优化等实战技巧。

一、容量评估

1.1 容量指标

指标说明计算公式
TPS每秒消息数消息总量 / 时间
带宽网络带宽占用TPS × 消息大小
存储磁盘存储需求TPS × 消息大小 × 保留时间
内存JVM 堆内存连接数 × 缓存大小

1.2 容量评估模型

示例场景

业务需求:
- 日均消息量:1 亿条
- 平均消息大小:1KB
- 峰值 TPS:5000
- 消息保留:7 天

容量计算:
- 日均带宽:1 亿 × 1KB / 86400 = 1.16MB/s = 9.3Mbps
- 峰值带宽:5000 × 1KB = 5MB/s = 40Mbps
- 存储需求:1 亿 × 1KB × 7 天 = 700GB
- 考虑副本(2 副本):700GB × 2 = 1.4TB

1.3 容量评估工具

#!/bin/bash
# 容量评估脚本

# 输入参数
AVG_MSG_SIZE=${1:-1024}  # 平均消息大小(字节)
DAILY_MSG_COUNT=${2:-100000000}  # 日均消息量
PEAK_TPS=${3:-5000}  # 峰值 TPS
RETENTION_DAYS=${4:-7}  # 保留天数
REPLICATION_FACTOR=${5:-2}  # 副本数

echo "=== RocketMQ 容量评估 ==="
echo "平均消息大小:$AVG_MSG_SIZE 字节"
echo "日均消息量:$DAILY_MSG_COUNT"
echo "峰值 TPS: $PEAK_TPS"
echo "保留时间:$RETENTION_DAYS"
echo "副本数:$REPLICATION_FACTOR"

# 计算带宽
DAILY_BANDWIDTH=$((DAILY_MSG_COUNT * AVG_MSG_SIZE / 86400))
PEAK_BANDWIDTH=$((PEAK_TPS * AVG_MSG_SIZE))

echo -e "\n=== 带宽需求 ==="
echo "日均带宽:$DAILY_BANDWIDTH B/s = $(echo "scale=2; $DAILY_BANDWIDTH * 8 / 1000000" | bc) Mbps"
echo "峰值带宽:$PEAK_BANDWIDTH B/s = $(echo "scale=2; $PEAK_BANDWIDTH * 8 / 1000000" | bc) Mbps"

# 计算存储
STORAGE=$((DAILY_MSG_COUNT * AVG_MSG_SIZE * RETENTION_DAYS / 1024 / 1024 / 1024))
TOTAL_STORAGE=$((STORAGE * REPLICATION_FACTOR))

echo -e "\n=== 存储需求 ==="
echo "单副本存储:$STORAGE GB"
echo "总存储($REPLICATION_FACTOR 副本):$TOTAL_STORAGE GB"

# 推荐配置
echo -e "\n=== 推荐配置 ==="
if [ $PEAK_TPS -lt 1000 ]; then
    echo "Broker 数量:2"
    echo "每 Broker CPU: 4 核"
    echo "每 Broker 内存:8GB"
elif [ $PEAK_TPS -lt 5000 ]; then
    echo "Broker 数量:3"
    echo "每 Broker CPU: 8 核"
    echo "每 Broker 内存:16GB"
else
    echo "Broker 数量:5+"
    echo "每 Broker CPU: 16 核"
    echo "每 Broker 内存:32GB"
fi

二、性能基准

2.1 基准测试

测试脚本

#!/bin/bash
# 性能基准测试脚本

NAMESRV="ns1:9876"
TOPIC="benchmark-topic"
DURATION=60  # 测试时长(秒)

echo "=== RocketMQ 性能基准测试 ==="

# 1. 创建测试 Topic
mqadmin updateTopic -n $NAMESRV \
  -c DefaultCluster \
  -t $TOPIC \
  -p 8 -r 8

# 2. Producer 测试
echo -e "\n=== Producer 测试 ==="
sh $ROCKETMQ_HOME/bin/benchmarkproducer.sh \
  -n $NAMESRV \
  -t $TOPIC \
  -g benchmark-producer \
  -m 1024 \
  -s $DURATION

# 3. Consumer 测试
echo -e "\n=== Consumer 测试 ==="
sh $ROCKETMQ_HOME/bin/benchmarkconsumer.sh \
  -n $NAMESRV \
  -t $TOPIC \
  -g benchmark-consumer \
  -s $DURATION

# 4. 清理
mqadmin deleteTopic -n $NAMESRV -t $TOPIC -c DefaultCluster

echo -e "\n=== 测试完成 ==="

2.2 性能指标

典型性能数据

单 Broker 性能(8 核 16GB):
- 生产 TPS:3 万 -5 万
- 消费 TPS:5 万 -8 万
- 平均延迟:< 10ms
- P99 延迟:< 50ms

集群性能(3 Broker):
- 生产 TPS:10 万 -15 万
- 消费 TPS:15 万 -24 万

2.3 性能监控

#!/bin/bash
# 性能监控脚本

echo "=== RocketMQ 性能监控 ==="

# 1. Broker TPS
echo -e "\n=== Broker TPS ==="
for broker in broker-1 broker-2 broker-3; do
    put_tps=$(mqadmin brokerStatus -n ns1:9876 -b $broker:10911 2>/dev/null | \
        grep "putTps" | awk '{print $2}')
    get_tps=$(mqadmin brokerStatus -n ns1:9876 -b $broker:10911 2>/dev/null | \
        grep "getTps" | awk '{print $2}')
    echo "$broker: Put=$put_tps, Get=$get_tps"
done

# 2. 延迟统计
echo -e "\n=== 延迟统计 ==="
for broker in broker-1 broker-2 broker-3; do
    avg_latency=$(mqadmin brokerStatus -n ns1:9876 -b $broker:10911 2>/dev/null | \
        grep "putMessageAverageLatency" | awk '{print $2}')
    echo "$broker: 平均延迟=${avg_latency}ms"
done

# 3. 堆积情况
echo -e "\n=== 消费堆积 ==="
mqadmin consumerProgress -n ns1:9876 | awk 'NR>3 {sum+=$5} END {print "总堆积量:" sum}'

三、扩容方案

3.1 水平扩容

添加 Broker

#!/bin/bash
# 水平扩容脚本

NEW_BROKER="broker-4"
CLUSTER="DefaultCluster"

echo "=== 水平扩容 ==="

# 1. 准备新服务器
echo "准备新服务器..."
ssh $NEW_BROKER "yum install -y java-11-openjdk"
ssh $NEW_BROKER "wget https://archive.apache.org/dist/rocketmq/5.0.0/rocketmq-all-5.0.0-bin-release.zip"
ssh $NEW_BROKER "unzip rocketmq-all-5.0.0-bin-release.zip"
ssh $NEW_BROKER "mv rocketmq-5.0.0 /opt/rocketmq"

# 2. 配置 Broker
echo "配置 Broker..."
cat > /tmp/broker.conf << EOF
brokerClusterName=$CLUSTER
brokerName=$NEW_BROKER
brokerId=0
namesrvAddr=ns1:9876;ns2:9876
listenPort=10911
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
EOF
scp /tmp/broker.conf $NEW_BROKER:/opt/rocketmq/conf/

# 3. 启动 Broker
echo "启动 Broker..."
ssh $NEW_BROKER "nohup /opt/rocketmq/bin/mqbroker -c /opt/rocketmq/conf/broker.conf &"

# 4. 验证
echo "验证 Broker..."
sleep 30
mqadmin brokerStatus -n ns1:9876 -b $NEW_BROKER:10911

echo "=== 扩容完成 ==="

3.2 分区扩容

增加 Queue 数量

#!/bin/bash
# 分区扩容脚本

NAMESRV="ns1:9876"
TOPIC="order-topic"
OLD_QUEUE=8
NEW_QUEUE=16

echo "=== 分区扩容 ==="
echo "Topic: $TOPIC"
echo "Queue: $OLD_QUEUE -> $NEW_QUEUE"

# 1. 查看当前配置
echo -e "\n当前配置:"
mqadmin updateTopic -n $NAMESRV -t $TOPIC

# 2. 更新 Queue 数量
echo -e "\n更新 Queue 数量..."
mqadmin updateTopic -n $NAMESRV \
  -c DefaultCluster \
  -t $TOPIC \
  -p $NEW_QUEUE -r $NEW_QUEUE

# 3. 验证
echo -e "\n验证配置:"
mqadmin updateTopic -n $NAMESRV -t $TOPIC

echo "=== 扩容完成 ==="

3.3 扩容验证

#!/bin/bash
# 扩容验证脚本

echo "=== 扩容验证 ==="

# 1. 检查 Broker 数量
broker_count=$(mqadmin clusterList -n ns1:9876 | grep -c "broker-id")
echo "Broker 数量:$broker_count"

# 2. 检查分区分布
echo -e "\n分区分布:"
mqadmin updateTopic -n ns1:9876 -t order-topic | grep -o "Broker Name:.*" | sort | uniq -c

# 3. 检查数据平衡
echo -e "\n数据平衡:"
for broker in broker-1 broker-2 broker-3 broker-4; do
    size=$(ssh $broker "du -sh /data/rocketmq/store 2>/dev/null | awk '{print \$1}'")
    echo "$broker: $size"
done

# 4. 性能测试
echo -e "\n性能测试:"
sh $ROCKETMQ_HOME/bin/benchmarkproducer.sh \
  -n ns1:9876 \
  -t order-topic \
  -g test-producer \
  -m 1024 \
  -s 60

echo "=== 验证完成 ==="

四、性能优化

4.1 Broker 优化

JVM 优化

# runbroker.sh

# 堆内存
JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g"

# G1 GC
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC"
JAVA_OPT="${JAVA_OPT} -XX:MaxGCPauseMillis=20"
JAVA_OPT="${JAVA_OPT} -XX:InitiatingHeapOccupancyPercent=35"

# 堆外内存
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=16g"

# GC 日志
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=/var/log/rocketmq/gc.log:time,level,tags"

存储优化

# broker.conf

# 异步刷盘
flushDiskType=ASYNC_FLUSH
flushCommitLogLeastPages=4
flushCommitLogThoroughInterval=200

# 提交配置
commitCommitLogLeastPages=4
commitCommitLogThoroughInterval=200

# 日志配置
logFlushInterval=1000
logFlushStartThreshold=0.95

4.2 网络优化

# broker.conf

# 网络线程
listenPort=10911
serverSocketSndBufSize=65536
serverSocketRcvBufSize=65536

# 心跳配置
sendHeartbeatTimeoutMillis=1000
clientChannelMaxIdleTimeSeconds=120

4.3 客户端优化

Producer 优化

DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("ns1:9876");

// 发送超时
producer.setSendMsgTimeout(5000);

// 重试次数
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryTimesWhenSendAsyncFailed(3);

// 批量发送
// 应用层实现批量

producer.start();

Consumer 优化

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
consumer.setNamesrvAddr("ns1:9876");

// 消费线程
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);

// 拉取配置
consumer.setPullBatchSize(64);
consumer.setPullThresholdForQueue(200);
consumer.setPullThresholdSizeForQueue(100);

consumer.start();

五、最佳实践

5.1 容量规划建议

容量规划:
1. 预留 30% 容量余量
2. 考虑业务增长因素
3. 定期评估容量使用
4. 建立容量预警机制

5.2 性能优化建议

性能优化:
1. 使用 SSD 磁盘
2. 配置异步刷盘
3. 优化 JVM 参数
4. 合理设置队列数量
5. 批量发送/消费

5.3 检查清单

容量检查:
- [ ] 评估业务需求
- [ ] 计算存储需求
- [ ] 计算带宽需求
- [ ] 规划 Broker 数量
- [ ] 预留容量余量

性能检查:
- [ ] 基准测试
- [ ] JVM 优化
- [ ] 存储优化
- [ ] 网络优化
- [ ] 客户端优化

总结

RocketMQ 容量规划与性能优化的核心要点:

  1. 容量评估:容量指标、评估模型、评估工具
  2. 性能基准:基准测试、性能指标、性能监控
  3. 扩容方案:水平扩容、分区扩容、扩容验证
  4. 性能优化:Broker 优化、网络优化、客户端优化
  5. 最佳实践:容量规划、性能优化、检查清单

核心要点

参考资料


分享这篇文章到:

上一篇文章
Kafka 架构设计与核心概念
下一篇文章
Agent 记忆系统设计实战