本文汇总了 RocketMQ 常用的运维脚本和工具,包括部署、监控、备份、故障处理等实用脚本,帮助提升运维效率。
一、部署脚本
1.1 一键部署脚本
#!/bin/bash
# One-click, single-host RocketMQ deployment.
#
# Installs the binary release, creates the service account and data
# directories, registers systemd units for NameServer and Broker, starts both
# and prints the cluster list as a smoke test.
#
# Must run as root (writes to /opt, /data, /etc/profile.d, /etc/systemd).
# ROCKETMQ_VERSION, ROCKETMQ_HOME and JAVA_HOME can be overridden from the
# environment; the defaults are the values this script always used.
set -euo pipefail

ROCKETMQ_VERSION="${ROCKETMQ_VERSION:-5.1.0}"
ROCKETMQ_HOME="${ROCKETMQ_HOME:-/opt/rocketmq}"
JAVA_HOME="${JAVA_HOME:-/usr/lib/jvm/java-8-openjdk}"
# Exported so the mqadmin call at the end sees the same values as the units.
export ROCKETMQ_HOME JAVA_HOME

readonly SERVICE_USER="rocketmq"
readonly DATA_DIR="/data/rocketmq"
readonly NAMESRV_ADDR="localhost:9876"
readonly ARCHIVE="rocketmq-all-${ROCKETMQ_VERSION}-bin-release.zip"
readonly DOWNLOAD_URL="https://archive.apache.org/dist/rocketmq/${ROCKETMQ_VERSION}/${ARCHIVE}"

# Prints an error to stderr and aborts the deployment.
die() {
  echo "ERROR: $*" >&2
  exit 1
}

echo "=== RocketMQ 一键部署 ==="
echo "版本:$ROCKETMQ_VERSION"
echo "安装目录:$ROCKETMQ_HOME"

# 1. Check prerequisites
[[ $EUID -eq 0 ]] || die "请使用 root 用户执行"
if ! command -v java &> /dev/null; then
  echo "ERROR: Java 未安装"
  exit 1
fi
for tool in wget unzip systemctl useradd; do
  command -v "$tool" > /dev/null || die "缺少依赖命令:$tool"
done
# The systemd units below are given JAVA_HOME verbatim, so fail early if it
# does not actually contain a JVM.
[[ -x "$JAVA_HOME/bin/java" ]] || die "JAVA_HOME 无效:$JAVA_HOME"
# Moving onto an existing directory would nest the release inside it.
[[ ! -e "$ROCKETMQ_HOME" ]] || die "$ROCKETMQ_HOME 已存在,请先移除或指定其他 ROCKETMQ_HOME"
java_version=$(java -version 2>&1 | awk -F '"' '/version/ {print $2}')
echo "Java 版本:$java_version"

