Skip to content
清晨的一缕阳光
返回

RocketMQ 运维脚本与工具集

本文汇总了 RocketMQ 常用的运维脚本和工具,包括部署、监控、备份、故障处理等实用脚本,帮助提升运维效率。脚本中的主机名(如 ns1、broker-1)、端口和路径均为示例;mqadmin 子命令的参数与输出格式在不同版本间可能存在差异,使用前请结合实际环境与版本验证。

一、部署脚本

1.1 一键部署脚本

#!/bin/bash
# RocketMQ one-click deployment script.
#
# Downloads the RocketMQ binary release, installs it into ROCKETMQ_HOME,
# prepares the service account and data directories, writes systemd units for
# the NameServer and the Broker, starts both and prints the cluster list.
# Must run as root. The settings below may be overridden via the environment.

set -euo pipefail

ROCKETMQ_VERSION="${ROCKETMQ_VERSION:-5.1.0}"
ROCKETMQ_HOME="${ROCKETMQ_HOME:-/opt/rocketmq}"
JAVA_HOME="${JAVA_HOME:-/usr/lib/jvm/java-8-openjdk}"
ROCKETMQ_USER="${ROCKETMQ_USER:-rocketmq}"
DATA_DIR="${DATA_DIR:-/data/rocketmq}"
# mqadmin (used for verification below) locates Java through JAVA_HOME.
export JAVA_HOME

# Print an error to stderr and abort.
die() {
    printf 'ERROR: %s\n' "$*" >&2
    exit 1
}

echo "=== RocketMQ 一键部署 ==="
echo "版本:$ROCKETMQ_VERSION"
echo "安装目录:$ROCKETMQ_HOME"

[[ $EUID -eq 0 ]] || die "请以 root 用户运行"

# 1. Check prerequisites
if ! command -v java &> /dev/null; then
    echo "ERROR: Java 未安装"
    exit 1
fi
[[ -x "$JAVA_HOME/bin/java" ]] || die "JAVA_HOME 无效:$JAVA_HOME"
for tool in wget unzip; do
    command -v "$tool" &> /dev/null || die "$tool 未安装"
done

java_version=$(java -version 2>&1 | awk -F '"' '/version/ {print $2}')
echo "Java 版本:$java_version"

# Refuse to overwrite an existing install: `mv src existing_dir` would
# silently nest the new release inside the old one.
[[ ! -e "$ROCKETMQ_HOME" ]] || die "$ROCKETMQ_HOME 已存在,请先移除旧版本"

# 2. Download and unpack into a private temp dir (no predictable /tmp names).
pkg="rocketmq-all-${ROCKETMQ_VERSION}-bin-release"
work_dir=$(mktemp -d)
trap 'rm -rf -- "$work_dir"' EXIT
wget -q -O "$work_dir/$pkg.zip" \
    "https://archive.apache.org/dist/rocketmq/$ROCKETMQ_VERSION/$pkg.zip"
