Skip to content
清晨的一缕阳光
返回

Redis 集群扩容与缩容实战

Redis 集群扩容与缩容实战

随着业务增长,Redis 集群需要动态调整规模。如何在不停机的情况下完成扩缩容?本文将分享完整的实战经验。

一、扩容场景

1.1 何时需要扩容

扩容信号:
┌─────────────────────────────────────┐
│ 内存使用率 > 80%                    │
│ QPS 持续超过阈值                     │
│ 网络带宽接近上限                    │
│ 延迟明显增加                        │
│ 频繁触发内存淘汰                    │
└─────────────────────────────────────┘

决策指标:
- 内存使用率:持续 > 80% 考虑扩容
- CPU 使用率:持续 > 70% 考虑扩容
- 网络带宽:持续 > 70% 考虑扩容
- 响应延迟:P99 > 10ms 考虑扩容

1.2 扩容方案

方案停机时间复杂度风险
离线扩容需要
在线扩容
渐进扩容

二、在线扩容实战

2.1 扩容流程

Cluster 扩容流程(3 主→5 主):

原始状态:
┌─────────┐  ┌─────────┐  ┌─────────┐
│ Master1 │  │ Master2 │  │ Master3 │
│ 0-5460  │  │5461-10922│ │10923-16383│
└─────────┘  └─────────┘  └─────────┘

步骤 1:添加新节点
┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐
│ Master1 │  │ Master2 │  │ Master3 │  │ Master4 │  │ Master5 │
│         │  │         │  │         │  │ (空)    │  │ (空)    │
└─────────┘  └─────────┘  └─────────┘  └─────────┘  └─────────┘

步骤 2:重新分片
┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐
│Master1  │  │Master2  │  │Master3  │  │Master4  │  │Master5  │
│0-3276   │  │3277-6553│  │6554-9830│  │9831-13107││13108-16383│
└─────────┘  └─────────┘  └─────────┘  └─────────┘  └─────────┘

步骤 3:数据迁移完成

2.2 准备新节点

# 1. 准备服务器(2 台新节点)
# 192.168.1.13:7003 (Master4)
# 192.168.1.14:7004 (Master5)

# 2. 安装 Redis
tar -xzf redis-7.0.0.tar.gz
cd redis-7.0.0
make && make install

# 3. 创建目录
mkdir -p /var/lib/redis/7003
mkdir -p /var/log/redis
mkdir -p /var/run/redis

# 4. 配置文件 (redis-7003.conf)
cat > /etc/redis/redis-7003.conf << EOF
bind 0.0.0.0
port 7003
daemonize yes
pidfile /var/run/redis/redis-7003.pid
logfile /var/log/redis/redis-7003.log
dir /var/lib/redis/7003

cluster-enabled yes
cluster-config-file nodes-7003.conf
cluster-node-timeout 5000

requirepass cluster_password
masterauth cluster_password

maxmemory 4gb
maxmemory-policy allkeys-lru
EOF

# 5. 启动新节点
redis-server /etc/redis/redis-7003.conf
redis-server /etc/redis/redis-7004.conf

# 6. 验证启动
redis-cli -p 7003 -a cluster_password ping
redis-cli -p 7004 -a cluster_password ping

2.3 添加节点到集群

# 1. 将新节点加入集群(作为空节点)
redis-cli --cluster add-node \
    192.168.1.13:7003 \
    192.168.1.10:7000 \
    -a cluster_password

redis-cli --cluster add-node \
    192.168.1.14:7004 \
    192.168.1.10:7000 \
    -a cluster_password

# 2. 查看集群节点
redis-cli -p 7000 -a cluster_password cluster nodes

# 输出示例:
# a1b2c3d4... 192.168.1.10:7000@17000 myself,master - 0 1725523200000 1 connected 0-5460
# e5f6g7h8... 192.168.1.11:7001@17001 master - 0 1725523200000 2 connected 5461-10922
# i9j0k1l2... 192.168.1.12:7002@17002 master - 0 1725523200000 3 connected 10923-16383
# m3n4o5p6... 192.168.1.13:7003@17003 master - 0 1725523200000 0 connected
# q7r8s9t0... 192.168.1.14:7004@17004 master - 0 1725523200000 0 connected

2.4 重新分片

# 使用 redis-cli 重新分片
redis-cli --cluster reshard \
    192.168.1.10:7000 \
    -a cluster_password

# 交互式操作:
# How many slots do you want to move? 3277
# What is the receiving node ID? (新节点 ID)
# Source node ? (原节点 ID,或 all)

# 示例:从每个现有节点迁移 3277 个槽位到新节点

自动化脚本