# 2. Download and unpack RocketMQ in a private scratch directory
work_dir=$(mktemp -d)
trap 'rm -rf -- "$work_dir"' EXIT
wget -O "$work_dir/$ARCHIVE" "$DOWNLOAD_URL"
unzip -q "$work_dir/$ARCHIVE" -d "$work_dir/unpacked"
# Locate the single top-level directory of the archive instead of assuming
# its name (the previous hard-coded "rocketmq-<version>" did not match).
extracted=("$work_dir"/unpacked/*/)
[[ ${#extracted[@]} -eq 1 && -d "${extracted[0]}" ]] || die "压缩包目录结构异常"
mkdir -p "$(dirname "$ROCKETMQ_HOME")"
mv "${extracted[0]%/}" "$ROCKETMQ_HOME"

# 3. Service account and data directories
# The account must exist before chown and before the units (User=) can start.
# NOTE(review): its home is pointed at DATA_DIR on the assumption that the
# stock RocketMQ configuration writes logs/store below the user's home —
# confirm against the release being installed.
if ! id -u "$SERVICE_USER" &> /dev/null; then
  useradd --system --user-group --home-dir "$DATA_DIR" \
    --shell /usr/sbin/nologin "$SERVICE_USER"
fi
mkdir -p "$DATA_DIR/store" "$DATA_DIR/logs"
chown -R "$SERVICE_USER:$SERVICE_USER" "$DATA_DIR" "$ROCKETMQ_HOME"

# 4. Environment for login shells ('>' rather than '>>' keeps re-runs from
#    stacking duplicate lines)
cat > /etc/profile.d/rocketmq.sh << EOF
export ROCKETMQ_HOME=$ROCKETMQ_HOME
export PATH=\$PATH:\$ROCKETMQ_HOME/bin
EOF
export PATH="$PATH:$ROCKETMQ_HOME/bin"

# 5. systemd units
# The launch scripts keep the JVM in the foreground, so the units are
# Type=simple without a daemonize flag. Exit status 143 (JVM terminated by
# SIGTERM from mqshutdown) is declared a clean stop.
cat > /etc/systemd/system/rocketmq-namesrv.service << EOF
[Unit]
Description=RocketMQ NameServer
After=network.target
[Service]
Type=simple
User=$SERVICE_USER
Group=$SERVICE_USER
Environment="JAVA_HOME=$JAVA_HOME"
Environment="ROCKETMQ_HOME=$ROCKETMQ_HOME"
ExecStart=$ROCKETMQ_HOME/bin/mqnamesrv
ExecStop=$ROCKETMQ_HOME/bin/mqshutdown namesrv
SuccessExitStatus=143
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
# The broker is told where the NameServer is (-n); without it the broker
# never registers and the verification step shows an empty cluster.
cat > /etc/systemd/system/rocketmq-broker.service << EOF
[Unit]
Description=RocketMQ Broker
After=network.target rocketmq-namesrv.service
[Service]
Type=simple
User=$SERVICE_USER
Group=$SERVICE_USER
Environment="JAVA_HOME=$JAVA_HOME"
Environment="ROCKETMQ_HOME=$ROCKETMQ_HOME"
ExecStart=$ROCKETMQ_HOME/bin/mqbroker -n $NAMESRV_ADDR -c $ROCKETMQ_HOME/conf/broker.conf
ExecStop=$ROCKETMQ_HOME/bin/mqshutdown broker
SuccessExitStatus=143
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF

# 6. Start services
systemctl daemon-reload
systemctl enable rocketmq-namesrv
systemctl enable rocketmq-broker
systemctl start rocketmq-namesrv
systemctl start rocketmq-broker

# 7. Verify (give the broker time to register with the NameServer)
sleep 10
"$ROCKETMQ_HOME/bin/mqadmin" clusterList -n "$NAMESRV_ADDR"

echo "=== 部署完成 ==="
echo "NameServer: localhost:9876"
echo "Broker: localhost:10911"
# The Dashboard line was dropped: this script does not install a dashboard.
1.2 集群部署脚本
#!/bin/bash
# Deploys a RocketMQ cluster over SSH: every NameServer first, then one
# master broker per host in BROKERS.
#
# Prerequisites that this script does not set up: key-based SSH access to all
# hosts, and /tmp/install-namesrv.sh / /tmp/install-broker.sh already present
# on the respective targets.
# Any failing step aborts the run so a half-deployed cluster is not reported
# as complete.
set -euo pipefail

BROKERS=("broker-1" "broker-2" "broker-3")
NAMESRVS=("ns1" "ns2")
readonly NAMESRV_PORT=9876
# Every broker here has its own brokerName and role ASYNC_MASTER; masters use
# brokerId 0 (ids > 0 denote slaves of the same brokerName).
readonly MASTER_BROKER_ID=0

# Build "ns1:9876;ns2:9876" from NAMESRVS so the broker configuration cannot
# drift from the list of NameServers that is actually deployed.
namesrv_addr=""
for ns in "${NAMESRVS[@]}"; do
  namesrv_addr+="${namesrv_addr:+;}${ns}:${NAMESRV_PORT}"
done

# Generated configs live in a private directory that is removed on exit.
conf_dir=$(mktemp -d)
trap 'rm -rf -- "$conf_dir"' EXIT

echo "=== RocketMQ 集群部署 ==="

# 1. Deploy NameServers
for ns in "${NAMESRVS[@]}"; do
  echo "部署 NameServer: $ns"
  ssh "$ns" "bash /tmp/install-namesrv.sh"
done

# 2. Deploy brokers
for broker in "${BROKERS[@]}"; do
  echo "部署 Broker: $broker (ID: $MASTER_BROKER_ID)"
  conf_file="$conf_dir/broker-$broker.conf"
  # Per-host broker configuration
  cat > "$conf_file" << EOF
brokerClusterName=DefaultCluster
brokerName=$broker
brokerId=$MASTER_BROKER_ID
namesrvAddr=$namesrv_addr
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH
storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
EOF
  # NOTE(review): the config is copied before the install script runs, as
  # before; this presumes /opt/rocketmq/conf already exists on the target.
  scp "$conf_file" "$broker:/opt/rocketmq/conf/broker.conf"
  ssh "$broker" "bash /tmp/install-broker.sh"
done

# 3. Verify the cluster through the first NameServer
echo "验证集群..."
mqadmin clusterList -n "${NAMESRVS[0]}:${NAMESRV_PORT}"
echo "=== 集群部署完成 ==="
二、监控脚本
2.1 健康检查脚本
#!/bin/bash
# RocketMQ health check: NameServer reachability, broker list, CommitLog disk
# usage and consumer backlog. Exits non-zero when any check fails.
NAMESRV="ns1:9876;ns2:9876"
ALERT_EMAIL="ops@example.com"

#######################################
# Runs all health checks and reports each result on stdout.
# Globals:   NAMESRV (read) - ';'-separated host:port list
# Calls:     nc, mqadmin, send_alert
# Returns:   0 when every check passed, 1 otherwise (an alert is sent too)
#######################################
check_health() {
  local status=0
  local ns host port broker_list disk_percent total_lag
  local -a namesrv_list

  # 1. NameServer reachability (plain TCP connect per address)
  echo "=== 检查 NameServer ==="
  IFS=';' read -r -a namesrv_list <<< "$NAMESRV"
  for ns in "${namesrv_list[@]}"; do
    host=${ns%:*}
    port=${ns#*:}
    if ! nc -z "$host" "$port" &> /dev/null; then
      echo "❌ NameServer $ns 无法连接"
      status=1
    else
      echo "✅ NameServer $ns 正常"
    fi
  done

  # 2. Broker list
  echo -e "\n=== 检查 Broker ==="
  broker_list=$(mqadmin clusterList -n "$NAMESRV" 2> /dev/null)
  if [[ -z "$broker_list" ]]; then
    echo "❌ 无法获取 Broker 列表"
    status=1
  else
    echo "✅ Broker 列表获取成功"
    echo "$broker_list"
  fi

  # 3. CommitLog disk usage
  # brokerStatus reports commitLogDiskRatio as a fraction (e.g. 0.85), so it
  # is scaled to a percentage before being compared with the 80% threshold;
  # comparing the integer part of the raw value could never trigger.
  # Several matching lines (one per store path) collapse to the worst one;
  # negative values (the broker's "unknown" marker) are ignored.
  # NOTE(review): fraction semantics taken from RocketMQ behaviour, not from
  # this file — confirm against the broker version in use.
  echo -e "\n=== 检查磁盘使用率 ==="
  disk_percent=$(
    mqadmin brokerStatus -n "$NAMESRV" -b broker-1:10911 2> /dev/null \
      | awk -F'[: \t]+' '
          /commitLogDiskRatio/ {
            sub(/[ \t]+$/, "")
            ratio = $NF + 0
            if (ratio < 0) next
            if (!seen || ratio > worst) worst = ratio
            seen = 1
          }
          END { if (seen) printf "%.0f\n", worst * 100 }'
  )
  if [[ -n "$disk_percent" ]]; then
    if (( disk_percent > 80 )); then
      echo "❌ CommitLog 磁盘使用率过高:${disk_percent}%"
      status=1
    else
      echo "✅ CommitLog 磁盘使用率:${disk_percent}%"
    fi
  fi

  # 4. Consumer backlog
  # Sums the last column ("#Diff Total") of every data row; the last column is
  # used because the middle columns are blank for offline groups, which shifts
  # fixed field numbers.
  # NOTE(review): column layout assumed from mqadmin consumerProgress output
  # without -g — confirm for the deployed version.
  echo -e "\n=== 检查消费堆积 ==="
  total_lag=$(
    mqadmin consumerProgress -n "$NAMESRV" 2> /dev/null \
      | awk '!/^#/ && NF >= 5 && $NF ~ /^[0-9]+$/ { sum += $NF; seen = 1 }
             END { if (seen) printf "%d\n", sum }'
  )
  if [[ "$total_lag" =~ ^[0-9]+$ ]] && (( total_lag > 10000 )); then
    echo "❌ 消费堆积严重:$total_lag"
    status=1
  else
    echo "✅ 消费堆积:$total_lag"
  fi

  # 5. Alert on any failure
  if (( status != 0 )); then
    echo -e "\n❌ 健康检查失败"
    send_alert "RocketMQ 健康检查失败"
  else
    echo -e "\n✅ 健康检查通过"
  fi
  return "$status"
}
# Reports an alert on stdout. The real delivery channels below are left
# commented out, exactly as before.
# Arguments: $1 - alert text
send_alert() {
  local text=$1
  printf '发送告警:%s\n' "$text"
  # Email delivery:
  # echo "$text" | mail -s "RocketMQ Alert" $ALERT_EMAIL
  # DingTalk robot delivery:
  # curl -X POST https://oapi.dingtalk.com/robot/send \
  # -H "Content-Type: application/json" \
  # -d "{\"msgtype\":\"text\",\"text\":{\"content\":\"$text\"}}"
}
# Entry point: run the checks once and pass their status on as the script's
# exit code.
check_health || exit "$?"
exit 0
2.2 性能监控脚本
#!/bin/bash
# RocketMQ performance monitor.
# Every INTERVAL seconds it samples each broker through mqadmin and writes one
# CSV file per round into OUTPUT_DIR. The loop never ends on its own; stop it
# with a signal.
NAMESRV="ns1:9876"
OUTPUT_DIR="/var/log/rocketmq/metrics"
INTERVAL=60
BROKERS=(broker-1 broker-2 broker-3)

# Reads brokerStatus output on stdin and prints the first value of the line
# whose key equals $1 ("key   : v1 v2 ..."). Splitting on ':' and blanks is
# needed because the key is padded, so the second whitespace-separated field
# is the colon, not the value.
metric_value() {
  awk -F'[: \t]+' -v key="$1" '
    { sub(/^[ \t]+/, "") }
    $1 == key { print $2; exit }'
}

mkdir -p "$OUTPUT_DIR"
while true; do
  timestamp=$(date +%Y%m%d_%H%M%S)
  output_file="$OUTPUT_DIR/metrics_$timestamp.csv"
  echo "timestamp,broker,put_tps,get_tps,put_latency,consumer_lag" > "$output_file"

  # The backlog is cluster-wide and identical for every row, so it is queried
  # once per round instead of once per broker.
  # NOTE(review): sums the last column ("#Diff Total") of consumerProgress
  # output — confirm the layout for the deployed version.
  consumer_lag=$(
    mqadmin consumerProgress -n "$NAMESRV" 2> /dev/null \
      | awk '!/^#/ && NF >= 5 && $NF ~ /^[0-9]+$/ { sum += $NF; seen = 1 }
             END { if (seen) printf "%d\n", sum }'
  )

  for broker in "${BROKERS[@]}"; do
    # One brokerStatus call per broker; every metric is cut from this single
    # snapshot, which also keeps the values of a row consistent.
    broker_status=$(mqadmin brokerStatus -n "$NAMESRV" -b "$broker:10911" 2> /dev/null)
    put_tps=$(metric_value putTps <<< "$broker_status")
    # NOTE(review): "getTps" is not a brokerStatus key; getTotalTps is used
    # instead. "putMessageAverageLatency" is kept as written but may not exist
    # either — verify both against real brokerStatus output.
    get_tps=$(metric_value getTotalTps <<< "$broker_status")
    put_latency=$(metric_value putMessageAverageLatency <<< "$broker_status")
    echo "$(date +%s),$broker,$put_tps,$get_tps,$put_latency,$consumer_lag" >> "$output_file"
  done

  echo "[$(date)] 性能指标已记录:$output_file"
  sleep "$INTERVAL"
done
三、备份脚本
3.1 元数据备份脚本
#!/bin/bash
# RocketMQ metadata backup: topic list, consumer progress, broker
# configuration and the ACL file, each written to a timestamped file in
# BACKUP_DIR. Backups older than RETENTION_DAYS are pruned.
# Exits non-zero when any dump failed or came back empty.
set -uo pipefail

NAMESRV="ns1:9876;ns2:9876"
BACKUP_DIR="/backup/rocketmq"
RETENTION_DAYS=30
BROKERS=(broker-1 broker-2 broker-3)
failures=0

# Runs the command given after $1 and stores its stdout in file $1.
# A failing command or an empty result is reported and counted instead of
# silently leaving a useless backup file behind.
dump_to() {
  local target=$1
  shift
  if "$@" > "$target" 2> /dev/null && [[ -s "$target" ]]; then
    return 0
  fi
  echo "❌ 备份失败:$target" >&2
  rm -f -- "$target"
  failures=$(( failures + 1 ))
  return 1
}

mkdir -p "$BACKUP_DIR" || exit 1
date_str=$(date +%Y%m%d_%H%M%S)
echo "=== RocketMQ 元数据备份 ==="
echo "备份目录:$BACKUP_DIR"

# 1. Topics
# topicList is read-only. The previous "updateTopic -t all -c DefaultCluster"
# is a write command that creates a topic literally named "all".
echo "备份 Topic 配置..."
dump_to "$BACKUP_DIR/topics_$date_str.txt" \
  mqadmin topicList -n "$NAMESRV"

# 2. Consumer groups
echo "备份消费组配置..."
dump_to "$BACKUP_DIR/consumer-groups_$date_str.txt" \
  mqadmin consumerProgress -n "$NAMESRV"

# 3. Broker configuration
# getBrokerConfig dumps the effective broker settings; brokerStatus only
# returns runtime counters, which are of no use for a restore.
echo "备份 Broker 配置..."
for broker in "${BROKERS[@]}"; do
  dump_to "$BACKUP_DIR/broker_${broker}_$date_str.txt" \
    mqadmin getBrokerConfig -n "$NAMESRV" -b "$broker:10911"
done

# 4. ACL file (optional; skipped when the file does not exist)
echo "备份 ACL 配置..."
if [[ -f /opt/rocketmq/conf/plain_acl.yml ]]; then
  cp /opt/rocketmq/conf/plain_acl.yml \
    "$BACKUP_DIR/plain_acl_$date_str.yml" || failures=$(( failures + 1 ))
fi

# 5. Prune old backups
# -maxdepth 1 keeps the cleanup away from subdirectories such as
# "$BACKUP_DIR/messages", which hold backups with their own lifecycle.
echo "清理 $RETENTION_DAYS 天前的备份..."
find "$BACKUP_DIR" -maxdepth 1 -type f \
  \( -name "*.txt" -o -name "*.yml" \) -mtime +"$RETENTION_DAYS" -delete

echo "=== 备份完成 ==="
ls -lh "$BACKUP_DIR"/*_"$date_str".*
(( failures == 0 )) || exit 1
3.2 消息备份脚本
#!/bin/bash
# Incremental message backup for one topic.
#
# Offsets in RocketMQ are per (broker, queue), so progress is tracked per
# queue: OFFSET_FILE holds "<brokerName> <queueId> <nextOffset>" lines.
# Queue boundaries come from "mqadmin topicStatus"; each message is fetched
# with "mqadmin queryMsgByOffset -b <broker> -i <queue> -o <offset>".
# NOTE(review): the topicStatus column layout (broker, queue id, min offset,
# max offset, ...) and the treatment of max offset as exclusive are assumed
# from RocketMQ behaviour — confirm on the target version.
# Requires bash 4+ (associative arrays).
set -uo pipefail

NAMESRV="ns1:9876"
TOPIC="order-topic"
BACKUP_DIR="/backup/rocketmq/messages"
# Kept next to the backups rather than in /tmp, where it would be lost on
# reboot and cause everything to be exported again.
OFFSET_FILE="$BACKUP_DIR/.backup_offsets_${TOPIC}"

mkdir -p "$BACKUP_DIR" || exit 1

# Load the offsets reached by the previous run; malformed lines are ignored.
declare -A next_offset=()
if [[ -f "$OFFSET_FILE" ]]; then
  while read -r broker qid offset; do
    if [[ "$qid" =~ ^[0-9]+$ && "$offset" =~ ^[0-9]+$ ]]; then
      next_offset["$broker $qid"]=$offset
    fi
  done < "$OFFSET_FILE"
fi

echo "=== 消息备份 ==="
echo "Topic: $TOPIC"

# One line per queue: "<broker> <queueId> <minOffset> <maxOffset>".
queues=$(
  mqadmin topicStatus -n "$NAMESRV" -t "$TOPIC" 2> /dev/null \
    | awk '!/^#/ && $2 ~ /^[0-9]+$/ && $3 ~ /^[0-9]+$/ && $4 ~ /^[0-9]+$/ {
             print $1, $2, $3, $4
           }'
)
if [[ -z "$queues" ]]; then
  echo "ERROR: 无法获取 Topic $TOPIC 的队列偏移量" >&2
  exit 1
fi

# Braces are required: "$TOPIC_" would expand the unset variable TOPIC_.
backup_file="$BACKUP_DIR/messages_${TOPIC}_$(date +%Y%m%d).txt"
copied=0
failed=0
while read -r broker qid min_offset max_offset; do
  key="$broker $qid"
  start=${next_offset[$key]:-$min_offset}
  # Messages below the queue's minimum offset have already expired.
  if (( start < min_offset )); then
    start=$min_offset
  fi
  echo "[$broker/$qid] 起始偏移量:$start"
  echo "[$broker/$qid] 最大偏移量:$max_offset"
  if (( max_offset > start )); then
    echo "备份消息:$start -> $max_offset"
    for (( offset = start; offset < max_offset; offset++ )); do
      if mqadmin queryMsgByOffset -n "$NAMESRV" -t "$TOPIC" \
        -b "$broker" -i "$qid" -o "$offset" >> "$backup_file" 2> /dev/null; then
        copied=$(( copied + 1 ))
      else
        # Stop at the first failure so the next run resumes from here.
        echo "❌ 读取失败:$broker/$qid offset=$offset" >&2
        failed=1
        break
      fi
    done
    next_offset[$key]=$offset
  fi
done <<< "$queues"

# Persist progress atomically (write a sibling temp file, then rename).
tmp_file=$(mktemp "$OFFSET_FILE.XXXXXX") || exit 1
for key in "${!next_offset[@]}"; do
  printf '%s %s\n' "$key" "${next_offset[$key]}"
done > "$tmp_file"
mv -- "$tmp_file" "$OFFSET_FILE" || exit 1

if (( copied > 0 )); then
  echo "备份完成"
else
  echo "无新消息需要备份"
fi
(( failed == 0 )) || exit 1
四、故障处理脚本
4.1 消息堆积处理脚本
#!/bin/bash
# Emergency handling for a consumer backlog.
# Usage: <script> <topic> <group>
#
# Shows the current backlog and consumer status, starts an additional
# consumer process, watches the backlog for one minute and finally offers to
# skip the backlog by resetting the group's offsets (destructive, asks first).
NAMESRV="ns1:9876"
TOPIC=$1
GROUP=$2
if [ -z "$TOPIC" ] || [ -z "$GROUP" ]; then
  echo "用法:$0 <topic> <group>"
  exit 1
fi

echo "=== 消息堆积处理 ==="
echo "Topic: $TOPIC"
echo "Group: $GROUP"

# 1. Current backlog
echo -e "\n=== 当前堆积情况 ==="
mqadmin consumerProgress -n "$NAMESRV" -g "$GROUP"

# 2. Consumer status
echo -e "\n=== 消费者状态 ==="
mqadmin consumerStatus -n "$NAMESRV" -g "$GROUP"

# 3. Start a temporary consumer
# It must join the SAME consumer group: a consumer in another group (the
# previous "$GROUP-temp") keeps its own offsets and receives its own copy of
# every message, so it cannot reduce this group's backlog.
# NOTE(review): /opt/consumer.jar and its flags are site-specific — confirm.
echo -e "\n=== 启动临时消费者 ==="
nohup java -jar /opt/consumer.jar \
  --namesrv="$NAMESRV" \
  --topic="$TOPIC" \
  --group="$GROUP" \
  --threads=20 > /var/log/consumer-temp.log 2>&1 &
echo "临时消费者已启动,PID: $!"

# 4. Watch the backlog
# The value is read from the "Diff Total:" summary line that consumerProgress
# prints for a single group; column 5 of the table is the consumer offset.
# NOTE(review): summary line format assumed from mqadmin output — confirm.
echo -e "\n=== 监控堆积变化(每 10 秒) ==="
for i in {1..6}; do
  sleep 10
  lag=$(mqadmin consumerProgress -n "$NAMESRV" -g "$GROUP" 2> /dev/null \
    | awk -F': *' '/^Diff Total/ { print $2 }')
  echo "[$(date +%H:%M:%S)] 堆积量:$lag"
done

# 5. Optionally reset offsets (skips every message not yet consumed)
echo -e "\n=== 是否重置偏移量? ==="
read -r -p "警告:这将跳过未消费的消息!(y/n): " confirm
if [ "$confirm" = "y" ]; then
  echo "重置偏移量到当前时间..."
  # The timestamp is in milliseconds.
  if mqadmin resetOffsetByTime -n "$NAMESRV" -t "$TOPIC" -g "$GROUP" \
    -s "$(date +%s)000" -f true; then
    echo "偏移量已重置"
  else
    echo "❌ 偏移量重置失败" >&2
    exit 1
  fi
fi
echo "=== 处理完成 ==="
4.2 Broker 故障切换脚本
#!/bin/bash
# Broker failover: stops the broker service on the failed host, starts it on
# the replacement host, verifies the new broker and sends a notification.
# Usage: <script> <failed-broker> <new-broker>
NAMESRV="ns1:9876"
FAILEDBROKER=$1
NEWBROKER=$2
# Bounded connect time: the failed host is quite likely unreachable.
readonly SSH_OPTS=(-o ConnectTimeout=10)

# Reports an alert on stdout. This script used to call send_alert without
# defining it, which ended the failover with "command not found".
# Arguments: $1 - alert text
send_alert() {
  local message=$1
  echo "发送告警:$message"
  # Hook real delivery (mail, chat webhook, ...) in here.
}

if [ -z "$FAILEDBROKER" ] || [ -z "$NEWBROKER" ]; then
  echo "用法:$0 <failed-broker> <new-broker>"
  exit 1
fi

echo "=== Broker 故障切换 ==="
echo "故障 Broker: $FAILEDBROKER"
echo "新 Broker: $NEWBROKER"

# 1. Stop the failed broker (best effort: an unreachable host must not block
#    the switch-over, but success is only reported when it really happened)
echo -e "\n=== 停止故障 Broker ==="
if ssh "${SSH_OPTS[@]}" "$FAILEDBROKER" "systemctl stop rocketmq-broker"; then
  echo "故障 Broker 已停止"
else
  echo "⚠️ 无法停止故障 Broker(主机可能不可达),继续切换" >&2
fi

# 2. Start the replacement broker; nothing further makes sense if this fails
echo -e "\n=== 启动新 Broker ==="
if ! ssh "${SSH_OPTS[@]}" "$NEWBROKER" "systemctl start rocketmq-broker"; then
  echo "❌ 新 Broker 启动失败" >&2
  send_alert "Broker 故障切换失败:$NEWBROKER 无法启动"
  exit 1
fi
sleep 10
echo "新 Broker 已启动"

# 3. Verify the new broker answers
echo -e "\n=== 验证 Broker 状态 ==="
if ! mqadmin brokerStatus -n "$NAMESRV" -b "$NEWBROKER:10911"; then
  echo "❌ 新 Broker 状态验证失败" >&2
  send_alert "Broker 故障切换失败:$NEWBROKER 状态验证未通过"
  exit 1
fi

# 4. Update monitoring (placeholders, not implemented here)
echo -e "\n=== 更新监控配置 ==="
# Update the Prometheus configuration
# Update the Grafana dashboards

# 5. Notify
echo -e "\n=== 发送告警 ==="
send_alert "Broker 故障切换完成:$FAILEDBROKER -> $NEWBROKER"
echo "=== 切换完成 ==="
五、运维工具
5.1 消息查询工具
#!/bin/bash
# Interactive RocketMQ message query tool (menu driven).
NAMESRV='ns1:9876'

# Prints the menu followed by one blank line.
show_menu() {
  printf '%s\n' \
    "=== RocketMQ 消息查询工具 ===" \
    "1. 按 MessageId 查询" \
    "2. 按 Offset 查询" \
    "3. 按 Key 查询" \
    "4. 按时间查询" \
    "5. 查看 Topic 详情" \
    "0. 退出" \
    ""
}
# Prompts for a message id and looks the message up with mqadmin.
# Globals: NAMESRV (read)
# Returns: 1 on empty input, otherwise mqadmin's exit status
# NOTE(review): some mqadmin versions may also require a topic (-t) for
# queryMsgById — confirm for the version in use.
query_by_msgid() {
  local msgid
  # -r keeps backslashes literal; quoting below passes the id as one argument.
  read -r -p "请输入 MessageId: " msgid
  if [[ -z "$msgid" ]]; then
    echo "MessageId 不能为空" >&2
    return 1
  fi
  mqadmin queryMsgById -n "$NAMESRV" -i "$msgid"
}
# Prompts for topic, broker name, queue id and offset, then fetches that one
# message with mqadmin.
# An offset only identifies a message within one queue of one broker, so the
# broker name is asked for as well; the queue id is passed with -i (the
# previous -q is not the queue-id option of queryMsgByOffset).
# Globals: NAMESRV (read)
# Returns: 1 on invalid input, otherwise mqadmin's exit status
query_by_offset() {
  local topic broker queue_id offset
  read -r -p "请输入 Topic: " topic
  read -r -p "请输入 Broker 名称: " broker
  read -r -p "请输入 QueueId: " queue_id
  read -r -p "请输入 Offset: " offset
  if [[ -z "$topic" || -z "$broker" ]]; then
    echo "Topic 和 Broker 名称不能为空" >&2
    return 1
  fi
  if [[ ! "$queue_id" =~ ^[0-9]+$ || ! "$offset" =~ ^[0-9]+$ ]]; then
    echo "QueueId 和 Offset 必须是非负整数" >&2
    return 1
  fi
  mqadmin queryMsgByOffset -n "$NAMESRV" \
    -t "$topic" -b "$broker" -i "$queue_id" -o "$offset"
}
# Prompts for a topic and a message key and lists the matching messages.
# Globals: NAMESRV (read)
# Returns: 1 on empty input, otherwise mqadmin's exit status
query_by_key() {
  local topic key
  read -r -p "请输入 Topic: " topic
  read -r -p "请输入 Key: " key
  if [[ -z "$topic" || -z "$key" ]]; then
    echo "Topic 和 Key 不能为空" >&2
    return 1
  fi
  # Quoted so that a key containing blanks or glob characters stays a single,
  # literal argument.
  mqadmin queryMsgByKey -n "$NAMESRV" -t "$topic" -k "$key"
}
# Prompts for a topic and a start time and prints the topic's messages from
# that time onwards.
# Uses "mqadmin printMsg -b <beginTimestamp>"; the previous implementation
# passed the millisecond timestamp to queryMsgByOffset as if it were a queue
# offset.
# NOTE(review): printMsg and its -b option are taken from the mqadmin tool
# set — confirm availability on the target version. Time parsing relies on
# GNU "date -d".
# Globals: NAMESRV (read)
# Returns: 1 on empty/unparseable input, otherwise mqadmin's exit status
query_by_time() {
  local topic begin_text begin_seconds
  read -r -p "请输入 Topic: " topic
  read -r -p "请输入时间 (yyyy-MM-dd HH:mm:ss): " begin_text
  # An empty string must be rejected explicitly: "date -d ''" succeeds.
  if [[ -z "$topic" || -z "$begin_text" ]]; then
    echo "Topic 和时间不能为空" >&2
    return 1
  fi
  if ! begin_seconds=$(date -d "$begin_text" +%s 2> /dev/null); then
    echo "时间格式无效:$begin_text" >&2
    return 1
  fi
  # Seconds -> milliseconds.
  mqadmin printMsg -n "$NAMESRV" -t "$topic" -b "${begin_seconds}000"
}
# Prompts for a topic and shows its route and per-queue offsets.
# Both commands are read-only; updateTopic, used previously, is the command
# for creating or modifying a topic and displays nothing about it.
# Globals: NAMESRV (read)
# Returns: 1 on empty input, otherwise the status of the last mqadmin call
show_topic() {
  local topic
  read -r -p "请输入 Topic: " topic
  if [[ -z "$topic" ]]; then
    echo "Topic 不能为空" >&2
    return 1
  fi
  mqadmin topicRoute -n "$NAMESRV" -t "$topic"
  mqadmin topicStatus -n "$NAMESRV" -t "$topic"
}
# Main loop: show the menu, dispatch the chosen action, repeat until "0".
while true; do
  show_menu
  # read fails on end of input (Ctrl-D, closed or exhausted stdin). Without
  # this check the loop would spin forever printing the menu.
  if ! read -r -p "请选择操作:" choice; then
    echo ""
    exit 0
  fi
  case "$choice" in
    1) query_by_msgid ;;
    2) query_by_offset ;;
    3) query_by_key ;;
    4) query_by_time ;;
    5) show_topic ;;
    0) exit 0 ;;
    *) echo "无效选择" ;;
  esac
  echo ""
  read -r -p "按回车继续..." _ || exit 0
done
5.2 批量创建 Topic 脚本
#!/bin/bash
# Creates a fixed set of topics on one cluster and verifies that each of them
# is listed afterwards. Exits non-zero if any topic could not be created or
# is missing from the listing.
NAMESRV="ns1:9876"
CLUSTER="DefaultCluster"

# Topics as "name:readQueueNums:writeQueueNums"
declare -a topics=(
  "order-topic:8:8"
  "pay-topic:8:8"
  "user-topic:4:4"
  "log-topic:16:16"
)
failures=0

echo "=== 批量创建 Topic ==="
for topic_info in "${topics[@]}"; do
  IFS=':' read -r topic readQueue writeQueue <<< "$topic_info"
  echo "创建 Topic: $topic (R:$readQueue, W:$writeQueue)"
  # updateTopic flags: -r = read queue count, -w = write queue count.
  # The previous call passed the read count to -p (permission) and the write
  # count to -r. "-o true" was removed as well: it marks the topic as ordered
  # and has nothing to do with automatic topic creation.
  if mqadmin updateTopic -n "$NAMESRV" \
    -t "$topic" \
    -c "$CLUSTER" \
    -r "$readQueue" \
    -w "$writeQueue"; then
    echo "✅ $topic 创建成功"
  else
    echo "❌ $topic 创建失败"
    failures=$(( failures + 1 ))
  fi
done

# Verify against the read-only topic listing; names are matched exactly and
# taken from the array above, so the check cannot drift from the list.
echo -e "\n=== 验证创建结果 ==="
topic_list=$(mqadmin topicList -n "$NAMESRV" 2> /dev/null)
for topic_info in "${topics[@]}"; do
  topic=${topic_info%%:*}
  if awk -v name="$topic" '$1 == name { found = 1 } END { exit !found }' \
    <<< "$topic_list"; then
    echo "✅ $topic"
  else
    echo "❌ $topic 未找到"
    failures=$(( failures + 1 ))
  fi
done
echo "=== 批量创建完成 ==="
(( failures == 0 )) || exit 1
六、最佳实践
6.1 脚本使用规范
1. 执行前备份
- 重要操作前先备份
2. 测试环境验证
- 先在测试环境测试
3. 记录操作日志
- 所有操作记录日志
4. 权限控制
- 限制脚本执行权限
6.2 定时任务配置
# crontab entries for the RocketMQ maintenance scripts
# Health check, every hour
0 * * * * /opt/rocketmq/scripts/health-check.sh >> /var/log/rocketmq/health-check.log 2>&1
# Metadata backup, every day at 02:00
0 2 * * * /opt/rocketmq/scripts/backup-metadata.sh >> /var/log/rocketmq/backup.log 2>&1
# Performance monitor: the script runs an endless sampling loop, so cron only
# has to (re)start it when it is not running. "flock -n" exits immediately
# while a previous instance still holds the lock, instead of piling up a new
# process every 5 minutes.
*/5 * * * * flock -n /var/lock/rocketmq-monitor.lock /opt/rocketmq/scripts/monitor-performance.sh >> /var/log/rocketmq/monitor.log 2>&1
# Weekly cleanup (Sunday 03:00) of old logs and of the per-round metrics CSV
# files that the performance monitor writes below /var/log/rocketmq/metrics
0 3 * * 0 find /var/log/rocketmq \( -name "*.log" -o -name "*.csv" \) -mtime +7 -delete
总结
RocketMQ 运维脚本的核心要点:
- 部署脚本:一键部署、集群部署
- 监控脚本:健康检查、性能监控
- 备份脚本:元数据备份、消息备份
- 故障处理:消息堆积、Broker 切换
- 运维工具:消息查询、批量操作
实践建议:
- 脚本化提升运维效率
- 定期备份元数据
- 建立监控告警体系
- 完善故障处理流程
参考资料
- RocketMQ 官方工具
- RocketMQ 运维指南
- 《RocketMQ 运维实战》