Skip to content
清晨的一缕阳光
返回

Redis 多活架构设计与实战

Redis 多活架构设计与实战

异地多活是保障业务连续性的终极方案。如何在多个地域部署 Redis 并保证数据一致性?本文将分享完整的多活架构设计和实战经验。

一、多活架构概述

1.1 什么是多活

多活架构演进:

单活 → 主从 → 双活 → 多活

┌─────────────────────────────────────┐
│ 架构    │ 可用性 │ 延迟   │ 成本   │
├─────────────────────────────────────┤
│ 单活    │  99.9% │  低    │  低    │
│ 主从    │ 99.99% │  中    │  中    │
│ 双活    │ 99.99% │  低    │  高    │
│ 多活    │99.999% │  最低  │  最高  │
└─────────────────────────────────────┘

1.2 多活场景

适用场景

挑战

二、多活架构模式

2.1 主从复制模式

异地主从架构:

┌─────────────┐
│  Master     │  北京
│  (读写)     │
└──────┬──────┘
       │ 复制

┌──────┴──────┐
│  Slave      │  上海
│  (只读)     │
└─────────────┘

优点:简单、数据一致
缺点:写延迟高、单点故障

2.2 双主互备模式

双主互备架构:

┌─────────────┐         ┌─────────────┐
│  Master 1   │◄───────►│  Master 2   │
│   北京      │  互相复制│   上海      │
│  (读写)     │         │  (读写)     │
└─────────────┘         └─────────────┘

优点:双写、高可用
缺点:数据冲突、脑裂风险

2.3 多活分片模式

多活分片架构:

┌─────────────┐         ┌─────────────┐
│  Cluster 1  │         │  Cluster 2  │
│   北京      │         │   上海      │
│  0-8191     │         │  8192-16383 │
└─────────────┘         └─────────────┘
       ▲                       ▲
       │                       │
       └───────────┬───────────┘

           ┌───────┴───────┐
           │  全局路由层   │
           └───────────────┘

优点:水平扩展、就近访问
缺点:路由复杂、跨地域同步

2.4 单元化架构

单元化多活架构:

┌─────────────────────────────────────┐
│         全局流量调度 (GSLB)          │
└───────────────┬─────────────────────┘

    ┌───────────┼───────────┐
    │           │           │
┌───▼───┐  ┌───▼───┐  ┌───▼───┐
│单元 1 │  │单元 2 │  │单元 3 │
│ 北京  │  │ 上海  │  │ 广州  │
│Redis  │  │Redis  │  │Redis  │
└───────┘  └───────┘  └───────┘

优点:完全隔离、故障隔离
缺点:数据分片复杂、运维成本高

三、数据同步方案

3.1 主从复制同步

# 配置异地主从
# 北京 Master
bind 0.0.0.0
port 6379
requirepass master_password

# 上海 Slave
replicaof beijing-ip 6379
masterauth master_password
requirepass slave_password

# 优化跨地域复制
repl-ping-replica-period 10        # 心跳间隔
repl-timeout 60                     # 超时时间
repl-backlog-size 256mb            # 复制缓冲

3.2 双向同步

# 使用 RedisShake 实现双向同步

# shake1.conf (北京→上海)
[sync]
reader.address = "beijing-ip:6379"
reader.password = "beijing_pass"
writer.address = "shanghai-ip:6379"
writer.password = "shanghai_pass"
mode = "sync"

# shake2.conf (上海→北京)
[sync]
reader.address = "shanghai-ip:6379"
reader.password = "shanghai_pass"
writer.address = "beijing-ip:6379"
writer.password = "beijing_pass"
mode = "sync"

# 启动双向同步
./redis-shake -conf=shake1.conf &
./redis-shake -conf=shake2.conf &

3.3 Java 实现数据同步

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.*;
import java.util.concurrent.*;

public class RedisSyncService {
    private JedisPool sourcePool;
    private JedisPool targetPool;
    private ExecutorService executor = Executors.newFixedThreadPool(4);
    private volatile boolean running = true;
    