#!/bin/bash
# cluster_reshard.sh

CLUSTER_HOST="192.168.1.10"
CLUSTER_PORT="7000"
PASSWORD="cluster_password"

# 新节点
NEW_NODE_1="192.168.1.13:7003"
NEW_NODE_2="192.168.1.14:7004"

# 获取新节点 ID
NEW_NODE_ID_1=$(redis-cli -h $CLUSTER_HOST -p $CLUSTER_PORT -a $PASSWORD cluster nodes | grep $NEW_NODE_1 | awk '{print $1}')
NEW_NODE_ID_2=$(redis-cli -h $CLUSTER_HOST -p $CLUSTER_PORT -a $PASSWORD cluster nodes | grep $NEW_NODE_2 | awk '{print $1}')

echo "新节点 1 ID: $NEW_NODE_ID_1"
echo "新节点 2 ID: $NEW_NODE_ID_2"

# 计算需要迁移的槽位数
# 16384 / 5 = 3276.8,每个节点约 3277 个槽位
SLOTS_PER_NODE=3277

# 从节点 1 迁移槽位到新节点 1
redis-cli --cluster reshard $CLUSTER_HOST:$CLUSTER_PORT \
    --cluster-from a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6 \
    --cluster-to $NEW_NODE_ID_1 \
    --cluster-slots $SLOTS_PER_NODE \
    --cluster-yes \
    -a $PASSWORD

# 从节点 2 迁移槽位到新节点 1
redis-cli --cluster reshard $CLUSTER_HOST:$CLUSTER_PORT \
    --cluster-from e5f6g7h8i9j0k1l2m3n4o5p6q7r8s9t0 \
    --cluster-to $NEW_NODE_ID_1 \
    --cluster-slots $SLOTS_PER_NODE \
    --cluster-yes \
    -a $PASSWORD

# 从节点 3 迁移槽位到新节点 2
redis-cli --cluster reshard $CLUSTER_HOST:$CLUSTER_PORT \
    --cluster-from i9j0k1l2m3n4o5p6q7r8s9t0u1v2w3x4 \
    --cluster-to $NEW_NODE_ID_2 \
    --cluster-slots $SLOTS_PER_NODE \
    --cluster-yes \
    -a $PASSWORD

echo "分片完成!"

# 验证集群状态
redis-cli -h $CLUSTER_HOST -p $CLUSTER_PORT -a $PASSWORD cluster info

2.5 使用 redis-trib 工具

# redis-trib.rb 是 Ruby 实现的集群管理工具

# 1. 安装依赖
gem install redis

# 2. 重新分片
redis-trib.rb reshard \
    192.168.1.10:7000 \
    --from <source-node-id> \
    --to <new-node-id> \
    --slots 3277 \
    --yes

# 3. 批量操作
redis-trib.rb rebalance \
    192.168.1.10:7000 \
    --use-empty-masters \
    --threshold 2 \
    --pipeline 10

2.6 Java 实现数据迁移监控

import redis.clients.jedis.Jedis;
import redis.clients.jedis.ClusterCommandArguments;

import java.util.*;

public class ClusterMigrationMonitor {
    private Jedis jedis;
    
    public ClusterMigrationMonitor(Jedis jedis) {
        this.jedis = jedis;
    }
    
    /**
     * 获取迁移进度
     */
    public MigrationProgress getMigrationProgress() {
        String clusterInfo = jedis.clusterInfo();
        
        Map<String, String> info = parseClusterInfo(clusterInfo);
        
        int clusterSize = Integer.parseInt(info.get("cluster_size"));
        int knownNodes = Integer.parseInt(info.get("cluster_known_nodes"));
        
        return new MigrationProgress(
            clusterSize,
            knownNodes,
            "cluster_state".equals(info.get("cluster_state"))
        );
    }
    
    /**
     * 获取节点槽位分布
     */
    public Map<String, SlotRange> getSlotDistribution() {
        Map<String, SlotRange> distribution = new HashMap<>();
        
        String clusterNodes = jedis.clusterNodes();
        String[] lines = clusterNodes.split("\n");
        
        for (String line : lines) {
            String[] parts = line.split(" ");
            String nodeId = parts[0];
            String address = parts[1];
            String flags = parts[2];
            String slots = parts.length > 8 ? parts[8] : "";
            
            if (flags.contains("master") && !slots.isEmpty()) {
                distribution.put(nodeId, new SlotRange(address, slots));
            }
        }
        
        return distribution;
    }
    
    /**
     * 检查迁移状态
     */
    public boolean checkMigrationComplete() {
        String clusterInfo = jedis.clusterInfo();
        return clusterInfo.contains("cluster_state:ok");
    }
    
