Redis Data Recovery and Migration in Practice


Data is a core asset of any business. When Redis fails or needs to move to new infrastructure, how do you recover the data quickly and migrate it safely? This article walks through complete, hands-on procedures for both.

1. Data Recovery

1.1 RDB Recovery

Use case: restore a full point-in-time snapshot from an RDB backup; writes made after the snapshot are lost.

# ==================== Step 1: Prepare the RDB file ====================

# Check the backup file
ls -lh /backup/dump-20251105.rdb
file /backup/dump-20251105.rdb

# Verify RDB file integrity
redis-check-rdb /backup/dump-20251105.rdb

# ==================== Step 2: Stop Redis ====================

# Graceful shutdown (persists data before exiting)
redis-cli shutdown

# Or force-stop (last resort only: SIGKILL skips the final save and can lose recent writes)
kill -9 <redis-pid>

# ==================== Step 3: Swap in the RDB file ====================

# Back up the current data (optional)
mv /var/lib/redis/dump.rdb /var/lib/redis/dump.rdb.bak

# Copy the backup into place
cp /backup/dump-20251105.rdb /var/lib/redis/dump.rdb

# Fix ownership and permissions
chown redis:redis /var/lib/redis/dump.rdb
chmod 640 /var/lib/redis/dump.rdb

# ==================== Step 4: Start Redis ====================

# Start the service
redis-server /etc/redis/redis.conf

# Verify it is up
redis-cli ping

# Check the data
redis-cli dbsize
redis-cli info keyspace
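
If you want an independent check that the restored instance contains everything the backup held, rdbtools (used again in section 1.3) can count keys offline. A small sketch, assuming rdbtools is installed via pip and that its `justkeys` output mode works with your RDB version (the tool may lag behind the newest RDB formats):

# Count the keys stored in the backup RDB offline (no running Redis needed)
rdb --command justkeys /backup/dump-20251105.rdb | wc -l

# After the restart, the live instance should report the same number
redis-cli dbsize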

1.2 AOF Recovery

Use case: recover with as little data loss as possible by replaying the append-only log, or repair an AOF that was corrupted by a crash.

# ==================== Step 1: Prepare the AOF file ====================

# Check the AOF file
ls -lh /var/lib/redis/appendonly.aof

# Verify the AOF file
redis-check-aof /var/lib/redis/appendonly.aof

# ==================== Step 2: Repair a corrupted AOF (if necessary) ====================

# Check and repair
redis-check-aof --fix /var/lib/redis/appendonly.aof

# Example output
# Start checking Multi Bulk Block 18446744073709551615
# AOF analyzed: size=1048576, ok_up_to=1045678, ok_up_to_line=3456
# This will shrink the AOF from 1048576 bytes, to 1045678 bytes
# Continue? [y/N]: y
# Successfully truncated AOF file

# ==================== Step 3: Restore from backup ====================

# Stop Redis
redis-cli shutdown

# Back up the current AOF
mv /var/lib/redis/appendonly.aof /var/lib/redis/appendonly.aof.bak

# Restore the backup
cp /backup/appendonly-20251105.aof /var/lib/redis/appendonly.aof

# If hybrid persistence (aof-use-rdb-preamble) is enabled, the file starts with the RDB magic
head -c 5 /var/lib/redis/appendonly.aof
# Output: REDIS

# Start Redis
redis-server /etc/redis/redis.conf

# Verify the data (DBSIZE is safe; avoid KEYS * on large datasets)
redis-cli dbsize
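
One detail that is easy to miss: Redis only replays the AOF at startup when appendonly is enabled, and on Redis 7+ the AOF is a directory (appendonlydir) with a manifest rather than a single file. A quick pre-start check, assuming the config and data paths used above:

# Make sure AOF loading is enabled, otherwise Redis falls back to the RDB file
grep -E '^\s*appendonly\s+yes' /etc/redis/redis.conf || echo "WARNING: appendonly is not enabled"

# On Redis 7+, check the multi-part AOF directory instead of a single appendonly.aof
ls -lh /var/lib/redis/appendonlydir/ 2>/dev/null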

1.3 Partial Recovery

Use case: restore only a subset of keys (for example a single business prefix) from a full RDB backup.

#!/bin/bash
# partial_restore.sh
# Extract the data for specific keys from an RDB backup

RDB_FILE="/backup/dump-20251105.rdb"
TARGET_PREFIX="user:"

# Uses rdb-tools
# Install: pip install rdbtools python-lz4

# Parse the RDB file into JSON (rdb -c json emits one JSON object per database)
rdb -c json $RDB_FILE > /tmp/full_data.json

# Extract the target keys from database 0
jq '.[0] | to_entries | map(select(.key | startswith("user:"))) | from_entries' /tmp/full_data.json > /tmp/partial_data.json

# Convert to Redis commands (this simple form only handles string values)
jq -r 'to_entries[] | "SET \(.key) \(.value | tojson)"' /tmp/partial_data.json > /tmp/restore_commands.txt

# Replay the commands
redis-cli < /tmp/restore_commands.txt

echo "Partial recovery complete"

1.4 Data Recovery in Java

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Tuple;
import java.io.*;
import java.nio.file.*;
import java.util.*;

public class RedisDataRecovery {
    private Jedis jedis;
    private String dataDir;
    
    public RedisDataRecovery(Jedis jedis, String dataDir) {
        this.jedis = jedis;
        this.dataDir = dataDir;
    }
    
    /**
     * Recover from an RDB file
     */
    public void recoverFromRdb(String rdbPath) throws IOException, InterruptedException {
        // 1. Validate the RDB file
        if (!validateRdbFile(rdbPath)) {
            throw new IOException("Invalid RDB file");
        }
        
        // 2. Back up the current data
        backupCurrentData();
        
        // 3. Copy the RDB file into the data directory
        String destPath = dataDir + "/dump.rdb";
        Files.copy(Paths.get(rdbPath), Paths.get(destPath), 
                   StandardCopyOption.REPLACE_EXISTING);
        
        // 4. Restart Redis (relies on external scripts / the service manager)
        restartRedis();
        
        // 5. Verify the recovery
        Thread.sleep(3000);
        if (!verifyRecovery()) {
            throw new RuntimeException("Data recovery failed");
        }
        
        System.out.println("RDB recovery succeeded");
    }
    
    /**
     * Recover from an AOF file
     */
    public void recoverFromAof(String aofPath) throws IOException, InterruptedException {
        // 1. Check and repair the AOF
        if (!checkAndFixAof(aofPath)) {
            throw new IOException("AOF file is corrupted and could not be repaired");
        }
        
        // 2. Back up the current data
        backupCurrentData();
        
        // 3. Copy the AOF file into the data directory
        String destPath = dataDir + "/appendonly.aof";
        Files.copy(Paths.get(aofPath), Paths.get(destPath), 
                   StandardCopyOption.REPLACE_EXISTING);
        
        // 4. Restart Redis
        restartRedis();
        
        // 5. Verify the recovery
        Thread.sleep(3000);
        if (!verifyRecovery()) {
            throw new RuntimeException("Data recovery failed");
        }
        
        System.out.println("AOF recovery succeeded");
    }
    
    /**
     * Validate an RDB file with redis-check-rdb
     */
    private boolean validateRdbFile(String rdbPath) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder("redis-check-rdb", rdbPath);
        Process process = pb.start();
        int exitCode = process.waitFor();
        return exitCode == 0;
    }
    
    /**
     * Check and repair an AOF file with redis-check-aof
     */
    private boolean checkAndFixAof(String aofPath) throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder("redis-check-aof", "--fix", aofPath);
        pb.redirectInput(ProcessBuilder.Redirect.PIPE);
        Process process = pb.start();
        
        // Confirm the repair automatically
        try (OutputStream os = process.getOutputStream()) {
            os.write("y\n".getBytes());
        }
        
        int exitCode = process.waitFor();
        return exitCode == 0;
    }
    
    /**
     * Back up the current data before overwriting it
     */
    private void backupCurrentData() throws IOException, InterruptedException {
        String timestamp = new java.text.SimpleDateFormat("yyyyMMdd_HHmmss")
            .format(new java.util.Date());
        
        // Trigger BGSAVE
        jedis.bgsave();
        
        // Wait for the save to finish (a fixed sleep keeps the example simple)
        Thread.sleep(2000);
        
        // Back up the RDB
        Files.copy(Paths.get(dataDir + "/dump.rdb"), 
                   Paths.get(dataDir + "/dump_" + timestamp + ".rdb.bak"));
        
        // Back up the AOF
        File aofDir = new File(dataDir + "/appendonlydir");
        if (aofDir.exists()) {
            // Back up the whole directory
            // (a real project would copy the directory with a utility class)
        }
    }
    
    /**
     * Restart Redis
     */
    private void restartRedis() throws IOException, InterruptedException {
        // Stop
        ProcessBuilder stopPb = new ProcessBuilder("redis-cli", "shutdown");
        stopPb.start().waitFor();
        
        Thread.sleep(2000);
        
        // Start
        ProcessBuilder startPb = new ProcessBuilder("redis-server", "/etc/redis/redis.conf");
        startPb.start();
    }
    
    /**
     * Verify the recovery
     */
    private boolean verifyRecovery() {
        try {
            long dbSize = jedis.dbSize();
            System.out.println("Database size: " + dbSize);
            return dbSize > 0;
        } catch (Exception e) {
            System.err.println("Verification failed: " + e.getMessage());
            return false;
        }
    }
    
    /**
     * Export the data for keys matching a pattern as Redis commands
     */
    public void exportKeys(String pattern, String outputFile) throws IOException {
        Set<String> keys = jedis.keys(pattern);
        
        try (PrintWriter writer = new PrintWriter(new FileWriter(outputFile))) {
            for (String key : keys) {
                String type = jedis.type(key);
                writer.println("# Key: " + key + ", Type: " + type);
                
                switch (type) {
                    case "string":
                        writer.println("SET " + key + " " + jedis.get(key));
                        break;
                    case "list":
                        List<String> list = jedis.lrange(key, 0, -1);
                        for (String value : list) {
                            writer.println("RPUSH " + key + " " + value);
                        }
                        break;
                    case "set":
                        Set<String> set = jedis.smembers(key);
                        for (String value : set) {
                            writer.println("SADD " + key + " " + value);
                        }
                        break;
                    case "zset":
                        Set<Tuple> zset = jedis.zrangeWithScores(key, 0, -1);
                        for (Tuple tuple : zset) {
                            writer.println("ZADD " + key + " " + tuple.getScore() + " " + tuple.getElement());
                        }
                        break;
                    case "hash":
                        Map<String, String> hash = jedis.hgetAll(key);
                        for (Map.Entry<String, String> entry : hash.entrySet()) {
                            writer.println("HSET " + key + " " + entry.getKey() + " " + entry.getValue());
                        }
                        break;
                }
            }
        }
        
        System.out.println("导出完成:" + outputFile);
    }
}

2. Data Migration

2.1 Comparison of Migration Approaches

Approach            | Pros                          | Cons                                     | Best for
RDB file copy       | Simple and fast               | Requires downtime; misses recent writes  | Small datasets, downtime acceptable
AOF replay          | Complete data                 | Slow; commands must be replayed          | Large datasets that must be complete
Master-replica sync | Online, transparent to apps   | More complex setup, extra resources      | Production, zero downtime
RedisShake          | Powerful, supports live sync  | Requires an extra tool                   | Cross-cloud, cross-version
Custom script       | Flexible and controllable     | Higher development cost                  | Special requirements
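
Before choosing, measure the source instance; the numbers drive the decision above. A couple of quick checks (the host and port are placeholders):

# Dataset size and key count
redis-cli -h 192.168.1.100 -p 6379 INFO memory | grep used_memory_human
redis-cli -h 192.168.1.100 -p 6379 DBSIZE

# Write traffic matters for the sync-based approaches
redis-cli -h 192.168.1.100 -p 6379 INFO stats | grep instantaneous_ops_per_sec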

2.2 Migration via Master-Replica Sync

This is the most recommended approach for online migration.

Migration flow:
┌─────────────┐  full sync  ┌─────────────┐   cutover   ┌──────────────┐
│  Old Redis  │ ──────────▶ │  New Redis  │ ──────────▶ │  New Redis   │
│  (Master)   │             │  (Replica)  │             │ (New Master) │
└─────────────┘             └─────────────┘             └──────────────┘

Step 1: Configure the new Redis as a replica

# Configuration for the new Redis (redis-new.conf)
bind 0.0.0.0
port 6380
daemonize yes
pidfile /var/run/redis/redis-new.pid
logfile /var/log/redis/redis-new.log
dir /var/lib/redis-new

# Optional: passwords (requirepass protects this instance, masterauth is used to connect to the old master)
requirepass your_password
masterauth your_password

Step 2: Start the new Redis and establish replication

# Start the new Redis
redis-server /etc/redis/redis-new.conf

# Make it a replica of the old instance (REPLICAOF on Redis 5+)
redis-cli -p 6380 SLAVEOF <old-redis-ip> 6379

# If passwords are enabled, set masterauth on the new instance
redis-cli -p 6380 -a your_password CONFIG SET masterauth your_password

# Check replication status
redis-cli -p 6380 INFO replication

Step 3: Wait for the data sync to complete

# Monitor sync progress
watch -n 1 'redis-cli -p 6380 INFO replication | grep -E "role|master_link_status|master_sync"'

# Sample output
# role:slave
# master_link_status:up
# master_sync_in_progress:0
# master_last_io_seconds_ago:0

Step 4: Cut over traffic

# 1. Stop writes to the old Redis (at the application layer)

# 2. Wait until the data is fully synchronized
redis-cli -p 6380 INFO replication
# Confirm that master_repl_offset on the master matches slave_repl_offset on the replica

# 3. Promote the replica to master
redis-cli -p 6380 SLAVEOF NO ONE

# 4. Verify the new master
redis-cli -p 6380 INFO replication
# role:master

# 5. Update the application configuration to point to the new Redis

# 6. (Optional) Demote the old Redis to a replica
redis-cli -p 6379 SLAVEOF <new-redis-ip> 6380
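
Between stopping writes (step 1) and promoting the replica (step 3), it is safer to wait until the replication offsets actually match rather than sleeping for a fixed time. A minimal sketch, reusing the placeholder address from above:

#!/bin/bash
# wait_for_sync.sh -- hypothetical helper: block until the replica catches up

MASTER="redis-cli -h <old-redis-ip> -p 6379"
REPLICA="redis-cli -p 6380"

while true; do
    MASTER_OFFSET=$($MASTER INFO replication | grep -oP 'master_repl_offset:\K[0-9]+')
    REPLICA_OFFSET=$($REPLICA INFO replication | grep -oP 'slave_repl_offset:\K[0-9]+')
    echo "master=$MASTER_OFFSET replica=$REPLICA_OFFSET"
    if [ -n "$MASTER_OFFSET" ] && [ "$MASTER_OFFSET" = "$REPLICA_OFFSET" ]; then
        echo "Replica is fully caught up"
        break
    fi
    sleep 1
done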

2.3 Migration with RedisShake

RedisShake is an open-source Redis data synchronization tool from Alibaba.

# ==================== Install ====================

# Download
wget https://github.com/tair-opensource/RedisShake/releases/download/v4.0.0/redis-shake-4.0.0-linux-amd64.tar.gz

# Extract
tar -xzf redis-shake-4.0.0-linux-amd64.tar.gz
cd redis-shake-4.0.0-linux-amd64

# ==================== Configure ====================

# Create a config file, shake.conf
# (Exact option names and layout vary between RedisShake versions;
#  treat the keys below as illustrative and check the sample config shipped with the release.)
cat > shake.conf << EOF
[sync]
# Source Redis
reader.address = "192.168.1.100:6379"
reader.password = "source_password"

# Target Redis
writer.address = "192.168.1.101:6380"
writer.password = "target_password"

# Sync mode: restore (one-off), sync (continuous)
mode = "sync"

# Concurrency
parallel = 10

# Key filtering (optional)
filter.key_prefix = "user:"
EOF

# ==================== Run the migration ====================

# Full + incremental sync
./redis-shake -conf=shake.conf

# Full sync only (no continuous replication)
./redis-shake -conf=shake.conf -mode=restore

# ==================== Monitor progress ====================

# Tail the log
tail -f redis-shake.log

# Metrics endpoint (the port depends on the status port configured for your version)
curl http://localhost:9233/metrics
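
Independently of RedisShake's own metrics, watching the target's key count is a crude but useful progress indicator (address and password follow the config above):

# The key count on the target should converge towards the source's DBSIZE
watch -n 5 'redis-cli -h 192.168.1.101 -p 6380 -a target_password DBSIZE'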

2.4 Migration with redis-cli

Suitable for migrating small datasets.

#!/bin/bash
# migrate_with_cli.sh

SOURCE_HOST="192.168.1.100"
SOURCE_PORT="6379"
SOURCE_AUTH="source_password"

TARGET_HOST="192.168.1.101"
TARGET_PORT="6380"
TARGET_AUTH="target_password"

# Option 1: export everything as an RDB snapshot
redis-cli -h $SOURCE_HOST -p $SOURCE_PORT -a $SOURCE_AUTH --rdb /tmp/dump.rdb

# Option 2: walk the keyspace with SCAN and copy key by key
redis-cli -h $SOURCE_HOST -p $SOURCE_PORT -a $SOURCE_AUTH --scan --pattern "*" | \
while read -r key; do
    # Determine the type
    type=$(redis-cli -h $SOURCE_HOST -p $SOURCE_PORT -a $SOURCE_AUTH TYPE "$key")
    
    # Copy according to type
    case $type in
        string)
            value=$(redis-cli -h $SOURCE_HOST -p $SOURCE_PORT -a $SOURCE_AUTH GET "$key")
            redis-cli -h $TARGET_HOST -p $TARGET_PORT -a $TARGET_AUTH SET "$key" "$value"
            ;;
        list)
            redis-cli -h $SOURCE_HOST -p $SOURCE_PORT -a $SOURCE_AUTH LRANGE "$key" 0 -1 | \
            while read -r value; do
                redis-cli -h $TARGET_HOST -p $TARGET_PORT -a $TARGET_AUTH RPUSH "$key" "$value"
            done
            ;;
        # Other types are handled similarly
    esac
done

echo "Migration complete"

2.5 Data Migration in Java

import redis.clients.jedis.Jedis;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
import redis.clients.jedis.Tuple;
import java.util.*;

public class RedisDataMigration {
    private Jedis sourceJedis;
    private Jedis targetJedis;
    private int batchSize = 100;
    
    public RedisDataMigration(Jedis sourceJedis, Jedis targetJedis) {
        this.sourceJedis = sourceJedis;
        this.targetJedis = targetJedis;
    }
    
    /**
     * Full migration of every key
     */
    public void fullMigration() {
        long totalKeys = 0;
        long migratedKeys = 0;
        
        // Walk the whole keyspace with SCAN
        String cursor = ScanParams.SCAN_POINTER_START;
        ScanParams scanParams = new ScanParams();
        scanParams.count(batchSize);
        
        while (true) {
            ScanResult<String> scanResult = sourceJedis.scan(cursor, scanParams);
            cursor = scanResult.getCursor();
            List<String> keys = scanResult.getResult();
            
            totalKeys += keys.size();
            
            // Migrate this batch
            for (String key : keys) {
                migrateKey(key);
                migratedKeys++;
                
                if (migratedKeys % 1000 == 0) {
                    System.out.println("Migrated: " + migratedKeys + " keys");
                }
            }
            
            // SCAN is finished when the cursor returns to 0
            if (ScanParams.SCAN_POINTER_START.equals(cursor)) {
                break;
            }
        }
        
        System.out.println("Migration finished! Total: " + totalKeys + " keys");
    }
    
    /**
     * Migrate a single key
     */
    private void migrateKey(String key) {
        String type = sourceJedis.type(key);
        long ttl = sourceJedis.ttl(key);
        
        // -2 means the key expired (or was deleted) between SCAN and now: skip it
        if (ttl == -2) {
            return;
        }
        
        switch (type) {
            case "string":
                String value = sourceJedis.get(key);
                if (ttl == -1) {
                    targetJedis.set(key, value);
                } else {
                    targetJedis.setex(key, ttl, value);
                }
                break;
                
            case "list":
                List<String> list = sourceJedis.lrange(key, 0, -1);
                if (!list.isEmpty()) {
                    targetJedis.del(key);
                    for (String item : list) {
                        targetJedis.rpush(key, item);
                    }
                    if (ttl > 0) {
                        targetJedis.expire(key, ttl);
                    }
                }
                break;
                
            case "set":
                Set<String> set = sourceJedis.smembers(key);
                if (!set.isEmpty()) {
                    targetJedis.del(key);
                    for (String item : set) {
                        targetJedis.sadd(key, item);
                    }
                    if (ttl > 0) {
                        targetJedis.expire(key, ttl);
                    }
                }
                break;
                
            case "zset":
                Set<Tuple> zset = sourceJedis.zrangeWithScores(key, 0, -1);
                if (!zset.isEmpty()) {
                    targetJedis.del(key);
                    for (Tuple tuple : zset) {
                        targetJedis.zadd(key, tuple.getScore(), tuple.getElement());
                    }
                    if (ttl > 0) {
                        targetJedis.expire(key, ttl);
                    }
                }
                break;
                
            case "hash":
                Map<String, String> hash = sourceJedis.hgetAll(key);
                if (!hash.isEmpty()) {
                    targetJedis.del(key);
                    targetJedis.hmset(key, hash);
                    if (ttl > 0) {
                        targetJedis.expire(key, ttl);
                    }
                }
                break;
                
            default:
                System.err.println("Unknown type: " + type + ", key: " + key);
        }
    }
    
    /**
     * Migration filtered by key pattern
     */
    public void filteredMigration(String pattern) {
        String cursor = ScanParams.SCAN_POINTER_START;
        ScanParams scanParams = new ScanParams();
        scanParams.match(pattern);
        scanParams.count(batchSize);
        
        long count = 0;
        
        while (true) {
            ScanResult<String> scanResult = sourceJedis.scan(cursor, scanParams);
            cursor = scanResult.getCursor();
            List<String> keys = scanResult.getResult();
            
            for (String key : keys) {
                migrateKey(key);
                count++;
            }
            
            if (ScanParams.SCAN_POINTER_START.equals(cursor)) {
                break;
            }
        }
        
        System.out.println("过滤迁移完成,pattern: " + pattern + ", 迁移:" + count + " 个 key");
    }
    
    /**
     * Verify the migration result
     */
    public boolean verifyMigration() {
        long sourceCount = sourceJedis.dbSize();
        long targetCount = targetJedis.dbSize();
        
        System.out.println("Source database: " + sourceCount + " keys");
        System.out.println("Target database: " + targetCount + " keys");
        
        if (sourceCount != targetCount) {
            System.err.println("⚠️  Key counts differ!");
            return false;
        }
        
        System.out.println("✓ Key counts match");
        return true;
    }
}

3. Cross-Cloud Migration

3.1 Alibaba Cloud Redis Migration

# Use Alibaba Cloud DTS (Data Transmission Service)

# 1. Log in to the Alibaba Cloud console
# 2. Open the DTS service
# 3. Create a data synchronization task
# 4. Configure the source and target
# 5. Run the precheck and start the task

3.2 Tencent Cloud Redis Migration

# Use Tencent Cloud DTS

# 1. Log in to the Tencent Cloud console
# 2. Open Data Transmission Service (DTS)
# 3. Create a data migration task
# 4. Choose Redis migration
# 5. Configure the source and target instances
# 6. Start the migration

3.3 AWS ElastiCache Migration

# Use AWS Data Migration Service

# 1. Create a DMS replication instance
# 2. Create the source and target endpoints
# 3. Create a migration task
# 4. Choose the migration type (full load / ongoing replication)
# 5. Start the task
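
Whichever vendor tool is used, verify network reachability and latency between the environments first; a slow or blocked link is the most common reason cross-cloud sync stalls. The endpoint and credentials below are placeholders:

# Can the source environment reach the cloud target at all?
redis-cli -h <target-endpoint> -p 6379 -a <password> PING

# Sample the round-trip latency (Ctrl+C to stop)
redis-cli -h <target-endpoint> -p 6379 -a <password> --latency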

4. Migration Verification

4.1 Data Consistency Check

#!/bin/bash
# verify_migration.sh

SOURCE_HOST="192.168.1.100"
SOURCE_PORT="6379"
TARGET_HOST="192.168.1.101"
TARGET_PORT="6380"

# Compare key counts ($NF works whether redis-cli prints "123" or "(integer) 123")
SOURCE_COUNT=$(redis-cli -h $SOURCE_HOST -p $SOURCE_PORT DBSIZE | awk '{print $NF}')
TARGET_COUNT=$(redis-cli -h $TARGET_HOST -p $TARGET_PORT DBSIZE | awk '{print $NF}')

echo "Source key count: $SOURCE_COUNT"
echo "Target key count: $TARGET_COUNT"

if [ "$SOURCE_COUNT" != "$TARGET_COUNT" ]; then
    echo "❌ Key counts differ"
    exit 1
fi

echo "✓ Key counts match"

# Random sampling check (value comparison only makes sense for string keys)
echo "Starting sample check..."
for i in {1..100}; do
    # Pick a random key
    RANDOM_KEY=$(redis-cli -h $SOURCE_HOST -p $SOURCE_PORT RANDOMKEY)
    
    if [ -n "$RANDOM_KEY" ]; then
        KEY_TYPE=$(redis-cli -h $SOURCE_HOST -p $SOURCE_PORT TYPE "$RANDOM_KEY")
        if [ "$KEY_TYPE" = "string" ]; then
            # Fetch from the source
            SOURCE_VALUE=$(redis-cli -h $SOURCE_HOST -p $SOURCE_PORT GET "$RANDOM_KEY")
            # Fetch from the target
            TARGET_VALUE=$(redis-cli -h $TARGET_HOST -p $TARGET_PORT GET "$RANDOM_KEY")
            
            if [ "$SOURCE_VALUE" != "$TARGET_VALUE" ]; then
                echo "❌ Value mismatch: $RANDOM_KEY"
                echo "  source: $SOURCE_VALUE"
                echo "  target: $TARGET_VALUE"
            fi
        fi
    fi
done

echo "✓ Sample check complete"

4.2 Performance Verification

# Use redis-benchmark to compare performance

# Source Redis
redis-benchmark -h 192.168.1.100 -p 6379 -q

# Target Redis
redis-benchmark -h 192.168.1.101 -p 6380 -q

# Compare the results
# SET: source 100000 ops/sec, target 100000 ops/sec ✓
# GET: source 150000 ops/sec, target 150000 ops/sec ✓
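
Throughput alone can hide slow outliers; redis-cli's built-in latency sampling is a quick complement to redis-benchmark:

# Compare request latency on both instances
redis-cli -h 192.168.1.100 -p 6379 --latency
redis-cli -h 192.168.1.101 -p 6380 --latency

# Optional: latency distribution over time on the new instance
redis-cli -h 192.168.1.101 -p 6380 --latency-history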

5. Summary

5.1 Choosing a Migration Approach

Data size   | Acceptable downtime | Recommended approach
< 1 GB      | Downtime acceptable | RDB file copy
< 10 GB     | Brief downtime      | Master-replica sync
> 10 GB     | Zero downtime       | Master-replica sync + RedisShake
Cross-cloud | Zero downtime       | Cloud vendor DTS service

5.2 Notes and Caveats

  1. Before the migration

    • Back up the source data completely (see the sketch after this list)
    • Estimate the migration time window
    • Prepare a rollback plan
  2. During the migration

    • Monitor sync progress
    • Avoid heavy write traffic
    • Keep the network stable
  3. After the migration

    • Verify data consistency
    • Run a performance baseline
    • Watch the runtime behaviour
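
A minimal pre-flight sketch covering the "before the migration" items (host and paths are placeholders; the fixed sleep after BGSAVE is a simplification, polling rdb_bgsave_in_progress in INFO persistence is more robust):

#!/bin/bash
# pre_migration_checklist.sh -- hypothetical pre-flight helper

HOST="192.168.1.100"; PORT="6379"
TS=$(date +%Y%m%d_%H%M%S)

# 1. Take and archive a fresh snapshot of the source
redis-cli -h $HOST -p $PORT BGSAVE
sleep 5
cp /var/lib/redis/dump.rdb /backup/dump-pre-migration-$TS.rdb

# 2. Record a baseline to compare against after the migration
redis-cli -h $HOST -p $PORT DBSIZE          >  /backup/baseline-$TS.txt
redis-cli -h $HOST -p $PORT INFO keyspace   >> /backup/baseline-$TS.txt
redis-cli -h $HOST -p $PORT INFO memory | grep used_memory_human >> /backup/baseline-$TS.txt

echo "Pre-flight data saved with timestamp $TS"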