    public RedisSyncService(JedisPool sourcePool, JedisPool targetPool) {
        this.sourcePool = sourcePool;
        this.targetPool = targetPool;
    }
    
    /**
     * 全量同步
     */
    public void fullSync() {
        try (Jedis source = sourcePool.getResource()) {
            String cursor = ScanParams.SCAN_POINTER_START;
            ScanParams scanParams = new ScanParams();
            scanParams.count(1000);
            
            while (running) {
                ScanResult<String> scanResult = source.scan(cursor, scanParams);
                cursor = scanResult.getCursor();
                List<String> keys = scanResult.getResult();
                
                // 批量同步
                syncKeys(source, keys);
                
                if (ScanParams.SCAN_POINTER_START.equals(cursor)) {
                    break;
                }
            }
        }
        
        System.out.println("全量同步完成");
    }
    
    /**
     * 增量同步(监听命令)
     */
    public void incrementalSync() {
        executor.submit(() -> {
            try (Jedis source = sourcePool.getResource()) {
                // 使用 PSYNC 监听变化
                source.psync("", -1);
                
                while (running) {
                    // 解析复制流
                    // 实际项目需要使用底层协议
                    Thread.sleep(100);
                }
            }
        });
    }
    
    /**
     * 同步 Keys
     */
    private void syncKeys(Jedis source, List<String> keys) {
        try (Jedis target = targetPool.getResource()) {
            for (String key : keys) {
                try {
                    String type = source.type(key);
                    
                    switch (type) {
                        case "string":
                            String value = source.get(key);
                            Long ttl = source.ttl(key);
                            if (ttl == -2) {
                                target.del(key);
                            } else if (ttl == -1) {
                                target.set(key, value);
                            } else {
                                target.setex(key, ttl, value);
                            }
                            break;
                            
                        case "hash":
                            Map<String, String> hash = source.hgetAll(key);
                            target.del(key);
                            target.hmset(key, hash);
                            break;
                            
                        case "list":
                            List<String> list = source.lrange(key, 0, -1);
                            target.del(key);
                            for (String item : list) {
                                target.rpush(key, item);
                            }
                            break;
                            
                        case "set":
                            Set<String> set = source.smembers(key);
                            target.del(key);
                            for (String item : set) {
                                target.sadd(key, item);
                            }
                            break;
                            
                        case "zset":
                            Set<Tuple> zset = source.zrangeWithScores(key, 0, -1);
                            target.del(key);
                            for (Tuple tuple : zset) {
                                target.zadd(key, tuple.getScore(), tuple.getElement());
                            }
                            break;
                    }
                } catch (Exception e) {
                    System.err.println("同步失败:" + key + ", " + e.getMessage());
                }
            }
        }
    }
    
    /**
     * 停止同步
     */
    public void stop() {
        running = false;
        executor.shutdown();
    }
}

四、冲突解决策略

4.1 冲突场景

数据冲突示例:

时间线:
T1: 北京写入 user:1001:name = "张三"
T2: 上海写入 user:1001:name = "李四"
T3: 同步到达,产生冲突

冲突类型:
1. 写 - 写冲突
2. 删 - 写冲突
3. 覆盖冲突

4.2 解决策略

策略 1:时间戳优先

import redis.clients.jedis.Jedis;
import java.util.*;

public class TimestampConflictResolver {
    
    /**
     * 带时间戳的写入
     */
    public void setWithTimestamp(Jedis jedis, String key, String value) {
        long timestamp = System.currentTimeMillis();
        String versionedValue = timestamp + ":" + value;
        jedis.set(key, versionedValue);
    }
    
    /**
     * 冲突解决
     */
    public String resolveConflict(Jedis jedis, String key, String newValue) {
        String existingValue = jedis.get(key);
        
        if (existingValue == null) {
            return newValue;
        }
        
        // 解析时间戳
        String[] parts = existingValue.split(":", 2);
        long existingTimestamp = Long.parseLong(parts[0]);
        long newTimestamp = System.currentTimeMillis();
        
        // 新时间戳优先
        if (newTimestamp > existingTimestamp) {
            return newTimestamp + ":" + newValue;
        } else {
            return existingValue;
        }
    }
    
