Kafka 集群迁移和升级是运维工作中的重要场景。本文将深入探讨各种迁移升级方案,包括版本升级、集群扩容、数据迁移等。
一、版本升级
1.1 升级路径
推荐升级路径:
0.10.0.x → 0.10.1.x → 0.10.2.x → 0.11.0.x → 1.0.x → 1.1.x → 2.0.x → 2.1.x → 2.2.x → 2.3.x → 2.4.x → 2.5.x → 2.6.x → 2.7.x → 2.8.x → 3.0.x → 3.1.x → 3.2.x → 3.3.x → 3.4.x
注意:
- 每次升级最多跨越一个小版本
- 跨大版本需要逐步升级
- 升级前备份配置和数据
1.2 滚动升级
升级步骤:
#!/bin/bash
# Kafka 滚动升级脚本
OLD_VERSION="2.8.0"
NEW_VERSION="3.4.0"
BROKERS=("broker-1" "broker-2" "broker-3")
echo "=== Kafka 滚动升级 ==="
echo "从版本:$OLD_VERSION"
echo "到版本:$NEW_VERSION"
for broker in "${BROKERS[@]}"; do
echo -e "\n=== 升级 $broker ==="
# 1. 停止 Broker
echo "停止 Broker..."
ssh $broker "systemctl stop kafka"
# 2. 备份配置
echo "备份配置..."
ssh $broker "cp -r /opt/kafka-$OLD_VERSION/config /opt/kafka-$OLD_VERSION/config.bak"
# 3. 安装新版本
echo "安装新版本..."
ssh $broker "wget https://archive.apache.org/dist/kafka/$NEW_VERSION/kafka_2.13-$NEW_VERSION.tgz"
ssh $broker "tar -xzf kafka_2.13-$NEW_VERSION.tgz -C /opt/"
ssh $broker "mv /opt/kafka_2.13-$NEW_VERSION /opt/kafka-$NEW_VERSION"
# 4. 更新配置
echo "更新配置..."
ssh $broker "cp /opt/kafka-$OLD_VERSION/config/server.properties /opt/kafka-$NEW_VERSION/config/"
# 5. 启动 Broker
echo "启动 Broker..."
ssh $broker "systemctl start kafka"
# 6. 验证
echo "验证 Broker..."
ssh $broker "kafka-broker-api-versions.sh --bootstrap-server localhost:9092"
# 7. 等待同步
echo "等待副本同步..."
sleep 60
done
echo -e "\n=== 升级完成 ==="
1.3 升级配置
inter.broker.protocol.version:
# 升级过程中临时配置
inter.broker.protocol.version=2.8.0 # 保持旧版本
# 所有 Broker 升级完成后
inter.broker.protocol.version=3.4.0 # 升级到新版本
log.message.format.version=3.4.0
1.4 升级验证
#!/bin/bash
# 升级验证脚本
echo "=== 验证集群状态 ==="
# 1. 检查 Broker 版本
for broker in broker-1 broker-2 broker-3; do
version=$(ssh $broker "kafka-topics.sh --version")
echo "$broker: $version"
done
# 2. 检查 Topic
kafka-topics.sh --bootstrap-server localhost:9092 --describe
# 3. 检查 Consumer Group
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe
# 4. 发送测试消息
echo "test" | kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test-topic
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --max-messages 1 --timeout-ms 5000
echo "=== 验证完成 ==="
二、集群扩容
2.1 添加 Broker
步骤:
#!/bin/bash
# 添加新 Broker
NEW_BROKER="broker-4"
CLUSTER="broker-1:9092,broker-2:9092,broker-3:9092"
echo "=== 添加新 Broker ==="
# 1. 准备新服务器
echo "准备新服务器..."
ssh $NEW_BROKER "yum install -y java-11-openjdk"
ssh $NEW_BROKER "wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz"
ssh $NEW_BROKER "tar -xzf kafka_2.13-3.4.0.tgz -C /opt/"
ssh $NEW_BROKER "mv /opt/kafka_2.13-3.4.0 /opt/kafka"
# 2. 配置 Broker
echo "配置 Broker..."
cat > /tmp/server.properties << EOF
broker.id=4
listeners=PLAINTEXT://:9092
log.dirs=/data/kafka-logs
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181
EOF
scp /tmp/server.properties $NEW_BROKER:/opt/kafka/config/
# 3. 启动 Broker
echo "启动 Broker..."
ssh $NEW_BROKER "systemctl start kafka"
# 4. 验证
echo "验证 Broker..."
ssh $NEW_BROKER "kafka-broker-api-versions.sh --bootstrap-server localhost:9092"
echo "=== Broker 添加完成 ==="
2.2 分区重分配
生成重分配计划:
# 生成重分配计划
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--topics-to-move-json-file topics-to-move.json \
--broker-list "1,2,3,4" \
--generate > reassign-plan.json
# topics-to-move.json
{
"topics": [
{"topic": "order-topic"},
{"topic": "pay-topic"}
]
}
执行重分配:
#!/bin/bash
# 执行分区重分配
echo "=== 执行分区重分配 ==="
# 1. 执行重分配
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file reassign-plan.json \
--execute
# 2. 监控进度
while true; do
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
--reassignment-json-file reassign-plan.json \
--verify
if [ $? -eq 0 ]; then
echo "重分配完成"
break
fi
echo "等待重分配..."
sleep 60
done
echo "=== 重分配完成 ==="
2.3 扩容验证
#!/bin/bash
# 扩容验证脚本
echo "=== 验证扩容结果 ==="
# 1. 检查 Broker 数量
broker_count=$(kafka-broker-api-versions.sh --bootstrap-server localhost:9092 2>/dev/null | wc -l)
echo "Broker 数量:$broker_count"
# 2. 检查分区分布
kafka-topics.sh --bootstrap-server localhost:9092 --describe | \
grep -o "Replicas: [0-9,]*" | sort | uniq -c
# 3. 检查数据平衡
for broker in broker-1 broker-2 broker-3 broker-4; do
size=$(ssh $broker "du -sh /data/kafka-logs | awk '{print \$1}'")
echo "$broker: $size"
done
echo "=== 验证完成 ==="
三、数据迁移
3.1 MirrorMaker 迁移
配置 MirrorMaker:
# mm2.properties
# Source Cluster
source.cluster.alias=old-cluster
source.cluster.bootstrap.servers=old-broker-1:9092,old-broker-2:9092
# Target Cluster
target.cluster.alias=new-cluster
target.cluster.bootstrap.servers=new-broker-1:9092,new-broker-2:9092
# Topics
topics=.*
topics.exclude=.*-test
# Replication
replication.factor=3
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy
# Performance
consumer.fetch.max.bytes=52428800
producer.batch.size=16384
producer.linger.ms=100
启动 MirrorMaker:
#!/bin/bash
# 启动 MirrorMaker 迁移
echo "=== 启动数据迁移 ==="
# 启动 MirrorMaker 2.0
connect-distributed.sh mm2.properties &
# 监控迁移进度
while true; do
# 检查 Source 集群消息量
source_offset=$(kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list old-broker-1:9092 \
--topic order-topic | awk -F: '{sum+=$3} END {print sum}')
# 检查 Target 集群消息量
target_offset=$(kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list new-broker-1:9092 \
--topic order-topic | awk -F: '{sum+=$3} END {print sum}')
# 计算进度
if [ $source_offset -gt 0 ]; then
progress=$((target_offset * 100 / source_offset))
echo "迁移进度:$progress%"
fi
if [ $source_offset -eq $target_offset ]; then
echo "迁移完成"
break
fi
sleep 60
done
echo "=== 迁移完成 ==="
3.2 流量切换
切换步骤:
#!/bin/bash
# 流量切换脚本
OLD_CLUSTER="old-broker-1:9092,old-broker-2:9092"
NEW_CLUSTER="new-broker-1:9092,new-broker-2:9092"
echo "=== 流量切换 ==="
# 1. 停止 Producer
echo "通知 Producer 停止..."
# 发送信号给 Producer 应用
# 2. 等待消息处理完成
echo "等待消息处理完成..."
sleep 60
# 3. 检查数据一致性
echo "检查数据一致性..."
old_offset=$(kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list $OLD_CLUSTER \
--topic order-topic | awk -F: '{sum+=$3} END {print sum}')
new_offset=$(kafka-run-class.sh kafka.tools.GetOffsetShell \
--broker-list $NEW_CLUSTER \
--topic order-topic | awk -F: '{sum+=$3} END {print sum}')
if [ $old_offset -eq $new_offset ]; then
echo "数据一致"
else
echo "数据不一致:old=$old_offset, new=$new_offset"
exit 1
fi
# 4. 更新 Producer 配置
echo "更新 Producer 配置..."
# 更新应用配置指向新集群
# 5. 启动 Producer
echo "启动 Producer..."
# 重启 Producer 应用
# 6. 验证
echo "验证新集群..."
echo "test" | kafka-console-producer.sh --bootstrap-server $NEW_CLUSTER --topic test-topic
kafka-console-consumer.sh --bootstrap-server $NEW_CLUSTER --topic test-topic --from-beginning --max-messages 1 --timeout-ms 5000
echo "=== 流量切换完成 ==="
3.3 回滚方案
#!/bin/bash
# 回滚脚本
NEW_CLUSTER="new-broker-1:9092,new-broker-2:9092"
OLD_CLUSTER="old-broker-1:9092,old-broker-2:9092"
echo "=== 执行回滚 ==="
# 1. 停止 Producer
echo "停止 Producer..."
# 2. 恢复旧配置
echo "恢复旧配置..."
# 恢复应用配置指向旧集群
# 3. 启动 Producer
echo "启动 Producer..."
# 4. 验证
echo "验证旧集群..."
echo "test" | kafka-console-producer.sh --bootstrap-server $OLD_CLUSTER --topic test-topic
echo "=== 回滚完成 ==="
四、故障处理
4.1 升级失败
常见问题:
# 问题 1: Broker 启动失败
# 解决:检查日志,回滚配置
tail -f /var/log/kafka/server.log
# 问题 2: 副本不同步
# 解决:等待同步或重新分配
kafka-reassign-partitions.sh --verify
# 问题 3: Consumer 无法连接
# 解决:检查 inter.broker.protocol.version
4.2 迁移失败
常见问题:
# 问题 1: MirrorMaker 停止
# 解决:重启 MirrorMaker
connect-distributed.sh mm2.properties &
# 问题 2: 数据不一致
# 解决:重新运行 MirrorMaker 直到一致
# 问题 3: 性能下降
# 解决:调整 MirrorMaker 配置
4.3 应急方案
#!/bin/bash
# 应急方案脚本
echo "=== 应急方案 ==="
# 1. 停止迁移
echo "停止迁移..."
pkill -f connect-distributed
# 2. 恢复旧配置
echo "恢复旧配置..."
# 恢复应用配置
# 3. 通知相关人员
echo "通知相关人员..."
# 发送告警
# 4. 记录问题
echo "记录问题..."
# 记录日志
echo "=== 应急方案执行完成 ==="
五、最佳实践
5.1 升级建议
升级建议:
1. 在测试环境先验证
2. 选择低峰期执行
3. 逐步滚动升级
4. 备份配置和数据
5. 准备回滚方案
5.2 迁移建议
迁移建议:
1. 使用 MirrorMaker 2.0
2. 先迁移非关键业务
3. 验证数据一致性
4. 逐步切换流量
5. 保留旧集群一段时间
5.3 检查清单
升级检查:
- [ ] 备份配置和数据
- [ ] 测试环境验证
- [ ] 准备回滚方案
- [ ] 通知相关人员
- [ ] 选择低峰期
迁移检查:
- [ ] 配置 MirrorMaker
- [ ] 验证数据一致性
- [ ] 准备流量切换方案
- [ ] 准备回滚方案
- [ ] 监控迁移进度
总结
Kafka 集群迁移与升级的核心要点:
- 版本升级:滚动升级、配置更新、验证
- 集群扩容:添加 Broker、分区重分配
- 数据迁移:MirrorMaker、流量切换、回滚
- 故障处理:常见问题、应急方案
- 最佳实践:升级建议、迁移建议
核心要点:
- 升级前充分测试
- 迁移时保证数据一致
- 准备完善的回滚方案
- 选择低峰期执行
- 建立监控告警
参考资料
- Kafka Upgrade 官方文档
- Kafka Reassignment 官方文档
- 《Kafka 权威指南》第 11 章