Redis Data Recovery and Migration in Practice
Data is a core asset for any business. When Redis fails or needs to be moved, how do you restore the data quickly and migrate it safely? This article walks through complete, field-tested procedures for both.
1. Data Recovery
1.1 Restoring from RDB
Use cases:
- Restoring from an RDB backup
- Quickly rolling back to a point in time
- Large datasets (RDB loads faster than AOF replay)
# ==================== Step 1: prepare the RDB file ====================
# Inspect the backup file
ls -lh /backup/dump-20251105.rdb
file /backup/dump-20251105.rdb
# Verify RDB file integrity
redis-check-rdb /backup/dump-20251105.rdb
# ==================== Step 2: stop Redis ====================
# Graceful shutdown
redis-cli shutdown
# Or force-stop as a last resort (skips the final save; unsaved writes are lost)
kill -9 <redis-pid>
# ==================== Step 3: swap in the RDB file ====================
# Back up the current data (optional)
mv /var/lib/redis/dump.rdb /var/lib/redis/dump.rdb.bak
# Copy the backup into place
cp /backup/dump-20251105.rdb /var/lib/redis/dump.rdb
# Set ownership and permissions
chown redis:redis /var/lib/redis/dump.rdb
chmod 640 /var/lib/redis/dump.rdb
# ==================== Step 4: start Redis ====================
# Note: if AOF is enabled (appendonly yes), Redis loads the AOF at startup
# and ignores dump.rdb. Disable AOF in redis.conf before restoring from an
# RDB, then re-enable it (and run BGREWRITEAOF) once the data is back.
# Start the service
redis-server /etc/redis/redis.conf
# Verify it is up
redis-cli ping
# Check the data
redis-cli dbsize
redis-cli info keyspace
1.2 Restoring from AOF
Use cases:
- Maximum data completeness is required
- Restoring from an AOF backup
- Restoring with mixed RDB/AOF persistence
# ==================== Step 1: prepare the AOF file ====================
# Inspect the AOF file
ls -lh /var/lib/redis/appendonly.aof
# Verify the AOF file
redis-check-aof /var/lib/redis/appendonly.aof
# ==================== Step 2: repair a corrupted AOF (if necessary) ====================
# Check and repair
redis-check-aof --fix /var/lib/redis/appendonly.aof
# Sample output
# Start checking Multi Bulk Block 18446744073709551615
# AOF analyzed: size=1048576, ok_up_to=1045678, ok_up_to_line=3456
# This will shrink the AOF from 1048576 bytes, to 1045678 bytes
# Continue? [y/N]: y
# Successfully truncated AOF file
# ==================== Step 3: restore from backup ====================
# Stop Redis
redis-cli shutdown
# Back up the current AOF
mv /var/lib/redis/appendonly.aof /var/lib/redis/appendonly.aof.bak
# Restore the backup
cp /backup/appendonly-20251105.aof /var/lib/redis/appendonly.aof
# With mixed persistence, confirm the format is correct
file /var/lib/redis/appendonly.aof
# Output: Redis RDB format 9 with AOF preamble
# (Redis 7+ splits the AOF into an appendonlydir/ with base and incr files;
# restore the whole directory in that case.)
# Start Redis
redis-server /etc/redis/redis.conf
# Verify the data (DBSIZE avoids the blocking KEYS * scan)
redis-cli dbsize
1.3 Partial Recovery
Use cases:
- Restoring only a subset of the data
- Merging multiple backups
- Selective recovery
#!/bin/bash
# partial_restore.sh
# Extract the data for specific keys from an RDB file
RDB_FILE="/backup/dump-20251105.rdb"
KEY_PATTERN="^user:"
# Uses the rdb-tools package
# Install: pip install rdbtools python-lzf
# Parse the RDB file to JSON (rdbtools emits one object per database, wrapped in an array)
rdb -c json $RDB_FILE > /tmp/full_data.json
# Extract the target keys (note the anchored regex; "user:*" as a regex would match "user" followed by any number of colons)
jq ".[0] | to_entries | map(select(.key | test(\"$KEY_PATTERN\"))) | from_entries" /tmp/full_data.json > /tmp/partial_data.json
# Convert to Redis commands (this simple form only handles string values;
# lists, hashes, sets, and zsets each need per-type handling)
jq -r 'to_entries[] | "SET \(.key) \(.value | tojson)"' /tmp/partial_data.json > /tmp/restore_commands.txt
# Run the restore
redis-cli < /tmp/restore_commands.txt
echo "Partial restore complete"
1.4 Data Recovery in Java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import java.io.*;
import java.nio.file.*;
import java.util.*;

public class RedisDataRecovery {
    private Jedis jedis;
    private String dataDir;

    public RedisDataRecovery(Jedis jedis, String dataDir) {
        this.jedis = jedis;
        this.dataDir = dataDir;
    }
    /**
     * Restore from an RDB file.
     */
    public void recoverFromRdb(String rdbPath) throws IOException, InterruptedException {
        // 1. Validate the RDB file
        if (!validateRdbFile(rdbPath)) {
            throw new IOException("Invalid RDB file");
        }
        // 2. Back up the current data
        backupCurrentData();
        // 3. Copy the RDB file into the data directory
        String destPath = dataDir + "/dump.rdb";
        Files.copy(Paths.get(rdbPath), Paths.get(destPath),
                StandardCopyOption.REPLACE_EXISTING);
        // 4. Restart Redis (relies on an external script/service manager)
        restartRedis();
        // 5. Verify the recovery
        Thread.sleep(3000);
        if (!verifyRecovery()) {
            throw new RuntimeException("Data recovery failed");
        }
        System.out.println("RDB recovery succeeded");
    }

    /**
     * Restore from an AOF file.
     */
    public void recoverFromAof(String aofPath) throws IOException, InterruptedException {
        // 1. Check the AOF and repair it if needed
        if (!checkAndFixAof(aofPath)) {
            throw new IOException("AOF file is corrupted and could not be repaired");
        }
        // 2. Back up the current data
        backupCurrentData();
        // 3. Copy the AOF file into the data directory
        String destPath = dataDir + "/appendonly.aof";
        Files.copy(Paths.get(aofPath), Paths.get(destPath),
                StandardCopyOption.REPLACE_EXISTING);
        // 4. Restart Redis
        restartRedis();
        // 5. Verify the recovery
        Thread.sleep(3000);
        if (!verifyRecovery()) {
            throw new RuntimeException("Data recovery failed");
        }
        System.out.println("AOF recovery succeeded");
    }
    /**
     * Validate an RDB file with redis-check-rdb.
     */
    private boolean validateRdbFile(String rdbPath) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder("redis-check-rdb", rdbPath);
        Process process = pb.start();
        int exitCode = process.waitFor();
        return exitCode == 0;
    }

    /**
     * Check the AOF and repair it with redis-check-aof --fix.
     */
    private boolean checkAndFixAof(String aofPath) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder("redis-check-aof", "--fix", aofPath);
        Process process = pb.start();
        // Auto-confirm the truncation prompt
        try (OutputStream os = process.getOutputStream()) {
            os.write("y\n".getBytes());
        }
        int exitCode = process.waitFor();
        return exitCode == 0;
    }
    /**
     * Back up the current data before overwriting it.
     */
    private void backupCurrentData() throws IOException, InterruptedException {
        String timestamp = new java.text.SimpleDateFormat("yyyyMMdd_HHmmss")
                .format(new java.util.Date());
        // Trigger a background save
        jedis.bgsave();
        // Wait for the save to finish (a production version should poll LASTSAVE
        // until its timestamp changes instead of sleeping a fixed interval)
        Thread.sleep(2000);
        // Back up the RDB
        Files.copy(Paths.get(dataDir + "/dump.rdb"),
                Paths.get(dataDir + "/dump_" + timestamp + ".rdb.bak"));
        // Back up the AOF (Redis 7+ keeps it as a directory)
        File aofDir = new File(dataDir + "/appendonlydir");
        if (aofDir.exists()) {
            // Copy the whole directory; use a utility such as
            // Apache Commons IO's FileUtils.copyDirectory in a real project
        }
    }

    /**
     * Restart Redis by shelling out to redis-cli/redis-server.
     */
    private void restartRedis() throws IOException, InterruptedException {
        // Stop
        ProcessBuilder stopPb = new ProcessBuilder("redis-cli", "shutdown");
        stopPb.start().waitFor();
        Thread.sleep(2000);
        // Start
        ProcessBuilder startPb = new ProcessBuilder("redis-server", "/etc/redis/redis.conf");
        startPb.start();
    }
    /**
     * Verify the recovery by checking that the database is non-empty.
     * Note: the restart drops this Jedis connection; if verification fails
     * with a connection error, reconnect and check again.
     */
    private boolean verifyRecovery() {
        try {
            long dbSize = jedis.dbSize();
            System.out.println("Database size: " + dbSize);
            return dbSize > 0;
        } catch (Exception e) {
            System.err.println("Verification failed: " + e.getMessage());
            return false;
        }
    }
    /**
     * Export the data for keys matching a pattern as Redis commands.
     * Note: KEYS blocks the server on large datasets; prefer SCAN in production.
     * Values containing spaces or newlines would also need quoting before the
     * output file can be replayed through redis-cli.
     */
    public void exportKeys(String pattern, String outputFile) throws IOException {
        Set<String> keys = jedis.keys(pattern);
        try (PrintWriter writer = new PrintWriter(new FileWriter(outputFile))) {
            for (String key : keys) {
                String type = jedis.type(key);
                writer.println("# Key: " + key + ", Type: " + type);
                switch (type) {
                    case "string":
                        writer.println("SET " + key + " " + jedis.get(key));
                        break;
                    case "list":
                        for (String value : jedis.lrange(key, 0, -1)) {
                            writer.println("RPUSH " + key + " " + value);
                        }
                        break;
                    case "set":
                        for (String value : jedis.smembers(key)) {
                            writer.println("SADD " + key + " " + value);
                        }
                        break;
                    case "zset":
                        // Jedis 3.x returns Set<Tuple>; Jedis 4+ returns List<Tuple>
                        for (Tuple tuple : jedis.zrangeWithScores(key, 0, -1)) {
                            writer.println("ZADD " + key + " " + tuple.getScore() + " " + tuple.getElement());
                        }
                        break;
                    case "hash":
                        for (Map.Entry<String, String> entry : jedis.hgetAll(key).entrySet()) {
                            writer.println("HSET " + key + " " + entry.getKey() + " " + entry.getValue());
                        }
                        break;
                }
            }
        }
        System.out.println("Export complete: " + outputFile);
    }
}
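A minimal sketch of how the class above might be wired up; the host, data directory, and backup path are placeholder values for illustration.
import redis.clients.jedis.Jedis;

public class RecoveryExample {
    public static void main(String[] args) throws Exception {
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            RedisDataRecovery recovery = new RedisDataRecovery(jedis, "/var/lib/redis");
            // Export user keys as replayable commands before doing anything destructive
            recovery.exportKeys("user:*", "/tmp/user_keys.txt");
            // Restore from an RDB backup (stops and restarts the local Redis)
            recovery.recoverFromRdb("/backup/dump-20251105.rdb");
        }
    }
}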
2. Data Migration
2.1 Comparison of Migration Approaches
| Approach | Pros | Cons | Best for |
|---|---|---|---|
| Copy the RDB file | Simple, fast | Requires downtime; data may be stale | Small datasets, downtime acceptable |
| AOF replay | Complete data | Slow; commands must be replayed | Large datasets, completeness required |
| Master-replica sync | Online, transparent to clients | More configuration, extra resources | Production, zero downtime |
| RedisShake | Powerful, supports continuous sync | Extra tooling required | Cross-cloud, cross-version |
| Custom scripts | Flexible, fully controlled | High development cost | Special requirements |
2.2 Migration via Master-Replica Sync
The recommended approach for online migration.
Migration architecture:
┌─────────────┐           ┌─────────────┐           ┌─────────────┐
│  Old Redis  │ ────────▶ │  New Redis  │ ────────▶ │  New Redis  │
│  (Master)   │ full sync │  (Replica)  │  promote  │ (New Master)│
└─────────────┘           └─────────────┘           └─────────────┘
Step 1: configure the new Redis as a replica
# New Redis config (redis-new.conf)
bind 0.0.0.0
port 6380
daemonize yes
pidfile /var/run/redis/redis-new.pid
logfile /var/log/redis/redis-new.log
dir /var/lib/redis-new
# Optional: password on the new instance, plus the source's password
# so replication can authenticate
requirepass your_password
masterauth source_password
Step 2: start the new Redis and attach it as a replica
# Start the new Redis
redis-server /etc/redis/redis-new.conf
# Make it a replica of the old instance (REPLICAOF on Redis 5+)
redis-cli -p 6380 SLAVEOF <old-redis-ip> 6379
# If the source requires a password, set masterauth on the replica
# (the -a flag authenticates this redis-cli session itself)
redis-cli -p 6380 -a your_password CONFIG SET masterauth source_password
# Check the replication status
redis-cli -p 6380 INFO replication
Step 3: wait for the sync to complete
# Monitor sync progress
watch -n 1 'redis-cli -p 6380 INFO replication | grep -E "role|master_link_status|master_sync"'
# Sample output (master_sync_left_bytes only appears while the bulk transfer is running)
# role:slave
# master_link_status:up
# master_sync_in_progress:0
# master_last_io_seconds_ago:0
Step 4: switch traffic over
# 1. Stop writes to the old Redis (at the application layer)
# 2. Wait until the replica has fully caught up:
#    slave_repl_offset on the replica must equal master_repl_offset on the master
redis-cli -p 6380 INFO replication
# 3. Promote the replica to master
redis-cli -p 6380 SLAVEOF NO ONE
# 4. Verify the new master
redis-cli -p 6380 INFO replication
# role:master
# 5. Update the application config to point at the new Redis
# 6. (Optional) demote the old Redis to a replica of the new one
redis-cli -p 6379 SLAVEOF <new-redis-ip> 6380
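The catch-up check in step 2 can be automated. The sketch below polls INFO replication on both instances via Jedis and returns once the replica's offset matches the master's; the hosts and ports are placeholders, and the offset fields are pulled out with a simple regex.
import redis.clients.jedis.Jedis;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ReplicationCatchUp {
    // Extract e.g. "master_repl_offset:12345" from the INFO replication text
    private static long offset(Jedis jedis, String field) {
        Matcher m = Pattern.compile(field + ":(\\d+)").matcher(jedis.info("replication"));
        return m.find() ? Long.parseLong(m.group(1)) : -1;
    }

    public static void waitUntilCaughtUp(Jedis master, Jedis replica) throws InterruptedException {
        while (true) {
            long masterOffset = offset(master, "master_repl_offset");
            long replicaOffset = offset(replica, "slave_repl_offset");
            if (masterOffset >= 0 && masterOffset == replicaOffset) {
                System.out.println("Replica caught up at offset " + masterOffset);
                return;
            }
            System.out.println("Lag: " + (masterOffset - replicaOffset) + " bytes");
            Thread.sleep(500);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        try (Jedis master = new Jedis("192.168.1.100", 6379);
             Jedis replica = new Jedis("192.168.1.101", 6380)) {
            waitUntilCaughtUp(master, replica);
        }
    }
}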
2.3 Migration with RedisShake
RedisShake is an open-source Redis data synchronization tool from Alibaba (now maintained under tair-opensource).
# ==================== Install ====================
# Download
wget https://github.com/tair-opensource/RedisShake/releases/download/v4.0.0/redis-shake-4.0.0-linux-amd64.tar.gz
# Unpack
tar -xzf redis-shake-4.0.0-linux-amd64.tar.gz
cd redis-shake-4.0.0-linux-amd64
# ==================== Configure ====================
# RedisShake v4 reads a TOML file; the layout below follows v4, but check
# the shake.toml shipped with your release for the exact option names
cat > shake.toml << EOF
[sync_reader]
# Source Redis
address = "192.168.1.100:6379"
password = "source_password"

[redis_writer]
# Target Redis
address = "192.168.1.101:6380"
password = "target_password"
EOF
# Key filtering and the choice between a one-shot restore and continuous
# sync are also set in the config file; option names vary between versions
# ==================== Run the migration ====================
# Full + incremental sync
./redis-shake shake.toml
# ==================== Monitor progress ====================
# Watch the log (file name and location depend on the configured log settings)
tail -f shake.log
2.4 Migrating with redis-cli
Suitable for small datasets only: every key costs at least two round trips and a new redis-cli process.
#!/bin/bash
# migrate_with_cli.sh
SOURCE_HOST="192.168.1.100"
SOURCE_PORT="6379"
SOURCE_AUTH="source_password"
TARGET_HOST="192.168.1.101"
TARGET_PORT="6380"
TARGET_AUTH="target_password"
# Option 1: pull a full RDB snapshot
redis-cli -h $SOURCE_HOST -p $SOURCE_PORT -a $SOURCE_AUTH --rdb /tmp/dump.rdb
# Option 2: SCAN and copy key by key
redis-cli -h $SOURCE_HOST -p $SOURCE_PORT -a $SOURCE_AUTH --scan --pattern "*" | \
while read -r key; do
    # Get the type
    type=$(redis-cli -h $SOURCE_HOST -p $SOURCE_PORT -a $SOURCE_AUTH TYPE "$key")
    # Export according to type
    case $type in
        string)
            value=$(redis-cli -h $SOURCE_HOST -p $SOURCE_PORT -a $SOURCE_AUTH GET "$key")
            redis-cli -h $TARGET_HOST -p $TARGET_PORT -a $TARGET_AUTH SET "$key" "$value"
            ;;
        list)
            redis-cli -h $SOURCE_HOST -p $SOURCE_PORT -a $SOURCE_AUTH LRANGE "$key" 0 -1 | \
            while read -r value; do
                redis-cli -h $TARGET_HOST -p $TARGET_PORT -a $TARGET_AUTH RPUSH "$key" "$value"
            done
            ;;
        # Handle the other types (set, zset, hash) the same way
    esac
done
echo "Migration complete"
2.5 Data Migration in Java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import redis.clients.jedis.Tuple;
import java.util.*;

public class RedisDataMigration {
    private Jedis sourceJedis;
    private Jedis targetJedis;
    private int batchSize = 100;

    public RedisDataMigration(Jedis sourceJedis, Jedis targetJedis) {
        this.sourceJedis = sourceJedis;
        this.targetJedis = targetJedis;
    }
    /**
     * Full migration of every key.
     */
    public void fullMigration() {
        long totalKeys = 0;
        long migratedKeys = 0;
        // Iterate over all keys with SCAN (non-blocking, unlike KEYS)
        String cursor = ScanParams.SCAN_POINTER_START;
        ScanParams scanParams = new ScanParams();
        scanParams.count(batchSize);
        while (true) {
            ScanResult<String> scanResult = sourceJedis.scan(cursor, scanParams);
            cursor = scanResult.getCursor();
            List<String> keys = scanResult.getResult();
            totalKeys += keys.size();
            // Migrate the batch
            for (String key : keys) {
                migrateKey(key);
                migratedKeys++;
                if (migratedKeys % 1000 == 0) {
                    System.out.println("Migrated " + migratedKeys + " keys");
                }
            }
            // SCAN is done when the cursor returns to 0
            if (ScanParams.SCAN_POINTER_START.equals(cursor)) {
                break;
            }
        }
        System.out.println("Migration complete. Total: " + totalKeys + " keys");
    }
    /**
     * Migrate a single key, preserving its type and TTL.
     * The TTL is read separately from the value, so a key that expires or is
     * modified mid-migration may be copied inconsistently; stop writes to the
     * source, or re-verify afterwards, for strict consistency.
     */
    private void migrateKey(String key) {
        String type = sourceJedis.type(key);
        // TTL in seconds: -1 = no expiry, -2 = key no longer exists
        long ttl = sourceJedis.ttl(key);
        if (ttl == -2) {
            return; // key vanished between SCAN and now
        }
        switch (type) {
            case "string":
                String value = sourceJedis.get(key);
                if (ttl == -1) {
                    targetJedis.set(key, value);
                } else {
                    targetJedis.setex(key, ttl, value);
                }
                break;
            case "list":
                List<String> list = sourceJedis.lrange(key, 0, -1);
                if (!list.isEmpty()) {
                    targetJedis.del(key);
                    for (String item : list) {
                        targetJedis.rpush(key, item);
                    }
                }
                break;
            case "set":
                Set<String> set = sourceJedis.smembers(key);
                if (!set.isEmpty()) {
                    targetJedis.del(key);
                    for (String item : set) {
                        targetJedis.sadd(key, item);
                    }
                }
                break;
            case "zset":
                // Jedis 3.x returns Set<Tuple>; Jedis 4+ returns List<Tuple>
                Set<Tuple> zset = sourceJedis.zrangeWithScores(key, 0, -1);
                if (!zset.isEmpty()) {
                    targetJedis.del(key);
                    for (Tuple tuple : zset) {
                        targetJedis.zadd(key, tuple.getScore(), tuple.getElement());
                    }
                }
                break;
            case "hash":
                Map<String, String> hash = sourceJedis.hgetAll(key);
                if (!hash.isEmpty()) {
                    targetJedis.del(key);
                    targetJedis.hmset(key, hash);
                }
                break;
            default:
                System.err.println("Unknown type: " + type + ", key: " + key);
                return;
        }
        // Re-apply the expiry for the non-string types (SETEX handled strings)
        if (ttl > 0 && !"string".equals(type)) {
            targetJedis.expire(key, ttl);
        }
    }
    /**
     * Migration restricted to keys matching a pattern.
     */
    public void filteredMigration(String pattern) {
        String cursor = ScanParams.SCAN_POINTER_START;
        ScanParams scanParams = new ScanParams();
        scanParams.match(pattern);
        scanParams.count(batchSize);
        long count = 0;
        while (true) {
            ScanResult<String> scanResult = sourceJedis.scan(cursor, scanParams);
            cursor = scanResult.getCursor();
            for (String key : scanResult.getResult()) {
                migrateKey(key);
                count++;
            }
            if (ScanParams.SCAN_POINTER_START.equals(cursor)) {
                break;
            }
        }
        System.out.println("Filtered migration complete. Pattern: " + pattern + ", migrated: " + count + " keys");
    }
    /**
     * Verify the migration by comparing key counts.
     * Counts can differ legitimately if keys expire during migration.
     */
    public boolean verifyMigration() {
        long sourceCount = sourceJedis.dbSize();
        long targetCount = targetJedis.dbSize();
        System.out.println("Source database: " + sourceCount + " keys");
        System.out.println("Target database: " + targetCount + " keys");
        if (sourceCount != targetCount) {
            System.err.println("⚠️ Key counts differ!");
            return false;
        }
        System.out.println("✓ Key counts match");
        return true;
    }
}
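A minimal example of driving the class above end to end; the connection details are placeholders.
import redis.clients.jedis.Jedis;

public class MigrationExample {
    public static void main(String[] args) {
        try (Jedis source = new Jedis("192.168.1.100", 6379);
             Jedis target = new Jedis("192.168.1.101", 6380)) {
            // source.auth("source_password");  // if the instances require passwords
            // target.auth("target_password");
            RedisDataMigration migration = new RedisDataMigration(source, target);
            migration.fullMigration();          // or migration.filteredMigration("user:*");
            if (!migration.verifyMigration()) {
                System.err.println("Re-run verification or compare samples key by key");
            }
        }
    }
}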
3. Cross-Cloud Migration
3.1 Alibaba Cloud Redis
# Use Alibaba Cloud DTS (Data Transmission Service)
# 1. Log in to the Alibaba Cloud console
# 2. Open the DTS service
# 3. Create a data synchronization task
# 4. Configure the source and target
# 5. Run the precheck and start the task
3.2 Tencent Cloud Redis
# Use Tencent Cloud DTS
# 1. Log in to the Tencent Cloud console
# 2. Open Data Transmission Service (DTS)
# 3. Create a data migration task
# 4. Select Redis migration
# 5. Configure the source and target instances
# 6. Start the migration
3.3 AWS ElastiCache
# Use AWS Database Migration Service (DMS)
# 1. Create a DMS replication instance
# 2. Create source and target endpoints
# 3. Create a migration task
# 4. Choose the migration type (full load / ongoing replication)
# 5. Start the task
4. Migration Verification
4.1 Data Consistency Check
#!/bin/bash
# verify_migration.sh
SOURCE_HOST="192.168.1.100"
SOURCE_PORT="6379"
TARGET_HOST="192.168.1.101"
TARGET_PORT="6380"
# Compare key counts (redis-cli prints the bare number when stdout is not a tty)
SOURCE_COUNT=$(redis-cli -h $SOURCE_HOST -p $SOURCE_PORT DBSIZE)
TARGET_COUNT=$(redis-cli -h $TARGET_HOST -p $TARGET_PORT DBSIZE)
echo "Source key count: $SOURCE_COUNT"
echo "Target key count: $TARGET_COUNT"
if [ "$SOURCE_COUNT" != "$TARGET_COUNT" ]; then
    echo "❌ Key counts differ"
    exit 1
fi
echo "✓ Key counts match"
# Random sampling check
echo "Starting sampling check..."
for i in {1..100}; do
    # Pick a random key
    RANDOM_KEY=$(redis-cli -h $SOURCE_HOST -p $SOURCE_PORT RANDOMKEY)
    if [ -n "$RANDOM_KEY" ]; then
        # GET only works for string keys; skip the other types here
        KEY_TYPE=$(redis-cli -h $SOURCE_HOST -p $SOURCE_PORT TYPE "$RANDOM_KEY")
        if [ "$KEY_TYPE" = "string" ]; then
            SOURCE_VALUE=$(redis-cli -h $SOURCE_HOST -p $SOURCE_PORT GET "$RANDOM_KEY")
            TARGET_VALUE=$(redis-cli -h $TARGET_HOST -p $TARGET_PORT GET "$RANDOM_KEY")
            if [ "$SOURCE_VALUE" != "$TARGET_VALUE" ]; then
                echo "❌ Mismatch: $RANDOM_KEY"
                echo "  source: $SOURCE_VALUE"
                echo "  target: $TARGET_VALUE"
            fi
        fi
    fi
done
echo "✓ Sampling check complete"
4.2 Performance Validation
# Benchmark with redis-benchmark
# Source Redis
redis-benchmark -h 192.168.1.100 -p 6379 -q
# Target Redis
redis-benchmark -h 192.168.1.101 -p 6380 -q
# Compare the results, e.g.:
# SET: source 100000 ops/sec, target 100000 ops/sec ✓
# GET: source 150000 ops/sec, target 150000 ops/sec ✓
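redis-benchmark measures server throughput; the round-trip latency the application will actually see from its own host is also worth checking before cutover. A small Jedis probe (hosts and round count are placeholders):
import redis.clients.jedis.Jedis;

public class LatencyProbe {
    // Average round-trip time estimated from repeated PINGs
    private static double avgPingMicros(Jedis jedis, int rounds) {
        jedis.ping(); // warm up the connection
        long total = 0;
        for (int i = 0; i < rounds; i++) {
            long start = System.nanoTime();
            jedis.ping();
            total += System.nanoTime() - start;
        }
        return total / (double) rounds / 1_000.0;
    }

    public static void main(String[] args) {
        try (Jedis source = new Jedis("192.168.1.100", 6379);
             Jedis target = new Jedis("192.168.1.101", 6380)) {
            System.out.printf("source avg RTT: %.1f us%n", avgPingMicros(source, 1000));
            System.out.printf("target avg RTT: %.1f us%n", avgPingMicros(target, 1000));
        }
    }
}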
5. Summary
5.1 Choosing a Migration Approach
| Data size | Downtime budget | Recommended approach |
|---|---|---|
| < 1 GB | Downtime acceptable | Copy the RDB file |
| < 10 GB | Brief downtime | Master-replica sync |
| > 10 GB | Zero downtime | Master-replica sync + RedisShake |
| Cross-cloud | Zero downtime | Cloud vendor DTS service |
5.2 Precautions
- Before the migration
  - Take a full backup of the source data
  - Estimate the migration window
  - Prepare a rollback plan
- During the migration
  - Monitor sync progress
  - Avoid heavy write traffic
  - Keep the network stable
- After the migration
  - Verify data consistency
  - Run a performance baseline
  - Watch the new instance's runtime behavior