Redis 多活架构设计与实战
异地多活是保障业务连续性的终极方案。如何在多个地域部署 Redis 并保证数据一致性?本文将分享完整的多活架构设计和实战经验。
一、多活架构概述
1.1 什么是多活
多活架构演进:
单活 → 主从 → 双活 → 多活
┌─────────────────────────────────────┐
│ 架构 │ 可用性 │ 延迟 │ 成本 │
├─────────────────────────────────────┤
│ 单活 │ 99.9% │ 低 │ 低 │
│ 主从 │ 99.99% │ 中 │ 中 │
│ 双活 │ 99.99% │ 低 │ 高 │
│ 多活 │99.999% │ 最低 │ 最高 │
└─────────────────────────────────────┘
1.2 多活场景
适用场景:
- 跨地域业务
- 高可用要求(99.999%)
- 灾难恢复
- 就近访问
挑战:
- 数据一致性
- 延迟问题
- 冲突解决
- 成本高
二、多活架构模式
2.1 主从复制模式
异地主从架构:
┌─────────────┐
│ Master │ 北京
│ (读写) │
└──────┬──────┘
│ 复制
│
┌──────┴──────┐
│ Slave │ 上海
│ (只读) │
└─────────────┘
优点:简单、数据一致
缺点:写延迟高、单点故障
2.2 双主互备模式
双主互备架构:
┌─────────────┐ ┌─────────────┐
│ Master 1 │◄───────►│ Master 2 │
│ 北京 │ 互相复制│ 上海 │
│ (读写) │ │ (读写) │
└─────────────┘ └─────────────┘
优点:双写、高可用
缺点:数据冲突、脑裂风险
2.3 多活分片模式
多活分片架构:
┌─────────────┐ ┌─────────────┐
│ Cluster 1 │ │ Cluster 2 │
│ 北京 │ │ 上海 │
│ 0-8191 │ │ 8192-16383 │
└─────────────┘ └─────────────┘
▲ ▲
│ │
└───────────┬───────────┘
│
┌───────┴───────┐
│ 全局路由层 │
└───────────────┘
优点:水平扩展、就近访问
缺点:路由复杂、跨地域同步
2.4 单元化架构
单元化多活架构:
┌─────────────────────────────────────┐
│ 全局流量调度 (GSLB) │
└───────────────┬─────────────────────┘
│
┌───────────┼───────────┐
│ │ │
┌───▼───┐ ┌───▼───┐ ┌───▼───┐
│单元 1 │ │单元 2 │ │单元 3 │
│ 北京 │ │ 上海 │ │ 广州 │
│Redis │ │Redis │ │Redis │
└───────┘ └───────┘ └───────┘
优点:完全隔离、故障隔离
缺点:数据分片复杂、运维成本高
三、数据同步方案
3.1 主从复制同步
# 配置异地主从
# 北京 Master
bind 0.0.0.0
port 6379
requirepass master_password
# 上海 Slave
replicaof beijing-ip 6379
masterauth master_password
requirepass slave_password
# 优化跨地域复制
repl-ping-replica-period 10 # 心跳间隔
repl-timeout 60 # 超时时间
repl-backlog-size 256mb # 复制缓冲
3.2 双向同步
# 使用 RedisShake 实现双向同步
# shake1.conf (北京→上海)
[sync]
reader.address = "beijing-ip:6379"
reader.password = "beijing_pass"
writer.address = "shanghai-ip:6379"
writer.password = "shanghai_pass"
mode = "sync"
# shake2.conf (上海→北京)
[sync]
reader.address = "shanghai-ip:6379"
reader.password = "shanghai_pass"
writer.address = "beijing-ip:6379"
writer.password = "beijing_pass"
mode = "sync"
# 启动双向同步
./redis-shake -conf=shake1.conf &
./redis-shake -conf=shake2.conf &
3.3 Java 实现数据同步
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.*;
import java.util.concurrent.*;
public class RedisSyncService {
private JedisPool sourcePool;
private JedisPool targetPool;
private ExecutorService executor = Executors.newFixedThreadPool(4);
private volatile boolean running = true;
public RedisSyncService(JedisPool sourcePool, JedisPool targetPool) {
this.sourcePool = sourcePool;
this.targetPool = targetPool;
}
/**
* 全量同步
*/
public void fullSync() {
try (Jedis source = sourcePool.getResource()) {
String cursor = ScanParams.SCAN_POINTER_START;
ScanParams scanParams = new ScanParams();
scanParams.count(1000);
while (running) {
ScanResult<String> scanResult = source.scan(cursor, scanParams);
cursor = scanResult.getCursor();
List<String> keys = scanResult.getResult();
// 批量同步
syncKeys(source, keys);
if (ScanParams.SCAN_POINTER_START.equals(cursor)) {
break;
}
}
}
System.out.println("全量同步完成");
}
/**
* 增量同步(监听命令)
*/
public void incrementalSync() {
executor.submit(() -> {
try (Jedis source = sourcePool.getResource()) {
// 使用 PSYNC 监听变化
source.psync("", -1);
while (running) {
// 解析复制流
// 实际项目需要使用底层协议
Thread.sleep(100);
}
}
});
}
/**
* 同步 Keys
*/
private void syncKeys(Jedis source, List<String> keys) {
try (Jedis target = targetPool.getResource()) {
for (String key : keys) {
try {
String type = source.type(key);
switch (type) {
case "string":
String value = source.get(key);
Long ttl = source.ttl(key);
if (ttl == -2) {
target.del(key);
} else if (ttl == -1) {
target.set(key, value);
} else {
target.setex(key, ttl, value);
}
break;
case "hash":
Map<String, String> hash = source.hgetAll(key);
target.del(key);
target.hmset(key, hash);
break;
case "list":
List<String> list = source.lrange(key, 0, -1);
target.del(key);
for (String item : list) {
target.rpush(key, item);
}
break;
case "set":
Set<String> set = source.smembers(key);
target.del(key);
for (String item : set) {
target.sadd(key, item);
}
break;
case "zset":
Set<Tuple> zset = source.zrangeWithScores(key, 0, -1);
target.del(key);
for (Tuple tuple : zset) {
target.zadd(key, tuple.getScore(), tuple.getElement());
}
break;
}
} catch (Exception e) {
System.err.println("同步失败:" + key + ", " + e.getMessage());
}
}
}
}
/**
* 停止同步
*/
public void stop() {
running = false;
executor.shutdown();
}
}
四、冲突解决策略
4.1 冲突场景
数据冲突示例:
时间线:
T1: 北京写入 user:1001:name = "张三"
T2: 上海写入 user:1001:name = "李四"
T3: 同步到达,产生冲突
冲突类型:
1. 写 - 写冲突
2. 删 - 写冲突
3. 覆盖冲突
4.2 解决策略
策略 1:时间戳优先
import redis.clients.jedis.Jedis;
import java.util.*;
public class TimestampConflictResolver {
/**
* 带时间戳的写入
*/
public void setWithTimestamp(Jedis jedis, String key, String value) {
long timestamp = System.currentTimeMillis();
String versionedValue = timestamp + ":" + value;
jedis.set(key, versionedValue);
}
/**
* 冲突解决
*/
public String resolveConflict(Jedis jedis, String key, String newValue) {
String existingValue = jedis.get(key);
if (existingValue == null) {
return newValue;
}
// 解析时间戳
String[] parts = existingValue.split(":", 2);
long existingTimestamp = Long.parseLong(parts[0]);
long newTimestamp = System.currentTimeMillis();
// 新时间戳优先
if (newTimestamp > existingTimestamp) {
return newTimestamp + ":" + newValue;
} else {
return existingValue;
}
}
/**
* 读取(去除时间戳)
*/
public String get(Jedis jedis, String key) {
String value = jedis.get(key);
if (value != null && value.contains(":")) {
return value.split(":", 2)[1];
}
return value;
}
}
策略 2:地域优先级
public class RegionPriorityResolver {
private String localRegion;
private Map<String, Integer> regionPriority;
public RegionPriorityResolver(String localRegion) {
this.localRegion = localRegion;
this.regionPriority = new HashMap<>();
regionPriority.put("beijing", 1);
regionPriority.put("shanghai", 2);
regionPriority.put("guangzhou", 3);
}
/**
* 根据地域优先级解决冲突
*/
public String resolve(String key, String localValue,
String remoteValue, String remoteRegion) {
int localPriority = regionPriority.getOrDefault(localRegion, 999);
int remotePriority = regionPriority.getOrDefault(remoteRegion, 999);
// 优先级高的地域获胜
if (localPriority <= remotePriority) {
return localValue;
} else {
return remoteValue;
}
}
}
策略 3:业务逻辑解决
public class BusinessConflictResolver {
/**
* 计数器冲突解决(取最大值)
*/
public long resolveCounter(long local, long remote) {
return Math.max(local, remote);
}
/**
* 集合冲突解决(合并)
*/
public Set<String> resolveSet(Set<String> local, Set<String> remote) {
Set<String> result = new HashSet<>(local);
result.addAll(remote);
return result;
}
/**
* 有序集合冲突解决(合并,分数取最新)
*/
public Map<String, Double> resolveZset(
Map<String, Double> local,
Map<String, Double> remote) {
Map<String, Double> result = new HashMap<>(local);
result.putAll(remote);
return result;
}
}
五、故障切换
5.1 切换策略
故障切换流程:
正常状态:
北京 (主) → 上海 (备)
故障检测:
- 心跳检测失败
- 数据同步延迟
- 业务报错增加
切换流程:
1. 检测故障
2. 停止同步
3. 提升备为主
4. 更新路由
5. 通知业务
5.2 Java 实现故障切换
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.*;
import java.util.concurrent.*;
public class FailoverManager {
private String primaryRegion;
private Map<String, JedisPool> regionPools;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
private volatile String currentPrimary;
public FailoverManager(String primaryRegion, Map<String, JedisPool> regionPools) {
this.primaryRegion = primaryRegion;
this.regionPools = regionPools;
this.currentPrimary = primaryRegion;
startHealthCheck();
}
/**
* 健康检查
*/
private void startHealthCheck() {
scheduler.scheduleAtFixedRate(() -> {
for (Map.Entry<String, JedisPool> entry : regionPools.entrySet()) {
String region = entry.getKey();
JedisPool pool = entry.getValue();
try (Jedis jedis = pool.getResource()) {
String pong = jedis.ping();
if (!"PONG".equals(pong)) {
handleFailure(region);
}
} catch (Exception e) {
handleFailure(region);
}
}
}, 10, 10, TimeUnit.SECONDS);
}
/**
* 处理故障
*/
private synchronized void handleFailure(String failedRegion) {
if (failedRegion.equals(currentPrimary)) {
System.out.println("主节点故障:" + failedRegion);
// 选择新的主节点
String newPrimary = selectNewPrimary(failedRegion);
if (newPrimary != null) {
failover(newPrimary);
}
}
}
/**
* 选择新的主节点
*/
private String selectNewPrimary(String failedRegion) {
for (String region : regionPools.keySet()) {
if (!region.equals(failedRegion)) {
// 检查该区域是否健康
try (Jedis jedis = regionPools.get(region).getResource()) {
if ("PONG".equals(jedis.ping())) {
return region;
}
} catch (Exception e) {
// 继续检查下一个
}
}
}
return null;
}
/**
* 执行故障切换
*/
private void failover(String newPrimary) {
System.out.println("执行故障切换:" + currentPrimary + " → " + newPrimary);
// 1. 停止旧主节点的写入
stopWrites(currentPrimary);
// 2. 等待数据同步完成
waitForSyncComplete(newPrimary);
// 3. 提升新主节点
promoteToPrimary(newPrimary);
// 4. 更新路由
currentPrimary = newPrimary;
// 5. 通知业务方
notifyBusiness(newPrimary);
System.out.println("故障切换完成,新主节点:" + newPrimary);
}
private void stopWrites(String region) {
// 实现停止写入逻辑
System.out.println("停止 " + region + " 的写入");
}
private void waitForSyncComplete(String region) {
// 等待数据同步完成
System.out.println("等待 " + region + " 数据同步完成");
}
private void promoteToPrimary(String region) {
// 提升为主节点
System.out.println("提升 " + region + " 为主节点");
}
private void notifyBusiness(String newPrimary) {
// 通知业务方更新配置
System.out.println("通知业务方更新主节点为:" + newPrimary);
}
/**
* 获取当前主节点
*/
public String getCurrentPrimary() {
return currentPrimary;
}
/**
* 获取主节点连接池
*/
public JedisPool getPrimaryPool() {
return regionPools.get(currentPrimary);
}
}
六、多活架构实践
6.1 架构设计
生产级多活架构:
┌─────────────────────────────────────────┐
│ 全局负载均衡 (GSLB) │
│ - DNS 智能解析 │
│ - HTTP 调度 │
└───────────────┬─────────────────────────┘
│
┌───────────┼───────────┐
│ │ │
┌───▼───┐ ┌───▼───┐ ┌───▼───┐
│ 北京 │ │ 上海 │ │ 广州 │
│ 单元 │ │ 单元 │ │ 单元 │
├───────┤ ├───────┤ ├───────┤
│ Nginx │ │ Nginx │ │ Nginx │
│ App │ │ App │ │ App │
│ Redis │ │ Redis │ │ Redis │
│ MySQL │ │ MySQL │ │ MySQL │
└───────┘ └───────┘ └───────┘
│ │ │
└───────────┼───────────┘
│
┌───────┴───────┐
│ 数据同步层 │
│ RedisShake │
│ Canal │
└───────────────┘
6.2 配置建议
# 多活环境 Redis 配置
# 网络优化
tcp-keepalive 60
timeout 300
# 复制优化
repl-ping-replica-period 10
repl-timeout 60
repl-backlog-size 512mb
# 持久化优化
appendonly yes
appendfsync everysec
aof-use-rdb-preamble yes
# 内存优化
maxmemory 8gb
maxmemory-policy allkeys-lru
# 慢查询
slowlog-log-slower-than 10000
slowlog-max-len 128
七、总结
7.1 多活架构要点
-
架构选择
- 根据业务需求选择合适模式
- 平衡成本和可用性
-
数据同步
- 选择合适的同步方案
- 监控同步延迟
-
冲突解决
- 设计合理的冲突解决策略
- 业务逻辑优先
-
故障切换
- 自动化故障检测和切换
- 定期演练
7.2 最佳实践
- 就近访问,降低延迟
- 数据分片,减少冲突
- 监控告警,及时响应
- 定期演练,验证方案
参考资料
- Redis 复制机制
- 异地多活架构设计
- 《分布式系统原理与范型》第 8 章