Skip to content
清晨的一缕阳光
返回

Kafka 集群迁移与升级实战指南

Kafka 集群迁移和升级是运维工作中的重要场景。本文将深入探讨各种迁移升级方案,包括版本升级、集群扩容、数据迁移等。

一、版本升级

1.1 升级路径

推荐升级路径

0.10.0.x → 0.10.1.x → 0.10.2.x → 0.11.0.x → 1.0.x → 1.1.x → 2.0.x → 2.1.x → 2.2.x → 2.3.x → 2.4.x → 2.5.x → 2.6.x → 2.7.x → 2.8.x → 3.0.x → 3.1.x → 3.2.x → 3.3.x → 3.4.x

注意:
- 每次升级最多跨越一个小版本
- 跨大版本需要逐步升级
- 升级前备份配置和数据

1.2 滚动升级

升级步骤

#!/bin/bash
# Kafka 滚动升级脚本

OLD_VERSION="2.8.0"
NEW_VERSION="3.4.0"
BROKERS=("broker-1" "broker-2" "broker-3")

echo "=== Kafka 滚动升级 ==="
echo "从版本:$OLD_VERSION"
echo "到版本:$NEW_VERSION"

for broker in "${BROKERS[@]}"; do
    echo -e "\n=== 升级 $broker ==="
    
    # 1. 停止 Broker
    echo "停止 Broker..."
    ssh $broker "systemctl stop kafka"
    
    # 2. 备份配置
    echo "备份配置..."
    ssh $broker "cp -r /opt/kafka-$OLD_VERSION/config /opt/kafka-$OLD_VERSION/config.bak"
    
    # 3. 安装新版本
    echo "安装新版本..."
    ssh $broker "wget https://archive.apache.org/dist/kafka/$NEW_VERSION/kafka_2.13-$NEW_VERSION.tgz"
    ssh $broker "tar -xzf kafka_2.13-$NEW_VERSION.tgz -C /opt/"
    ssh $broker "mv /opt/kafka_2.13-$NEW_VERSION /opt/kafka-$NEW_VERSION"
    
    # 4. 更新配置
    echo "更新配置..."
    ssh $broker "cp /opt/kafka-$OLD_VERSION/config/server.properties /opt/kafka-$NEW_VERSION/config/"
    
    # 5. 启动 Broker
    echo "启动 Broker..."
    ssh $broker "systemctl start kafka"
    
    # 6. 验证
    echo "验证 Broker..."
    ssh $broker "kafka-broker-api-versions.sh --bootstrap-server localhost:9092"
    
    # 7. 等待同步
    echo "等待副本同步..."
    sleep 60
done

echo -e "\n=== 升级完成 ==="

1.3 升级配置

inter.broker.protocol.version

# 升级过程中临时配置
inter.broker.protocol.version=2.8.0  # 保持旧版本

# 所有 Broker 升级完成后
inter.broker.protocol.version=3.4.0  # 升级到新版本
log.message.format.version=3.4.0

1.4 升级验证

#!/bin/bash
# 升级验证脚本

echo "=== 验证集群状态 ==="

# 1. 检查 Broker 版本
for broker in broker-1 broker-2 broker-3; do
    version=$(ssh $broker "kafka-topics.sh --version")
    echo "$broker: $version"
done

# 2. 检查 Topic
kafka-topics.sh --bootstrap-server localhost:9092 --describe

# 3. 检查 Consumer Group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe

# 4. 发送测试消息
echo "test" | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --max-messages 1 --timeout-ms 5000

echo "=== 验证完成 ==="

二、集群扩容

2.1 添加 Broker

步骤

#!/bin/bash
# 添加新 Broker

NEW_BROKER="broker-4"
CLUSTER="broker-1:9092,broker-2:9092,broker-3:9092"

echo "=== 添加新 Broker ==="

# 1. 准备新服务器
echo "准备新服务器..."
ssh $NEW_BROKER "yum install -y java-11-openjdk"
ssh $NEW_BROKER "wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz"
ssh $NEW_BROKER "tar -xzf kafka_2.13-3.4.0.tgz -C /opt/"
ssh $NEW_BROKER "mv /opt/kafka_2.13-3.4.0 /opt/kafka"