    /**
     * 等待迁移完成
     */
    public void waitForMigrationComplete(int timeoutSeconds) throws InterruptedException {
        long startTime = System.currentTimeMillis();
        
        while (System.currentTimeMillis() - startTime < timeoutSeconds * 1000) {
            if (checkMigrationComplete()) {
                System.out.println("迁移完成!");
                return;
            }
            
            System.out.println("等待迁移完成...");
            Thread.sleep(5000);
        }
        
        throw new RuntimeException("迁移超时");
    }
    
    static class MigrationProgress {
        int clusterSize;
        int knownNodes;
        boolean isOk;
        
        MigrationProgress(int clusterSize, int knownNodes, boolean isOk) {
            this.clusterSize = clusterSize;
            this.knownNodes = knownNodes;
            this.isOk = isOk;
        }
        
        @Override
        public String toString() {
            return String.format("集群大小:%d, 已知节点:%d, 状态:%s", 
                               clusterSize, knownNodes, isOk ? "正常" : "异常");
        }
    }
    
    static class SlotRange {
        String address;
        String slots;
        
        SlotRange(String address, String slots) {
            this.address = address;
            this.slots = slots;
        }
    }
    
    private Map<String, String> parseClusterInfo(String info) {
        Map<String, String> result = new HashMap<>();
        String[] lines = info.split("\r\n");
        
        for (String line : lines) {
            String[] parts = line.split(":");
            if (parts.length == 2) {
                result.put(parts[0].trim(), parts[1].trim());
            }
        }
        
        return result;
    }
}

三、在线缩容实战

3.1 缩容场景

缩容信号:
┌─────────────────────────────────────┐
│ 内存使用率 < 30%(持续一周)         │
│ QPS 低于阈值                        │
│ 成本优化需求                        │
│ 业务合并                            │
└─────────────────────────────────────┘

缩容原则:
- 选择数据量最少的节点
- 在低峰期执行
- 逐步缩容,避免大规模迁移

3.2 缩容流程

# 1. 查看各节点数据量
redis-cli -p 7000 -a cluster_password cluster nodes | \
    grep master | \
    awk '{print $1, $2, $8}'

# 2. 迁移槽位(从要删除的节点迁移到其他节点)
redis-cli --cluster reshard \
    192.168.1.10:7000 \
    --from <node-to-remove-id> \
    --to <target-node-id> \
    --slots <slot-count> \
    --cluster-yes \
    -a cluster_password

# 3. 确认节点为空
redis-cli -p 7003 -a cluster_password cluster slots

# 4. 删除节点
redis-cli --cluster del-node \
    192.168.1.10:7000 \
    <node-to-remove-id> \
    -a cluster_password

# 5. 停止节点
redis-cli -p 7003 -a cluster_password shutdown

3.3 缩容脚本

#!/bin/bash
# cluster_shrink.sh

CLUSTER_HOST="192.168.1.10"
CLUSTER_PORT="7000"
PASSWORD="cluster_password"

# 要删除的节点
NODE_TO_REMOVE="192.168.1.13:7003"

echo "=== Redis 集群缩容 ==="
echo "要删除的节点:$NODE_TO_REMOVE"

# 1. 获取节点 ID
NODE_ID=$(redis-cli -h $CLUSTER_HOST -p $CLUSTER_PORT -a $PASSWORD cluster nodes | \
    grep $NODE_TO_REMOVE | awk '{print $1}')

echo "节点 ID: $NODE_ID"

# 2. 查看节点槽位
echo "当前槽位分布:"
redis-cli -h $CLUSTER_HOST -p $CLUSTER_PORT -a $PASSWORD cluster slots

# 3. 选择目标节点(选择数据量最少的节点)
TARGET_NODE=$(redis-cli -h $CLUSTER_HOST -p $CLUSTER_PORT -a $PASSWORD cluster nodes | \
    grep master | grep -v $NODE_ID | head -1 | awk '{print $1}')

echo "目标节点 ID: $TARGET_NODE"

# 4. 迁移槽位
echo "开始迁移槽位..."
redis-cli --cluster reshard $CLUSTER_HOST:$CLUSTER_PORT \
    --from $NODE_ID \
    --to $TARGET_NODE \
    --slots 16384 \
    --cluster-yes \
    -a $PASSWORD

# 5. 验证迁移完成
echo "验证迁移..."
SLOTS_LEFT=$(redis-cli -h $CLUSTER_HOST -p $CLUSTER_PORT -a $PASSWORD cluster nodes | \
    grep $NODE_ID | awk '{print $9}' | wc -c)

if [ "$SLOTS_LEFT" -eq 0 ]; then
    echo "✓ 槽位迁移完成"