unzip -q "$work_dir/$pkg.zip" -d "$work_dir"
# Locate the extracted top-level directory instead of assuming its name; it
# is presumably "$pkg", not "rocketmq-$ROCKETMQ_VERSION" as hard-coded before.
shopt -s nullglob
extracted=("$work_dir"/*/)
shopt -u nullglob
(( ${#extracted[@]} == 1 )) || die "无法识别解压后的 RocketMQ 目录"
mv -- "${extracted[0]%/}" "$ROCKETMQ_HOME"

# 3. Service account and data directories
if ! id -u "$ROCKETMQ_USER" &> /dev/null; then
    # Give the account a home directory: RocketMQ's default logging
    # configuration presumably writes under ~/logs.
    useradd --system --create-home \
        --shell "$(command -v nologin || echo /bin/false)" "$ROCKETMQ_USER"
fi
mkdir -p "$DATA_DIR/store" "$DATA_DIR/logs"
# "user:" means the user plus its login group, whatever that group is named.
chown -R "$ROCKETMQ_USER:" "$DATA_DIR"

# Make broker.conf register with the local NameServer and store data under
# DATA_DIR. The stock file presumably lacks namesrvAddr, in which case the
# verification step would list no broker. Keys already present are kept.
broker_conf="$ROCKETMQ_HOME/conf/broker.conf"
[[ -z "$(tail -c 1 "$broker_conf" 2>/dev/null)" ]] || echo >> "$broker_conf"
for kv in "namesrvAddr=localhost:9876" \
          "storePathRootDir=$DATA_DIR/store" \
          "storePathCommitLog=$DATA_DIR/store/commitlog"; do
    grep -q "^${kv%%=*}=" "$broker_conf" 2>/dev/null || echo "$kv" >> "$broker_conf"
done

# 4. Environment variables. Overwrite rather than append, so re-runs do not
# accumulate duplicate lines.
cat > /etc/profile.d/rocketmq.sh << EOF
export ROCKETMQ_HOME=$ROCKETMQ_HOME
export PATH=\$PATH:\$ROCKETMQ_HOME/bin
EOF

source /etc/profile.d/rocketmq.sh

# 5. systemd units. mqnamesrv and mqbroker run in the foreground and have no
# "-d" daemon option; an unrecognized option makes startup print usage and
# exit. The units are therefore Type=simple, and systemd supervises the
# foreground process. Group= is omitted so the user's default group is used.
cat > /etc/systemd/system/rocketmq-namesrv.service << EOF
[Unit]
Description=RocketMQ NameServer
After=network.target

[Service]
Type=simple
User=$ROCKETMQ_USER
Environment="JAVA_HOME=$JAVA_HOME"
Environment="ROCKETMQ_HOME=$ROCKETMQ_HOME"
ExecStart=$ROCKETMQ_HOME/bin/mqnamesrv
ExecStop=$ROCKETMQ_HOME/bin/mqshutdown namesrv
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

cat > /etc/systemd/system/rocketmq-broker.service << EOF
[Unit]
Description=RocketMQ Broker
After=network.target rocketmq-namesrv.service
Wants=rocketmq-namesrv.service

[Service]
Type=simple
User=$ROCKETMQ_USER
Environment="JAVA_HOME=$JAVA_HOME"
Environment="ROCKETMQ_HOME=$ROCKETMQ_HOME"
ExecStart=$ROCKETMQ_HOME/bin/mqbroker -c $ROCKETMQ_HOME/conf/broker.conf
ExecStop=$ROCKETMQ_HOME/bin/mqshutdown broker
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

# 6. Start services
systemctl daemon-reload
systemctl enable rocketmq-namesrv rocketmq-broker
systemctl start rocketmq-namesrv
systemctl start rocketmq-broker

# 7. Verify: give the broker time to register, then list the cluster.
sleep 10
"$ROCKETMQ_HOME/bin/mqadmin" clusterList -n localhost:9876

echo "=== 部署完成 ==="
echo "NameServer: localhost:9876"
echo "Broker: localhost:10911"
echo "Dashboard: 未包含在本脚本中,需单独部署 rocketmq-dashboard(默认端口 8080)"

1.2 集群部署脚本

#!/bin/bash
# RocketMQ cluster deployment script.
#
# Runs a pre-staged installer on each NameServer host. Then, for each broker
# host, it renders a broker.conf, copies it over and runs that host's
# installer. Aborts on the first failing step.
# Assumes passwordless SSH to every host and that /tmp/install-namesrv.sh and
# /tmp/install-broker.sh already exist on the remote hosts (not shipped here).

set -euo pipefail

BROKERS=("broker-1" "broker-2" "broker-3")
NAMESRVS=("ns1" "ns2")
NAMESRV_PORT=9876

# Build "ns1:9876;ns2:9876" from NAMESRVS so the host list is defined once.
namesrv_addr=""
for ns in "${NAMESRVS[@]}"; do
    namesrv_addr+="${namesrv_addr:+;}${ns}:${NAMESRV_PORT}"
done

echo "=== RocketMQ 集群部署 ==="

# Private scratch dir for the rendered configs (no predictable /tmp names).
tmp_dir=$(mktemp -d)
trap 'rm -rf -- "$tmp_dir"' EXIT

# 1. NameServers
for ns in "${NAMESRVS[@]}"; do
    echo "部署 NameServer: $ns"
    ssh "$ns" "bash /tmp/install-namesrv.sh"
done

# 2. Brokers. Every broker has its own brokerName, i.e. each one is the master
# of its own broker group, and masters use brokerId 0 (non-zero ids denote
# slaves). The old 1..3 numbering contradicted brokerRole=ASYNC_MASTER.
for broker in "${BROKERS[@]}"; do
    broker_id=0
    conf_file="$tmp_dir/broker-$broker.conf"

    echo "部署 Broker: $broker (ID: $broker_id)"

    cat > "$conf_file" << EOF
brokerClusterName=DefaultCluster
brokerName=$broker
brokerId=$broker_id
namesrvAddr=$namesrv_addr

brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

storePathRootDir=/data/rocketmq/store
storePathCommitLog=/data/rocketmq/store/commitlog
EOF

    # NOTE(review): the config is copied BEFORE install-broker.sh runs;
    # confirm the installer creates/keeps /opt/rocketmq/conf/broker.conf.
    scp "$conf_file" "$broker:/opt/rocketmq/conf/broker.conf"
    ssh "$broker" "bash /tmp/install-broker.sh"
done

# 3. Verify the cluster
echo "验证集群..."
mqadmin clusterList -n "$namesrv_addr"

echo "=== 集群部署完成 ==="

二、监控脚本

2.1 健康检查脚本

#!/bin/bash
# RocketMQ health-check script.
#
# Checks NameServer reachability, broker registration, CommitLog disk usage of
# one broker and the total consumer lag. Alerts and returns non-zero when any
# check fails.

NAMESRV="ns1:9876;ns2:9876"
ALERT_EMAIL="ops@example.com"

#######################################
# Run every health check and report the results.
# Globals:
#   NAMESRV (read) - ";"-separated host:port list
#   BROKER_ADDR (read, optional) - broker whose disk usage is checked,
#     default broker-1:10911
#   DISK_RATIO_THRESHOLD (read, optional) - max CommitLog usage in %, default 80
#   LAG_THRESHOLD (read, optional) - max total consumer lag, default 10000
# Outputs:   human-readable report on stdout
# Returns:   0 if all checks pass, 1 otherwise (after calling send_alert)
#######################################
check_health() {
    local status=0
    local broker_addr=${BROKER_ADDR:-broker-1:10911}
    local disk_threshold=${DISK_RATIO_THRESHOLD:-80}
    local lag_threshold=${LAG_THRESHOLD:-10000}
    local ns host port broker_list disk_ratio disk_pct progress total_lag

    # 1. NameServer reachability (the ";"-list is word-split on purpose)
    echo "=== 检查 NameServer ==="
    for ns in ${NAMESRV//;/ }; do
        host=${ns%:*}
        port=${ns##*:}
        if nc -z "$host" "$port" &>/dev/null; then
            echo "✅ NameServer $ns 正常"
        else
            echo "❌ NameServer $ns 无法连接"
            status=1
        fi
    done

    # 2. Broker registration
    echo -e "\n=== 检查 Broker ==="
    broker_list=$(mqadmin clusterList -n "$NAMESRV" 2>/dev/null)
    if [ -z "$broker_list" ]; then
        echo "❌ 无法获取 Broker 列表"
        status=1
    else
        echo "✅ Broker 列表获取成功"
        echo "$broker_list"
    fi

    # 3. CommitLog disk usage
    echo -e "\n=== 检查磁盘使用率 ==="
    disk_ratio=$(mqadmin brokerStatus -n "$NAMESRV" -b "$broker_addr" 2>/dev/null \
        | awk -F: '$1 ~ /^commitLogDiskRatio/ { gsub(/[[:space:]]/, "", $2); print $2; exit }')
    # The ratio appears to be reported as a fraction (0.85 = 85%), so the old
    # "${disk_ratio%.*} -gt 80" compared 0 with 80 and could never fire.
    # Values > 1 are treated as percentages already; negatives mean "unknown".
    disk_pct=""
    if [ -n "$disk_ratio" ]; then
        disk_pct=$(awk -v r="$disk_ratio" \
            'BEGIN { r += 0; if (r < 0) exit; if (r <= 1) r *= 100; printf "%.1f", r }')
    fi
    if [ -z "$disk_pct" ]; then
        echo "⚠️ 无法获取 $broker_addr 的 CommitLog 磁盘使用率"
    elif awk -v p="$disk_pct" -v t="$disk_threshold" 'BEGIN { exit !(p + 0 > t + 0) }'; then
        echo "❌ CommitLog 磁盘使用率过高:$disk_pct%"
        status=1
    else
        echo "✅ CommitLog 磁盘使用率:$disk_pct%"
    fi

    # 4. Consumer lag. Without -g, consumerProgress prints one summary row per
    # group whose LAST column is "#Diff Total". The old "$5" was the #Model
    # column, so the sum was always 0. Header lines start with "#".
    # No output at all is treated as a failure rather than reported as healthy.
    echo -e "\n=== 检查消费堆积 ==="
    progress=$(mqadmin consumerProgress -n "$NAMESRV" 2>/dev/null)
    if [ -z "$progress" ]; then
        echo "❌ 无法获取消费进度"
        status=1
    else
        total_lag=$(awk '$1 !~ /^#/ && $NF ~ /^[0-9]+$/ { sum += $NF }
                         END { printf "%.0f\n", sum }' <<< "$progress")
        if [ "$total_lag" -gt "$lag_threshold" ]; then
            echo "❌ 消费堆积严重:$total_lag"
            status=1
        else
            echo "✅ 消费堆积:$total_lag"
        fi
    fi

    # 5. Alert on any failure
    if [ "$status" -ne 0 ]; then
        echo -e "\n❌ 健康检查失败"
        send_alert "RocketMQ 健康检查失败"
    else
        echo -e "\n✅ 健康检查通过"
    fi

    return "$status"
}

#######################################
# Report an alert. Currently it only prints the alert; mail and DingTalk
# delivery are kept below as commented-out templates.
# Arguments: $1 - alert text
# Outputs:   "发送告警:<text>" on stdout
#######################################
send_alert() {
    local msg="$1"
    printf '%s\n' "发送告警:$msg"

    # Mail delivery:
    # echo "$msg" | mail -s "RocketMQ Alert" $ALERT_EMAIL

    # DingTalk robot delivery:
    # curl -X POST https://oapi.dingtalk.com/robot/send \
    #   -H "Content-Type: application/json" \
    #   -d "{\"msgtype\":\"text\",\"text\":{\"content\":\"$msg\"}}"
}

# Entry point: run every check and use its result as the script's exit code.
check_health
rc=$?
exit "$rc"

2.2 性能监控脚本

#!/bin/bash
# RocketMQ performance monitoring script.
#
# Every INTERVAL seconds it samples each broker's put/get TPS and put latency
# via `mqadmin brokerStatus`, plus the cluster-wide consumer lag, and writes
# them to a new timestamped CSV under OUTPUT_DIR.
# Set INTERVAL=0 to take a single sample and exit (e.g. when run from cron).

NAMESRV="${NAMESRV:-ns1:9876}"
OUTPUT_DIR="${OUTPUT_DIR:-/var/log/rocketmq/metrics}"
INTERVAL="${INTERVAL:-60}"
BROKERS=(broker-1 broker-2 broker-3)

# Extract one value from brokerStatus output.
#   $1 - brokerStatus output, $2 - key
# Lines look like "key<padding>: value [value ...]". Only the first value is
# kept; TPS keys carry several samples. The old `awk '{print $2}'` split on
# whitespace and therefore returned the ":" separator.
status_value() {
    awk -F: -v key="$2" '
        { k = $1; sub(/[[:space:]]+$/, "", k) }
        k == key { split($2, v, " "); print v[1]; exit }
    ' <<< "$1"
}

# Total consumer lag: the sum of the last column ("#Diff Total") of
# consumerProgress's per-group summary. The old code summed column 5 (#Model).
total_consumer_lag() {
    mqadmin consumerProgress -n "$NAMESRV" 2>/dev/null \
        | awk '$1 !~ /^#/ && $NF ~ /^[0-9]+$/ { sum += $NF } END { printf "%.0f\n", sum }'
}

mkdir -p "$OUTPUT_DIR"

while true; do
    timestamp=$(date +%Y%m%d_%H%M%S)
    output_file="$OUTPUT_DIR/metrics_$timestamp.csv"

    echo "timestamp,broker,put_tps,get_tps,put_latency,consumer_lag" > "$output_file"

    # Consumer lag is cluster-wide, not per broker: query it once per round.
    consumer_lag=$(total_consumer_lag)

    for broker in "${BROKERS[@]}"; do
        # One brokerStatus call per broker instead of three.
        broker_status=$(mqadmin brokerStatus -n "$NAMESRV" -b "$broker:10911" 2>/dev/null)

        put_tps=$(status_value "$broker_status" putTps)
        # NOTE(review): the previous keys "getTps" and "putMessageAverageLatency"
        # do not appear to be brokerStatus keys. getTotalTps and
        # putMessageEntireTimeMax (max put time) are used instead; confirm them
        # against `mqadmin brokerStatus` output for your RocketMQ version.
        get_tps=$(status_value "$broker_status" getTotalTps)
        put_latency=$(status_value "$broker_status" putMessageEntireTimeMax)

        echo "$(date +%s),$broker,$put_tps,$get_tps,$put_latency,$consumer_lag" >> "$output_file"
    done

    echo "[$(date)] 性能指标已记录:$output_file"

    # INTERVAL=0 (or non-positive) means "sample once".
    (( INTERVAL > 0 )) || break
    sleep "$INTERVAL"
done

三、备份脚本

3.1 元数据备份脚本

#!/bin/bash
# RocketMQ metadata backup script.
#
# Saves the topic list, consumer progress, each broker's configuration and the
# ACL file into BACKUP_DIR, all suffixed with the same timestamp. It then
# prunes backups older than RETENTION_DAYS days and exits non-zero if any dump
# failed or came back empty.

NAMESRV="ns1:9876;ns2:9876"
BACKUP_DIR="/backup/rocketmq"
RETENTION_DAYS=30
BROKERS=(broker-1 broker-2 broker-3)

mkdir -p "$BACKUP_DIR" || exit 1

date_str=$(date +%Y%m%d_%H%M%S)
failed=0

# Run a command with its stdout redirected into file $1. Record a failure when
# the command fails or writes nothing, instead of silently keeping an empty
# backup.
save_output() {
    local out=$1
    shift
    if ! "$@" > "$out" 2>/dev/null || [ ! -s "$out" ]; then
        echo "⚠️ 备份失败:$out" >&2
        failed=1
    fi
}

echo "=== RocketMQ 元数据备份 ==="
echo "备份目录:$BACKUP_DIR"

# 1. Topics. The previous `updateTopic -t all -c DefaultCluster` CREATED a topic
# named "all" instead of exporting anything. `topicList -c` lists topics
# together with their cluster and consumer groups.
# NOTE(review): queue counts and permissions are not included here; they are
# presumably kept in each broker's topics.json under its store config dir.
echo "备份 Topic 配置..."
save_output "$BACKUP_DIR/topics_$date_str.txt" mqadmin topicList -n "$NAMESRV" -c

# 2. Consumer groups
echo "备份消费组配置..."
save_output "$BACKUP_DIR/consumer-groups_$date_str.txt" mqadmin consumerProgress -n "$NAMESRV"

# 3. Broker configuration. getBrokerConfig returns the configuration;
# brokerStatus (used before) only returns runtime statistics.
echo "备份 Broker 配置..."
for broker in "${BROKERS[@]}"; do
    save_output "$BACKUP_DIR/broker_${broker}_$date_str.txt" \
        mqadmin getBrokerConfig -n "$NAMESRV" -b "$broker:10911"
done

# 4. ACL configuration
echo "备份 ACL 配置..."
if [ -f /opt/rocketmq/conf/plain_acl.yml ]; then
    cp /opt/rocketmq/conf/plain_acl.yml \
        "$BACKUP_DIR/plain_acl_$date_str.yml" || failed=1
fi

# 5. Prune old backups. -maxdepth 1 keeps this from also deleting files in
# subdirectories such as the message backups under $BACKUP_DIR/messages.
echo "清理 $RETENTION_DAYS 天前的备份..."
find "$BACKUP_DIR" -maxdepth 1 -type f \( -name "*.txt" -o -name "*.yml" \) \
    -mtime +"$RETENTION_DAYS" -delete

echo "=== 备份完成 ==="
ls -lh "$BACKUP_DIR"/*_"$date_str".*
exit "$failed"

3.2 消息备份脚本

#!/bin/bash
# RocketMQ message backup script (incremental, per queue).
#
# For every queue of TOPIC, it appends each message from the offset saved by
# the previous run up to the queue's current max offset (exclusive) to a daily
# file. It then records the new position in OFFSET_FILE, one
# "brokerName queueId nextOffset" line per queue.
# Requires bash >= 4 (associative array).
# NOTE: one mqadmin JVM is started per message, so this only suits
# low-volume topics.

NAMESRV="ns1:9876"
TOPIC="order-topic"
BACKUP_DIR="/backup/rocketmq/messages"
# Kept next to the backups and per topic: /tmp may be wiped on reboot, and a
# single shared file would mix up topics.
OFFSET_FILE="$BACKUP_DIR/.backup_offset_${TOPIC}.txt"

mkdir -p "$BACKUP_DIR" || exit 1

# ${TOPIC} must be braced: "$TOPIC_" is a different, undefined variable.
backup_file="$BACKUP_DIR/messages_${TOPIC}_$(date +%Y%m%d).txt"

# Load the per-queue positions saved by the previous run.
declare -A saved=()
if [ -f "$OFFSET_FILE" ]; then
    while read -r b q o; do
        [ -n "$o" ] && saved["$b $q"]=$o
    done < "$OFFSET_FILE"
fi

echo "=== 消息备份 ==="
echo "Topic: $TOPIC"

# topicStatus prints one row per queue:
#   #Broker Name  #QID  #Min Offset  #Max Offset  #Last Updated
# Max Offset is the next offset to be written, i.e. exclusive.
topic_status=$(mqadmin topicStatus -n "$NAMESRV" -t "$TOPIC" 2>/dev/null)
if [ -z "$topic_status" ]; then
    echo "ERROR: 无法获取 Topic 状态" >&2
    exit 1
fi

new_state=$(mktemp) || exit 1
trap 'rm -f -- "$new_state"' EXIT
backed_up=0

while read -r broker_name queue_id min_offset max_offset _; do
    # Skip the header and any non-data lines.
    [[ "$queue_id" =~ ^[0-9]+$ && "$max_offset" =~ ^[0-9]+$ ]] || continue

    next=${saved["$broker_name $queue_id"]:-$min_offset}
    # Messages below min_offset have already been removed from the queue.
    (( next < min_offset )) && next=$min_offset

    echo "$broker_name/$queue_id 起始偏移量:$next 最大偏移量:$max_offset"

    # queryMsgByOffset addresses a single message by broker (-b), queue (-i)
    # and offset (-o). Stop this queue at the first failure so the next run
    # resumes from there. (mqadmin may exit 0 on some errors; verify.)
    while (( next < max_offset )); do
        if ! mqadmin queryMsgByOffset -n "$NAMESRV" -t "$TOPIC" \
            -b "$broker_name" -i "$queue_id" -o "$next" >> "$backup_file" 2>/dev/null; then
            echo "WARN: $broker_name/$queue_id 偏移量 $next 备份失败,下次从此处继续" >&2
            break
        fi
        next=$((next + 1))
        backed_up=$((backed_up + 1))
    done

    echo "$broker_name $queue_id $next" >> "$new_state"
done <<< "$topic_status"

# Replace the saved positions only after all queues have been processed.
mv -- "$new_state" "$OFFSET_FILE"

if (( backed_up > 0 )); then
    echo "备份完成:$backed_up 条消息 -> $backup_file"
else
    echo "无新消息需要备份"
fi

四、故障处理脚本

4.1 消息堆积处理脚本

#!/bin/bash
# Emergency handling for a consumer-group backlog.
#
# Usage: $0 <topic> <group>
# The script:
#   1. shows the group's consume progress and client status;
#   2. starts extra consumer instances in the same group;
#   3. samples the backlog every 10 seconds for one minute;
#   4. after confirmation, skips the backlog by resetting the group's offset
#      to now.

NAMESRV="ns1:9876"
TOPIC=$1
GROUP=$2

if [ -z "$TOPIC" ] || [ -z "$GROUP" ]; then
    echo "用法:$0 <topic> <group>"
    exit 1
fi

# Print GROUP's total backlog. With -g, consumerProgress lists one row per
# queue (column 5 is the consumer offset, which the old awk summed). The rows
# are followed by a summary line presumably containing "Diff Total: <n>";
# verify this against your mqadmin version. Prints "N/A" if no such line
# is found.
group_lag() {
    mqadmin consumerProgress -n "$NAMESRV" -g "$GROUP" 2>/dev/null \
        | awk '/Diff Total/ { print $NF; found = 1; exit } END { if (!found) print "N/A" }'
}

echo "=== 消息堆积处理 ==="
echo "Topic: $TOPIC"
echo "Group: $GROUP"

# 1. Current backlog
echo -e "\n=== 当前堆积情况 ==="
mqadmin consumerProgress -n "$NAMESRV" -g "$GROUP"

# 2. Consumer client status
echo -e "\n=== 消费者状态 ==="
mqadmin consumerStatus -n "$NAMESRV" -g "$GROUP"

# 3. Extra consumers. They must join GROUP itself: a separate group such as
# "<group>-temp" keeps its own offsets and never reduces GROUP's backlog.
# Extra instances only help while GROUP has fewer consumers than queues.
# NOTE(review): assumes /opt/consumer.jar accepts these command-line flags.
echo -e "\n=== 启动临时消费者 ==="
if [ -f /opt/consumer.jar ]; then
    nohup java -jar /opt/consumer.jar \
        --namesrv="$NAMESRV" \
        --topic="$TOPIC" \
        --group="$GROUP" \
        --threads=20 > /var/log/consumer-temp.log 2>&1 &
    temp_pid=$!
    echo "临时消费者已启动,PID: $temp_pid"
    echo "停止临时消费者:kill $temp_pid"
else
    echo "WARN: 未找到 /opt/consumer.jar,跳过启动临时消费者" >&2
fi

# 4. Watch the backlog for one minute
echo -e "\n=== 监控堆积变化(每 10 秒) ==="
for _ in {1..6}; do
    sleep 10
    echo "[$(date +%H:%M:%S)] 堆积量:$(group_lag)"
done

# 5. Optional offset reset. This is destructive: every unconsumed message
# is skipped.
echo -e "\n=== 是否重置偏移量? ==="
read -r -p "警告:这将跳过未消费的消息!(y/n): " confirm

if [ "$confirm" = "y" ]; then
    echo "重置偏移量到当前时间..."
    if mqadmin resetOffsetByTime -n "$NAMESRV" -t "$TOPIC" -g "$GROUP" \
        -s "$(date +%s)000" -f true; then
        echo "偏移量已重置"
    else
        echo "ERROR: 偏移量重置失败" >&2
    fi
fi

echo "=== 处理完成 ==="

4.2 Broker 故障切换脚本

#!/bin/bash
# Broker failover script.
#
# Usage: $0 <failed-broker> <new-broker>
# Stops the broker service on the failed host (best effort, since that host
# may be down). Then starts the broker on the standby host, verifies it with
# brokerStatus and sends a notification. Exits non-zero if the standby does
# not come up.
# NOTE(review): clients only fail over if the standby's broker.conf uses the
# same brokerClusterName/brokerName as the failed broker; this is not checked.

NAMESRV="ns1:9876"
FAILEDBROKER=$1
NEWBROKER=$2

# Alert hook. send_alert was previously called without being defined in this
# script, so the final notification failed with "command not found".
send_alert() {
    local message=$1
    echo "发送告警:$message"
    # Hook up mail / DingTalk delivery here.
}

if [ -z "$FAILEDBROKER" ] || [ -z "$NEWBROKER" ]; then
    echo "用法:$0 <failed-broker> <new-broker>"
    exit 1
fi

echo "=== Broker 故障切换 ==="
echo "故障 Broker: $FAILEDBROKER"
echo "新 Broker: $NEWBROKER"

# 1. Stop the failed broker. An unreachable host is expected during failover,
# so failure here only warns.
echo -e "\n=== 停止故障 Broker ==="
if ssh -o ConnectTimeout=10 "$FAILEDBROKER" "systemctl stop rocketmq-broker"; then
    echo "故障 Broker 已停止"
else
    echo "WARN: 无法停止故障 Broker(主机可能已不可达),继续切换" >&2
fi

# 2. Start the standby broker
echo -e "\n=== 启动新 Broker ==="
if ! ssh -o ConnectTimeout=10 "$NEWBROKER" "systemctl start rocketmq-broker"; then
    send_alert "Broker 故障切换失败:无法在 $NEWBROKER 启动 Broker"
    exit 1
fi
sleep 10
echo "新 Broker 已启动"

# 3. Verify the standby (mqadmin's exit status on errors may vary by version)
echo -e "\n=== 验证 Broker 状态 ==="
if ! mqadmin brokerStatus -n "$NAMESRV" -b "$NEWBROKER:10911"; then
    send_alert "Broker 故障切换失败:$NEWBROKER 状态检查未通过"
    exit 1
fi

# 4. Monitoring
echo -e "\n=== 更新监控配置 ==="
# TODO: update Prometheus scrape targets and Grafana dashboards.

# 5. Notify
echo -e "\n=== 发送告警 ==="
send_alert "Broker 故障切换完成:$FAILEDBROKER -> $NEWBROKER"

echo "=== 切换完成 ==="

五、运维工具

5.1 消息查询工具

#!/bin/bash
# Interactive RocketMQ message query tool.

NAMESRV="ns1:9876"

# Print the main menu: the title, the numbered actions and a trailing blank
# line.
show_menu() {
    local -a menu_lines=(
        "=== RocketMQ 消息查询工具 ==="
        "1. 按 MessageId 查询"
        "2. 按 Offset 查询"
        "3. 按 Key 查询"
        "4. 按时间查询"
        "5. 查看 Topic 详情"
        "0. 退出"
        ""
    )
    printf '%s\n' "${menu_lines[@]}"
}

# Prompt for a MessageId and look the message up.
# Globals: NAMESRV (read)
query_by_msgid() {
    local msgid
    read -r -p "请输入 MessageId: " msgid
    mqadmin queryMsgById -n "$NAMESRV" -i "$msgid"
}

# Prompt for topic, broker, queue and offset, and print that single message.
# queryMsgByOffset identifies the queue by broker name (-b) plus queue id
# (-i); the previous "-q" is not one of its options and no broker was given.
query_by_offset() {
    local topic broker queue_id offset
    read -r -p "请输入 Topic: " topic
    read -r -p "请输入 BrokerName: " broker
    read -r -p "请输入 QueueId: " queue_id
    read -r -p "请输入 Offset: " offset
    mqadmin queryMsgByOffset -n "$NAMESRV" -t "$topic" -b "$broker" -i "$queue_id" -o "$offset"
}

# Prompt for a topic and a message key, and look matching messages up.
query_by_key() {
    local topic key
    read -r -p "请输入 Topic: " topic
    read -r -p "请输入 Key: " key
    mqadmin queryMsgByKey -n "$NAMESRV" -t "$topic" -k "$key"
}

# Prompt for a time window and print the topic's messages stored within it.
# The previous version passed the timestamp to queryMsgByOffset as an offset.
# An empty end time means one hour after the start. Uses GNU `date -d`.
# Returns 1 on an unparseable time.
query_by_time() {
    local topic begin_str end_str begin_s end_s
    read -r -p "请输入 Topic: " topic
    read -r -p "请输入时间 (yyyy-MM-dd HH:mm:ss): " begin_str
    read -r -p "请输入结束时间 (yyyy-MM-dd HH:mm:ss,留空则为开始后 1 小时): " end_str

    # `date -d ""` succeeds (today 00:00), so reject an empty start explicitly.
    if [ -z "$begin_str" ] || ! begin_s=$(date -d "$begin_str" +%s 2>/dev/null); then
        echo "无效时间:$begin_str" >&2
        return 1
    fi
    if [ -z "$end_str" ]; then
        end_s=$((begin_s + 3600))
    elif ! end_s=$(date -d "$end_str" +%s 2>/dev/null); then
        echo "无效时间:$end_str" >&2
        return 1
    fi

    # printMsg takes begin/end timestamps in milliseconds.
    mqadmin printMsg -n "$NAMESRV" -t "$topic" -b "${begin_s}000" -e "${end_s}000"
}

# Show a topic's per-queue status. updateTopic, used previously, creates or
# modifies the topic instead of describing it.
show_topic() {
    local topic
    read -r -p "请输入 Topic: " topic
    mqadmin topicStatus -n "$NAMESRV" -t "$topic"
}

# Main loop: show the menu and dispatch the chosen action until the user
# picks 0 or input ends.
while true; do
    show_menu
    # Exit cleanly on EOF (Ctrl-D / closed stdin). Before, read kept failing
    # with an empty choice and the loop spun forever printing "无效选择".
    read -r -p "请选择操作:" choice || exit 0

    case "$choice" in
        1) query_by_msgid ;;
        2) query_by_offset ;;
        3) query_by_key ;;
        4) query_by_time ;;
        5) show_topic ;;
        0) exit 0 ;;
        *) echo "无效选择" ;;
    esac

    echo ""
    read -r -p "按回车继续..." || exit 0
done

5.2 批量创建 Topic 脚本

#!/bin/bash
# Batch-create topics.
#
# Each entry in `topics` is "name:readQueueNums:writeQueueNums". The script
# creates or updates every topic in CLUSTER, lists the result, and exits
# non-zero if any creation failed.

NAMESRV="ns1:9876"
CLUSTER="DefaultCluster"

# Topic list
declare -a topics=(
    "order-topic:8:8"
    "pay-topic:8:8"
    "user-topic:4:4"
    "log-topic:16:16"
)

echo "=== 批量创建 Topic ==="

failed=0
for topic_info in "${topics[@]}"; do
    IFS=':' read -r topic readQueue writeQueue <<< "$topic_info"

    echo "创建 Topic: $topic (R:$readQueue, W:$writeQueue)"

    # updateTopic flags: -r = read queue nums, -w = write queue nums.
    # Previously -p (permission) received the read count and -r the write
    # count. "-o true" also made every topic an ORDERED topic: -o is the order
    # flag, not "allow auto-create".
    # NOTE(review): mqadmin may exit 0 on some server-side errors; verify.
    if mqadmin updateTopic -n "$NAMESRV" \
        -c "$CLUSTER" \
        -t "$topic" \
        -r "$readQueue" \
        -w "$writeQueue"; then
        echo "$topic 创建成功"
    else
        echo "$topic 创建失败"
        failed=1
    fi
done

echo -e "\n=== 验证创建结果 ==="
# topicList prints existing topic names. Match them exactly against the names
# in `topics` rather than a hand-maintained regex.
mqadmin topicList -n "$NAMESRV" | grep -Fx -f <(printf '%s\n' "${topics[@]%%:*}")

echo "=== 批量创建完成 ==="
exit "$failed"

六、最佳实践

6.1 脚本使用规范

1. 执行前备份
   - 重要操作前先备份
   
2. 测试环境验证
   - 先在测试环境中验证脚本行为,确认无误后再在生产环境执行
   
3. 记录操作日志
   - 所有操作记录日志
   
4. 权限控制
   - 限制脚本执行权限

6.2 定时任务配置

# crontab configuration

# Hourly health check
0 * * * * /opt/rocketmq/scripts/health-check.sh >> /var/log/rocketmq/health-check.log 2>&1

# Daily metadata backup at 02:00
0 2 * * * /opt/rocketmq/scripts/backup-metadata.sh >> /var/log/rocketmq/backup.log 2>&1

# Performance monitor. The script samples in an endless `while true` loop, so
# launching a new copy every 5 minutes would pile up processes. `flock -n`
# keeps a single instance; cron only restarts it if it has died.
*/5 * * * * flock -n /var/lock/rocketmq-monitor.lock /opt/rocketmq/scripts/monitor-performance.sh >> /var/log/rocketmq/monitor.log 2>&1

# Weekly cleanup (Sunday 03:00) of old logs and of the per-sample metrics CSVs
# written by the monitor, which nothing else removes.
0 3 * * 0 find /var/log/rocketmq -name "*.log" -mtime +7 -delete
0 3 * * 0 find /var/log/rocketmq/metrics -name "metrics_*.csv" -mtime +7 -delete

总结

RocketMQ 运维脚本的核心要点:

  1. 部署脚本:一键部署、集群部署
  2. 监控脚本:健康检查、性能监控
  3. 备份脚本:元数据备份、消息备份
  4. 故障处理:消息堆积、Broker 切换
  5. 运维工具:消息查询、批量操作

核心要点

参考资料


分享这篇文章到:

上一篇文章
RocketMQ 消费者组与订阅关系详解
下一篇文章
Kafka Connect 高级应用与自定义开发