# 2. 配置 Broker
echo "配置 Broker..."
cat > /tmp/server.properties << EOF
broker.id=4
listeners=PLAINTEXT://:9092
log.dirs=/data/kafka-logs
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
EOF
scp /tmp/server.properties $NEW_BROKER:/opt/kafka/config/

# 3. 启动 Broker
echo "启动 Broker..."
ssh $NEW_BROKER "systemctl start kafka"

# 4. 验证
echo "验证 Broker..."
ssh $NEW_BROKER "kafka-broker-api-versions.sh --bootstrap-server localhost:9092"

echo "=== Broker 添加完成 ==="

2.2 分区重分配

生成重分配计划

# 生成重分配计划
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --topics-to-move-json-file topics-to-move.json \
  --broker-list "1,2,3,4" \
  --generate > reassign-plan.json

# topics-to-move.json
{
  "topics": [
    {"topic": "order-topic"},
    {"topic": "pay-topic"}
  ]
}

执行重分配

#!/bin/bash
# 执行分区重分配

echo "=== 执行分区重分配 ==="

# 1. 执行重分配
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
  --reassignment-json-file reassign-plan.json \
  --execute

# 2. 监控进度
while true; do
    kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
      --reassignment-json-file reassign-plan.json \
      --verify
    
    if [ $? -eq 0 ]; then
        echo "重分配完成"
        break
    fi
    
    echo "等待重分配..."
    sleep 60
done

echo "=== 重分配完成 ==="

2.3 扩容验证

#!/bin/bash
# 扩容验证脚本

echo "=== 验证扩容结果 ==="

# 1. 检查 Broker 数量
broker_count=$(kafka-broker-api-versions.sh --bootstrap-server localhost:9092 2>/dev/null | wc -l)
echo "Broker 数量:$broker_count"

# 2. 检查分区分布
kafka-topics.sh --bootstrap-server localhost:9092 --describe | \
  grep -o "Replicas: [0-9,]*" | sort | uniq -c

# 3. 检查数据平衡
for broker in broker-1 broker-2 broker-3 broker-4; do
    size=$(ssh $broker "du -sh /data/kafka-logs | awk '{print \$1}'")
    echo "$broker: $size"
done

echo "=== 验证完成 ==="

三、数据迁移

3.1 MirrorMaker 迁移

配置 MirrorMaker

# mm2.properties

# Source Cluster
source.cluster.alias=old-cluster
source.cluster.bootstrap.servers=old-broker-1:9092,old-broker-2:9092

# Target Cluster
target.cluster.alias=new-cluster
target.cluster.bootstrap.servers=new-broker-1:9092,new-broker-2:9092

# Topics
topics=.*
topics.exclude=.*-test

# Replication
replication.factor=3
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy

# Performance
consumer.fetch.max.bytes=52428800
producer.batch.size=16384
producer.linger.ms=100

启动 MirrorMaker

#!/bin/bash
# 启动 MirrorMaker 迁移

echo "=== 启动数据迁移 ==="

# 启动 MirrorMaker 2.0
connect-distributed.sh mm2.properties &

# 监控迁移进度
while true; do
    # 检查 Source 集群消息量
    source_offset=$(kafka-run-class.sh kafka.tools.GetOffsetShell \
      --broker-list old-broker-1:9092 \
      --topic order-topic | awk -F: '{sum+=$3} END {print sum}')
    
    # 检查 Target 集群消息量
    target_offset=$(kafka-run-class.sh kafka.tools.GetOffsetShell \
      --broker-list new-broker-1:9092 \
      --topic order-topic | awk -F: '{sum+=$3} END {print sum}')
    
    # 计算进度
    if [ $source_offset -gt 0 ]; then
        progress=$((target_offset * 100 / source_offset))
        echo "迁移进度:$progress%"
    fi
    
    if [ $source_offset -eq $target_offset ]; then
        echo "迁移完成"
        break
    fi
    
    sleep 60
done

echo "=== 迁移完成 ==="

3.2 流量切换

切换步骤

#!/bin/bash
# 流量切换脚本

OLD_CLUSTER="old-broker-1:9092,old-broker-2:9092"
NEW_CLUSTER="new-broker-1:9092,new-broker-2:9092"

echo "=== 流量切换 ==="

# 1. 停止 Producer
echo "通知 Producer 停止..."
# 发送信号给 Producer 应用

# 2. 等待消息处理完成
echo "等待消息处理完成..."
sleep 60