    /**
     * 读取(去除时间戳)
     */
    public String get(Jedis jedis, String key) {
        String value = jedis.get(key);
        if (value != null && value.contains(":")) {
            return value.split(":", 2)[1];
        }
        return value;
    }
}

策略 2:地域优先级

public class RegionPriorityResolver {
    private String localRegion;
    private Map<String, Integer> regionPriority;
    
    public RegionPriorityResolver(String localRegion) {
        this.localRegion = localRegion;
        this.regionPriority = new HashMap<>();
        regionPriority.put("beijing", 1);
        regionPriority.put("shanghai", 2);
        regionPriority.put("guangzhou", 3);
    }
    
    /**
     * 根据地域优先级解决冲突
     */
    public String resolve(String key, String localValue, 
                         String remoteValue, String remoteRegion) {
        int localPriority = regionPriority.getOrDefault(localRegion, 999);
        int remotePriority = regionPriority.getOrDefault(remoteRegion, 999);
        
        // 优先级高的地域获胜
        if (localPriority <= remotePriority) {
            return localValue;
        } else {
            return remoteValue;
        }
    }
}

策略 3:业务逻辑解决

public class BusinessConflictResolver {
    
    /**
     * 计数器冲突解决(取最大值)
     */
    public long resolveCounter(long local, long remote) {
        return Math.max(local, remote);
    }
    
    /**
     * 集合冲突解决(合并)
     */
    public Set<String> resolveSet(Set<String> local, Set<String> remote) {
        Set<String> result = new HashSet<>(local);
        result.addAll(remote);
        return result;
    }
    
    /**
     * 有序集合冲突解决(合并,分数取最新)
     */
    public Map<String, Double> resolveZset(
            Map<String, Double> local, 
            Map<String, Double> remote) {
        Map<String, Double> result = new HashMap<>(local);
        result.putAll(remote);
        return result;
    }
}

五、故障切换

5.1 切换策略

故障切换流程:

正常状态:
北京 (主) → 上海 (备)

故障检测:
- 心跳检测失败
- 数据同步延迟
- 业务报错增加

切换流程:
1. 检测故障
2. 停止同步
3. 提升备为主
4. 更新路由
5. 通知业务

5.2 Java 实现故障切换

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.*;
import java.util.concurrent.*;

public class FailoverManager {
    private String primaryRegion;
    private Map<String, JedisPool> regionPools;
    private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
    private volatile String currentPrimary;
    
    public FailoverManager(String primaryRegion, Map<String, JedisPool> regionPools) {
        this.primaryRegion = primaryRegion;
        this.regionPools = regionPools;
        this.currentPrimary = primaryRegion;
        startHealthCheck();
    }
    
    /**
     * 健康检查
     */
    private void startHealthCheck() {
        scheduler.scheduleAtFixedRate(() -> {
            for (Map.Entry<String, JedisPool> entry : regionPools.entrySet()) {
                String region = entry.getKey();
                JedisPool pool = entry.getValue();
                
                try (Jedis jedis = pool.getResource()) {
                    String pong = jedis.ping();
                    if (!"PONG".equals(pong)) {
                        handleFailure(region);
                    }
                } catch (Exception e) {
                    handleFailure(region);
                }
            }
        }, 10, 10, TimeUnit.SECONDS);
    }
    
    /**
     * 处理故障
     */
    private synchronized void handleFailure(String failedRegion) {
        if (failedRegion.equals(currentPrimary)) {
            System.out.println("主节点故障:" + failedRegion);
            
            // 选择新的主节点
            String newPrimary = selectNewPrimary(failedRegion);
            if (newPrimary != null) {
                failover(newPrimary);
            }
        }
    }
    
