Redis 缓存一致性方案
缓存一致性是分布式系统的经典难题。如何保证缓存与数据库数据一致,是高性能系统必须面对的挑战。本文将深入缓存一致性的各种实现方案。
一、一致性问题分析
1.1 典型场景
用户请求
↓
┌─────────────────┐
│ Application │
└────┬────────────┘
│
├───▶ ┌─────────────┐
│ │ Cache │ ← 缓存层
│ │ (Redis) │
│ └─────────────┘
│
└───▶ ┌─────────────┐
│ Database │ ← 数据库层
│ (MySQL) │
└─────────────┘
问题:如何保证 Cache 和 Database 数据一致?
1.2 不一致场景
场景 1:先更新数据库,再删除缓存
时间 操作 缓存 数据库
T1 读缓存 旧值 (A) 旧值 (A)
T2 缓存未命中
T3 读数据库 旧值 (A) 旧值 (A)
T4 写入缓存 旧值 (A) 旧值 (A)
T5 更新数据库 旧值 (A) 新值 (B)
T6 删除缓存 (删除) 新值 (B)
T7 读缓存 (未命中) 新值 (B)
T8 读数据库 (未命中) 新值 (B)
T9 写入缓存 新值 (B) 新值 (B)
✓ 一致
场景 2:先删除缓存,再更新数据库
时间 操作 缓存 数据库
T1 删除缓存 (删除) 旧值 (A)
T2 读缓存 (未命中) 旧值 (A)
T3 读数据库 (未命中) 旧值 (A)
T4 写入缓存 旧值 (A) 旧值 (A) ← 问题
T5 更新数据库 旧值 (A) 新值 (B)
✗ 不一致(缓存是旧值)
1.3 一致性级别
| 级别 | 说明 | 适用场景 |
|---|---|---|
| 强一致 | 实时一致 | 金融交易 |
| 弱一致 | 允许短暂不一致 | 一般业务 |
| 最终一致 | 最终会一致 | 高并发场景 |
二、双写模式
2.1 先更新数据库,再删除缓存
def update_user(user_id, data):
"""更新用户信息"""
# 1. 更新数据库
db.execute(
"UPDATE users SET name = ? WHERE id = ?",
data['name'], user_id
)
# 2. 删除缓存
redis.delete(f"user:{user_id}")
# 优点:简单
# 缺点:极端情况下可能不一致
时序图:
客户端 应用 数据库 缓存
│ │ │ │
│── 更新请求 ────▶│ │ │
│ │ │ │
│ │── 更新 DB ─────▶│ │
│ │ │ │
│ │── 删除缓存 ─────────────────────▶│
│ │ │ │
│ │◀─ 成功 ─────────│ │
│ │ │ │
│◀─ 成功 ─────────│ │ │
│ │ │ │
│── 读请求 ──────▶│ │ │
│ │── 读缓存 ───────────────────────▶│
│ │◀─ 未命中 ────────────────────────│
│ │ │ │
│ │── 读 DB ───────▶│ │
│ │◀─ 新值 ─────────│ │
│ │ │ │
│ │── 写缓存 ───────────────────────▶│
│ │ │ │
│◀─ 新值 ─────────│ │ │
2.2 先删除缓存,再更新数据库
def update_user(user_id, data):
"""更新用户信息"""
# 1. 删除缓存
redis.delete(f"user:{user_id}")
# 2. 更新数据库
db.execute(
"UPDATE users SET name = ? WHERE id = ?",
data['name'], user_id
)
# 不推荐:可能导致不一致
2.3 双写一致性保障
def update_user_with_retry(user_id, data, max_retries=3):
"""带重试的双写"""
for i in range(max_retries):
try:
# 1. 更新数据库
db.execute(
"UPDATE users SET name = ? WHERE id = ?",
data['name'], user_id
)
# 2. 删除缓存
deleted = redis.delete(f"user:{user_id}")
if deleted:
return True
# 缓存不存在,可能是并发读取
# 重试删除
except Exception as e:
log.error(f"Update failed: {e}")
if i == max_retries - 1:
raise
return False
三、延时双删
3.1 基本原理
延时双删流程:
1. 删除缓存
2. 更新数据库
3. 等待 N 毫秒
4. 再次删除缓存
目的:删除在更新期间写入缓存的旧数据
3.2 实现方案
import time
def update_user_delayed_double_delete(user_id, data, delay_ms=500):
"""延时双删"""
# 1. 删除缓存
redis.delete(f"user:{user_id}")
# 2. 更新数据库
db.execute(
"UPDATE users SET name = ? WHERE id = ?",
data['name'], user_id
)
# 3. 等待
time.sleep(delay_ms / 1000)
# 4. 再次删除缓存
redis.delete(f"user:{user_id}")
3.3 异步延时删除
import threading
from celery import Celery
app = Celery('tasks', broker='redis://localhost/0')
@app.task
def delayed_delete(key, delay_ms):
"""异步延时删除"""
time.sleep(delay_ms / 1000)
redis.delete(key)
def update_user_async(user_id, data):
"""异步延时双删"""
# 1. 删除缓存
redis.delete(f"user:{user_id}")
# 2. 更新数据库
db.execute(
"UPDATE users SET name = ? WHERE id = ?",
data['name'], user_id
)
# 3. 异步延时删除
delayed_delete.delay(f"user:{user_id}", 500)
3.4 延迟时间计算
def calculate_delay():
"""计算延迟时间"""
# 基于数据库主从同步延迟
# 基于业务读取频率
# 基于网络延迟
# 经验值:500ms - 1000ms
return 500
四、监听 Binlog
4.1 Canal 方案
MySQL ──▶ Binlog ──▶ Canal ──▶ Redis
│
└──▶ 消息队列
4.2 Canal 配置
// Canal 客户端
public class CanalClient {
private CanalConnector connector;
private RedisClient redis;
public void start() {
// 连接 Canal
connector = CanalConnectors.newSingleConnector(
new InetSocketAddress("127.0.0.1", 11111),
"example", "", ""
);
connector.connect();
connector.subscribe(".*\\..*");
while (true) {
// 获取 binlog
Message message = connector.getWithoutAck(100, null);
List<CanalEntry.Entry> entries = message.getEntries();
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
// 解析 row 数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
// 处理更新
if (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
// 提取主键
String userId = extractUserId(rowData.getAfterColumnsList());
// 删除缓存
redis.del("user:" + userId);
}
}
}
}
// 确认
connector.ack(message.getId());
}
}
}
4.3 Debezium 方案
from kafka import KafkaConsumer
import json
class DebeziumConsumer:
def __init__(self, redis_client, kafka_topic):
self.redis = redis_client
self.consumer = KafkaConsumer(
kafka_topic,
bootstrap_servers=['localhost:9092'],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
)
def consume(self):
"""消费 binlog 事件"""
for message in self.consumer:
event = message.value
# 处理更新事件
if event['op'] == 'u': # update
table = event['source']['table']
primary_key = event['before']['id']
# 删除缓存
if table == 'users':
self.redis.delete(f"user:{primary_key}")
# 处理删除事件
elif event['op'] == 'd': # delete
table = event['source']['table']
primary_key = event['before']['id']
# 删除缓存
if table == 'users':
self.redis.delete(f"user:{primary_key}")
# 使用示例
consumer = DebeziumConsumer(redis, 'db.server.users')
consumer.consume()
五、分布式事务
5.1 本地消息表
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Application │ │ Database │ │ Message │
│ │ │ │ │ Queue │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
│ 1. 写入业务数据 │ │
│ + 消息记录 │ │
│─────────────────▶│ │
│ │ │
│ 2. 提交事务 │ │
│◀─────────────────│ │
│ │ │
│ 3. 查询消息记录 │ │
│─────────────────▶│ │
│ │ │
│ 4. 发送消息 │ │
│────────────────────────────────────▶│
│ │ │
│ 5. 更新消息状态 │ │
│─────────────────▶│ │
│ │ │
│ │ 6. 消费消息 │
│ │ 删除缓存 │
│ │◀─────────────────│
5.2 实现代码
def update_user_with_message_table(user_id, data):
"""本地消息表方案"""
# 1. 开启事务
conn = db.get_connection()
cursor = conn.cursor()
try:
# 2. 更新数据库
cursor.execute(
"UPDATE users SET name = ? WHERE id = ?",
(data['name'], user_id)
)
# 3. 写入消息记录
message_id = str(uuid.uuid4())
cursor.execute(
"""INSERT INTO messages
(id, type, payload, status, created_at)
VALUES (?, ?, ?, 'pending', NOW())""",
(message_id, 'delete_cache', json.dumps({'key': f'user:{user_id}'}))
)
# 4. 提交事务
conn.commit()
# 5. 异步发送消息
send_message_async(message_id)
except Exception as e:
conn.rollback()
raise
finally:
cursor.close()
conn.close()
def send_message_async(message_id):
"""异步发送消息"""
message = get_message(message_id)
# 发送到消息队列
queue.publish('cache.delete', message['payload'])
# 更新消息状态
update_message_status(message_id, 'sent')
def process_cache_delete(payload):
"""处理缓存删除消息"""
key = payload['key']
redis.delete(key)
5.3 TCC 方案
class CacheTCC:
def try_operation(self, user_id, data):
"""Try 阶段"""
# 预留资源
# 记录操作日志
pass
def confirm_operation(self, user_id, data):
"""Confirm 阶段"""
# 1. 更新数据库
db.execute(
"UPDATE users SET name = ? WHERE id = ?",
data['name'], user_id
)
# 2. 删除缓存
redis.delete(f"user:{user_id}")
def cancel_operation(self, user_id, data):
"""Cancel 阶段"""
# 回滚操作
pass
六、缓存更新策略
6.1 Cache-Aside
def get_user(user_id):
"""Cache-Aside 模式"""
# 1. 读缓存
key = f"user:{user_id}"
user = redis.get(key)
if user:
return json.loads(user)
# 2. 读数据库
user = db.query("SELECT * FROM users WHERE id = ?", user_id)
if user:
# 3. 写缓存
redis.setex(key, 300, json.dumps(user))
return user
def update_user(user_id, data):
"""更新"""
# 1. 更新数据库
db.execute(
"UPDATE users SET name = ? WHERE id = ?",
data['name'], user_id
)
# 2. 删除缓存
redis.delete(f"user:{user_id}")
6.2 Read/Write Through
class ThroughCache:
def __init__(self, redis_client, db_client):
self.redis = redis_client
self.db = db_client
def get(self, key):
"""读通过"""
# 缓存自动加载
value = self.redis.get(key)
if not value:
# 缓存系统负责加载
value = self.db.query(key)
self.redis.setex(key, 300, value)
return value
def set(self, key, value):
"""写通过"""
# 缓存系统负责同步
self.redis.setex(key, 300, value)
self.db.update(key, value)
6.3 Write Behind
class WriteBehindCache:
def __init__(self, redis_client, db_client):
self.redis = redis_client
self.db = db_client
self.queue = []
def set(self, key, value):
"""写后异步"""
# 1. 只写缓存
self.redis.setex(key, 300, value)
# 2. 加入异步队列
self.queue.append((key, value))
# 3. 批量异步写入数据库
if len(self.queue) >= 100:
self.flush_queue()
def flush_queue(self):
"""刷新队列"""
if not self.queue:
return
# 批量写入数据库
self.db.batch_update(self.queue)
self.queue = []
七、方案对比
7.1 对比表
| 方案 | 一致性 | 性能 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 先 DB 后删缓存 | 弱一致 | 高 | 低 | 一般场景 |
| 延时双删 | 弱一致 | 中 | 低 | 读多写少 |
| 监听 Binlog | 最终一致 | 高 | 中 | 高并发 |
| 本地消息表 | 最终一致 | 中 | 中 | 可靠性要求高 |
| TCC | 强一致 | 低 | 高 | 金融场景 |
7.2 选择指南
选择建议:
1. 一般业务
→ 先更新数据库,再删除缓存
2. 高并发场景
→ 监听 Binlog(Canal/Debezium)
3. 数据一致性要求高
→ 本地消息表 / TCC
4. 读多写少
→ 延时双删
5. 金融交易
→ 强一致方案(分布式事务)
八、最佳实践
8.1 配置建议
缓存过期时间:
- 热点数据:300 秒
- 一般数据:3600 秒
- 配置数据:86400 秒
删除重试:
- 重试次数:3 次
- 重试间隔:100ms, 200ms, 400ms
监控指标:
- 缓存命中率
- 不一致检测
- 删除失败率
8.2 监控方案
class ConsistencyMonitor:
def __init__(self, redis_client, db_client):
self.redis = redis_client
self.db = db_client
def check_consistency(self, user_id):
"""检查一致性"""
# 获取缓存
cache_value = self.redis.get(f"user:{user_id}")
# 获取数据库
db_value = self.db.query(
"SELECT * FROM users WHERE id = ?", user_id
)
# 比较
if cache_value and db_value:
cache_data = json.loads(cache_value)
if cache_data != db_value:
# 记录不一致
log.error(f"Inconsistency: user={user_id}")
return False
return True
def periodic_check(self):
"""定期检查"""
while True:
# 随机抽查
user_ids = self.db.get_random_users(100)
for user_id in user_ids:
self.check_consistency(user_id)
time.sleep(60)
8.3 故障处理
def handle_delete_failure(key, max_retries=3):
"""处理删除失败"""
for i in range(max_retries):
try:
deleted = redis.delete(key)
if deleted:
return True
# 延迟重试
time.sleep(0.1 * (2 ** i))
except Exception as e:
log.error(f"Delete failed: {e}")
# 记录失败,人工处理
log.error(f"Delete failed after {max_retries} retries: {key}")
return False
总结
Redis 缓存一致性核心要点:
| 方案 | 核心思想 | 适用场景 |
|---|---|---|
| 双写 | 更新 DB+ 删除缓存 | 一般业务 |
| 延时双删 | 删除 - 更新 - 删除 | 读多写少 |
| 监听 Binlog | 异步删除缓存 | 高并发 |
| 分布式事务 | 保证原子性 | 金融场景 |
最佳实践:
- 优先选择先更新数据库,再删除缓存
- 高并发场景使用监听 Binlog
- 设置合理的缓存过期时间
- 实现一致性监控
- 处理删除失败情况
- 根据业务选择合适方案
掌握缓存一致性,构建可靠的高性能系统!