# 3. 检查数据一致性
echo "检查数据一致性..."
old_offset=$(kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list $OLD_CLUSTER \
  --topic order-topic | awk -F: '{sum+=$3} END {print sum}')
new_offset=$(kafka-run-class.sh kafka.tools.GetOffsetShell \
  --broker-list $NEW_CLUSTER \
  --topic order-topic | awk -F: '{sum+=$3} END {print sum}')

if [ $old_offset -eq $new_offset ]; then
    echo "数据一致"
else
    echo "数据不一致:old=$old_offset, new=$new_offset"
    exit 1
fi

# 4. 更新 Producer 配置
echo "更新 Producer 配置..."
# 更新应用配置指向新集群

# 5. 启动 Producer
echo "启动 Producer..."
# 重启 Producer 应用

# 6. 验证
echo "验证新集群..."
echo "test" | kafka-console-producer.sh --bootstrap-server $NEW_CLUSTER --topic test-topic
kafka-console-consumer.sh --bootstrap-server $NEW_CLUSTER --topic test-topic --from-beginning --max-messages 1 --timeout-ms 5000

echo "=== 流量切换完成 ==="

3.3 回滚方案

#!/bin/bash
# 回滚脚本

NEW_CLUSTER="new-broker-1:9092,new-broker-2:9092"
OLD_CLUSTER="old-broker-1:9092,old-broker-2:9092"

echo "=== 执行回滚 ==="

# 1. 停止 Producer
echo "停止 Producer..."

# 2. 恢复旧配置
echo "恢复旧配置..."
# 恢复应用配置指向旧集群

# 3. 启动 Producer
echo "启动 Producer..."

# 4. 验证
echo "验证旧集群..."
echo "test" | kafka-console-producer.sh --bootstrap-server $OLD_CLUSTER --topic test-topic

echo "=== 回滚完成 ==="

四、故障处理

4.1 升级失败

常见问题

# 问题 1: Broker 启动失败
# 解决:检查日志,回滚配置
tail -f /var/log/kafka/server.log

# 问题 2: 副本不同步
# 解决:等待同步或重新分配
kafka-reassign-partitions.sh --verify

# 问题 3: Consumer 无法连接
# 解决:检查 inter.broker.protocol.version

4.2 迁移失败

常见问题

# 问题 1: MirrorMaker 停止
# 解决:重启 MirrorMaker
connect-distributed.sh mm2.properties &

# 问题 2: 数据不一致
# 解决:重新运行 MirrorMaker 直到一致

# 问题 3: 性能下降
# 解决:调整 MirrorMaker 配置

4.3 应急方案

#!/bin/bash
# 应急方案脚本

echo "=== 应急方案 ==="

# 1. 停止迁移
echo "停止迁移..."
pkill -f connect-distributed

# 2. 恢复旧配置
echo "恢复旧配置..."
# 恢复应用配置

# 3. 通知相关人员
echo "通知相关人员..."
# 发送告警

# 4. 记录问题
echo "记录问题..."
# 记录日志

echo "=== 应急方案执行完成 ==="

五、最佳实践

5.1 升级建议

升级建议:
1. 在测试环境先验证
2. 选择低峰期执行
3. 逐步滚动升级
4. 备份配置和数据
5. 准备回滚方案

5.2 迁移建议

迁移建议:
1. 使用 MirrorMaker 2.0
2. 先迁移非关键业务
3. 验证数据一致性
4. 逐步切换流量
5. 保留旧集群一段时间

5.3 检查清单

升级检查:
- [ ] 备份配置和数据
- [ ] 测试环境验证
- [ ] 准备回滚方案
- [ ] 通知相关人员
- [ ] 选择低峰期

迁移检查:
- [ ] 配置 MirrorMaker
- [ ] 验证数据一致性
- [ ] 准备流量切换方案
- [ ] 准备回滚方案
- [ ] 监控迁移进度

总结

Kafka 集群迁移与升级的核心要点:

  1. 版本升级:滚动升级、配置更新、验证
  2. 集群扩容:添加 Broker、分区重分配
  3. 数据迁移:MirrorMaker、流量切换、回滚
  4. 故障处理:常见问题、应急方案
  5. 最佳实践:升级建议、迁移建议

核心要点

参考资料


分享这篇文章到:

上一篇文章
Kafka 精确一次语义(EOS)详解
下一篇文章
RocketMQ 运维监控与告警指南