    /**
     * 选择新的主节点
     */
    private String selectNewPrimary(String failedRegion) {
        for (String region : regionPools.keySet()) {
            if (!region.equals(failedRegion)) {
                // 检查该区域是否健康
                try (Jedis jedis = regionPools.get(region).getResource()) {
                    if ("PONG".equals(jedis.ping())) {
                        return region;
                    }
                } catch (Exception e) {
                    // 继续检查下一个
                }
            }
        }
        return null;
    }
    
    /**
     * 执行故障切换
     */
    private void failover(String newPrimary) {
        System.out.println("执行故障切换:" + currentPrimary + "" + newPrimary);
        
        // 1. 停止旧主节点的写入
        stopWrites(currentPrimary);
        
        // 2. 等待数据同步完成
        waitForSyncComplete(newPrimary);
        
        // 3. 提升新主节点
        promoteToPrimary(newPrimary);
        
        // 4. 更新路由
        currentPrimary = newPrimary;
        
        // 5. 通知业务方
        notifyBusiness(newPrimary);
        
        System.out.println("故障切换完成,新主节点:" + newPrimary);
    }
    
    private void stopWrites(String region) {
        // 实现停止写入逻辑
        System.out.println("停止 " + region + " 的写入");
    }
    
    private void waitForSyncComplete(String region) {
        // 等待数据同步完成
        System.out.println("等待 " + region + " 数据同步完成");
    }
    
    private void promoteToPrimary(String region) {
        // 提升为主节点
        System.out.println("提升 " + region + " 为主节点");
    }
    
    private void notifyBusiness(String newPrimary) {
        // 通知业务方更新配置
        System.out.println("通知业务方更新主节点为:" + newPrimary);
    }
    
    /**
     * 获取当前主节点
     */
    public String getCurrentPrimary() {
        return currentPrimary;
    }
    
    /**
     * 获取主节点连接池
     */
    public JedisPool getPrimaryPool() {
        return regionPools.get(currentPrimary);
    }
}

六、多活架构实践

6.1 架构设计

生产级多活架构:

┌─────────────────────────────────────────┐
│          全局负载均衡 (GSLB)             │
│    - DNS 智能解析                         │
│    - HTTP 调度                          │
└───────────────┬─────────────────────────┘

    ┌───────────┼───────────┐
    │           │           │
┌───▼───┐  ┌───▼───┐  ┌───▼───┐
│ 北京  │  │ 上海  │  │ 广州  │
│ 单元  │  │ 单元  │  │ 单元  │
├───────┤  ├───────┤  ├───────┤
│ Nginx │  │ Nginx │  │ Nginx │
│ App   │  │ App   │  │ App   │
│ Redis │  │ Redis │  │ Redis │
│ MySQL │  │ MySQL │  │ MySQL │
└───────┘  └───────┘  └───────┘
    │           │           │
    └───────────┼───────────┘

        ┌───────┴───────┐
        │  数据同步层   │
        │  RedisShake  │
        │  Canal       │
        └───────────────┘

6.2 配置建议

# 多活环境 Redis 配置

# 网络优化
tcp-keepalive 60
timeout 300

# 复制优化
repl-ping-replica-period 10
repl-timeout 60
repl-backlog-size 512mb

# 持久化优化
appendonly yes
appendfsync everysec
aof-use-rdb-preamble yes

# 内存优化
maxmemory 8gb
maxmemory-policy allkeys-lru

# 慢查询
slowlog-log-slower-than 10000
slowlog-max-len 128

七、总结

7.1 多活架构要点

  1. 架构选择

    • 根据业务需求选择合适模式
    • 平衡成本和可用性
  2. 数据同步

    • 选择合适的同步方案
    • 监控同步延迟
  3. 冲突解决

    • 设计合理的冲突解决策略
    • 业务逻辑优先
  4. 故障切换

    • 自动化故障检测和切换
    • 定期演练

7.2 最佳实践


参考资料


分享这篇文章到:

上一篇文章
Kafka KRaft 深度解析与实战
下一篇文章
Kafka 故障演练与应急预案实战