else
    echo "✗ 槽位迁移未完成"
    exit 1
fi

# 6. 删除节点
echo "删除节点..."
redis-cli --cluster del-node $CLUSTER_HOST:$CLUSTER_PORT $NODE_ID -a $PASSWORD

# 7. 关闭节点
echo "关闭节点..."
redis-cli -h $(echo $NODE_TO_REMOVE | cut -d: -f1) \
    -p $(echo $NODE_TO_REMOVE | cut -d: -f2) \
    -a $PASSWORD shutdown

echo "✓ 缩容完成!"

# 8. 验证集群状态
redis-cli -h $CLUSTER_HOST -p $CLUSTER_PORT -a $PASSWORD cluster info

四、流量切换

4.1 平滑切换

扩容后的流量切换:

阶段 1(10% 流量):
应用配置 → 新集群(10%)
        → 旧集群(90%)

阶段 2(50% 流量):
应用配置 → 新集群(50%)
        → 旧集群(50%)

阶段 3(100% 流量):
应用配置 → 新集群(100%)

4.2 双写方案

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import java.util.*;

public class DualWriteRedis {
    private JedisCluster oldCluster;
    private JedisCluster newCluster;
    private Random random = new Random();
    
    public DualWriteRedis(JedisCluster oldCluster, JedisCluster newCluster) {
        this.oldCluster = oldCluster;
        this.newCluster = newCluster;
    }
    
    /**
     * 双写(以旧集群为主)
     */
    public void set(String key, String value) {
        // 写旧集群
        oldCluster.set(key, value);
        
        // 写新集群(异步)
        new Thread(() -> {
            try {
                newCluster.set(key, value);
            } catch (Exception e) {
                // 记录日志
            }
        }).start();
    }
    
    /**
     * 读操作(根据流量比例路由)
     */
    public String get(String key, double newClusterRatio) {
        if (random.nextDouble() < newClusterRatio) {
            // 读新集群
            String value = newCluster.get(key);
            if (value != null) {
                return value;
            }
            // 新集群没有,回退到旧集群
            return oldCluster.get(key);
        } else {
            // 读旧集群
            return oldCluster.get(key);
        }
    }
    
    /**
     * 数据一致性校验
     */
    public boolean verifyConsistency(String key) {
        String oldValue = oldCluster.get(key);
        String newValue = newCluster.get(key);
        
        if (oldValue == null && newValue == null) {
            return true;
        }
        
        if (oldValue != null && oldValue.equals(newValue)) {
            return true;
        }
        
        System.err.println("数据不一致:" + key);
        System.err.println("  旧集群:" + oldValue);
        System.err.println("  新集群:" + newValue);
        return false;
    }
}

五、监控与告警

5.1 关键指标

#!/bin/bash
# monitor_migration.sh

CLUSTER_HOST="192.168.1.10"
CLUSTER_PORT="7000"
PASSWORD="cluster_password"

# 获取集群信息
CLUSTER_INFO=$(redis-cli -h $CLUSTER_HOST -p $CLUSTER_PORT -a $PASSWORD cluster info)

# 提取指标
CLUSTER_STATE=$(echo "$CLUSTER_INFO" | grep "cluster_state" | cut -d: -f2 | tr -d '\r')
CLUSTER_SLOTS_OK=$(echo "$CLUSTER_INFO" | grep "cluster_slots_ok" | cut -d: -f2 | tr -d '\r')
CLUSTER_KNOWN_NODES=$(echo "$CLUSTER_INFO" | grep "cluster_known_nodes" | cut -d: -f2 | tr -d '\r')

echo "集群状态:$CLUSTER_STATE"
echo "正常槽位:$CLUSTER_SLOTS_OK / 16384"
echo "已知节点:$CLUSTER_KNOWN_NODES"

# 告警检查
if [ "$CLUSTER_STATE" != "ok" ]; then
    echo "⚠️  告警:集群状态异常"
fi

if [ "$CLUSTER_SLOTS_OK" != "16384" ]; then
    echo "⚠️  告警:槽位不完整"
fi

六、总结

6.1 扩缩容要点

  1. 扩容

    • 提前规划槽位分配
    • 低峰期执行
    • 监控迁移进度
  2. 缩容

    • 选择数据量少的节点
    • 逐步迁移槽位
    • 验证后删除节点
  3. 流量切换

    • 双写保证数据一致
    • 渐进式切换流量
    • 准备回滚方案

6.2 注意事项


参考资料


分享这篇文章到:

上一篇文章
RocketMQ 消息回溯与重置实战
下一篇文章
RocketMQ 延迟消息与定时消息详解