Redis 分布式锁实战
分布式锁是分布式系统中协调多进程访问共享资源的关键技术。Redis 凭借高性能和原子操作,成为实现分布式锁的热门选择。本文将深入 Redis 分布式锁的实现原理和最佳实践。
一、分布式锁概述
1.1 为什么需要分布式锁
场景:秒杀系统
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Service │ │ Service │ │ Service │
│ A │ │ B │ │ C │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└────────────┼────────────┘
▼
┌─────────────────┐
│ Redis/DB │
│ 库存:100 │
└─────────────────┘
问题:多个服务同时扣减库存,可能超卖
解决:分布式锁保证同一时间只有一个服务操作
1.2 锁的特性要求
| 特性 | 说明 |
|---|---|
| 互斥性 | 同一时间只有一个客户端持有锁 |
| 防死锁 | 锁超时自动释放 |
| 容错性 | Redis 故障不影响系统可用性 |
| 安全性 | 只能释放自己持有的锁 |
二、基础实现
2.1 SETNX 实现
# 获取锁
SETNX lock_key unique_value
# 返回 1 = 成功,0 = 失败
# 释放锁
DEL lock_key
问题:
1. 没有超时时间,可能死锁
2. 客户端故障,锁无法释放
2.2 SETNX + EXPIRE
# 获取锁
SETNX lock_key unique_value
EXPIRE lock_key 30
# 释放锁
DEL lock_key
问题:
原子性问题:SETNX 和 EXPIRE 不是原子操作
可能 SETNX 成功,EXPIRE 失败,导致死锁
2.3 SET NX EX(推荐)
# Redis 2.6.12+ 支持
SET lock_key unique_value NX EX 30
# 参数说明
# NX = Not Exists(仅当不存在)
# EX = EXpire(过期时间,秒)
# PX = 过期时间(毫秒)
# 成功返回
OK
# 失败返回
(nil)
完整实现:
import redis
import uuid
import time
class RedisLock:
def __init__(self, redis_client, key, timeout=30):
self.redis = redis_client
self.key = key
self.timeout = timeout
self.token = str(uuid.uuid4())
def acquire(self):
"""获取锁"""
return self.redis.set(
self.key,
self.token,
nx=True, # NX
ex=self.timeout # EX
)
def release(self):
"""释放锁(Lua 脚本保证原子性)"""
lua_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
return self.redis.eval(lua_script, 1, self.key, self.token)
def __enter__(self):
if not self.acquire():
raise Exception("获取锁失败")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
# 使用示例
with RedisLock(redis, "lock:order:1001", timeout=30):
# 临界区代码
process_order()
三、锁续期
3.1 看门狗机制
问题:业务执行时间超过锁超时时间
解决:后台线程定期续期
实现:
import threading
import time
class RedisLockWithWatchdog(RedisLock):
def __init__(self, redis_client, key, timeout=30, renew_interval=10):
super().__init__(redis_client, key, timeout)
self.renew_interval = renew_interval
self._stop_renew = threading.Event()
def acquire(self):
if super().acquire():
# 启动看门狗
self._start_watchdog()
return True
return False
def _start_watchdog(self):
"""启动后台续期线程"""
def renew_loop():
while not self._stop_renew.wait(self.renew_interval):
try:
# 续期
if self.redis.get(self.key) == self.token:
self.redis.expire(self.key, self.timeout)
except:
break
thread = threading.Thread(target=renew_loop, daemon=True)
thread.start()
def release(self):
# 停止续期
self._stop_renew.set()
super().release()
3.2 Redlock 算法
多 Redis 实例的分布式锁:
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Redis 1 │ │ Redis 2 │ │ Redis 3 │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└────────────┼────────────┘
▼
┌─────────────────┐
│ Client │
│ 获取多数节点锁 │
└─────────────────┘
算法步骤:
1. 记录当前时间
2. 依次向 N 个 Redis 实例请求锁
3. 计算获取锁耗时
4. 如果成功获取 >= N/2+1 个锁,且耗时 < 超时时间,则成功
5. 锁的有效时间 = 超时时间 - 耗时
Python 实现:
import redis
import time
import uuid
class Redlock:
def __init__(self, redis_instances, key, timeout=30):
self.instances = redis_instances # [Redis1, Redis2, Redis3]
self.key = key
self.timeout = timeout
self.token = str(uuid.uuid4())
self.validity = 0
def acquire(self):
start_time = time.time()
acquired = 0
# 1. 尝试获取所有实例的锁
for redis_client in self.instances:
try:
if redis_client.set(
self.key,
self.token,
nx=True,
ex=self.timeout,
socket_timeout=0.1
):
acquired += 1
except:
continue
# 2. 计算耗时
elapsed = time.time() - start_time
self.validity = self.timeout - elapsed
# 3. 检查是否成功(多数节点)
if acquired >= (len(self.instances) // 2 + 1) and self.validity > 0:
return True
# 4. 失败则释放所有锁
self.release()
return False
def release(self):
lua_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
for redis_client in self.instances:
try:
redis_client.eval(lua_script, 1, self.key, self.token)
except:
pass
def __enter__(self):
if not self.acquire():
raise Exception("获取 Redlock 失败")
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()
# 使用示例
redis_instances = [
redis.Redis(host='redis1', port=6379),
redis.Redis(host='redis2', port=6379),
redis.Redis(host='redis3', port=6379),
]
with Redlock(redis_instances, "lock:resource", timeout=30):
process_critical_section()
四、应用场景
4.1 秒杀系统
class SeckillService:
def __init__(self, redis_client):
self.redis = redis_client
def seckill(self, product_id, user_id):
lock_key = f"lock:seckill:{product_id}"
with RedisLock(self.redis, lock_key, timeout=10):
# 1. 检查库存
stock = self.redis.get(f"product:{product_id}:stock")
if not stock or int(stock) <= 0:
return False
# 2. 扣减库存
self.redis.decr(f"product:{product_id}:stock")
# 3. 创建订单
self.create_order(product_id, user_id)
return True
def create_order(self, product_id, user_id):
# 创建订单逻辑
pass
4.2 分布式任务调度
class DistributedScheduler:
def __init__(self, redis_client):
self.redis = redis_client
self.node_id = uuid.uuid4()
def schedule(self):
while True:
lock_key = "lock:scheduler:task_processor"
if self.redis.set(lock_key, self.node_id, nx=True, ex=60):
try:
# 执行任务
self.process_tasks()
finally:
# 释放锁
lua_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
self.redis.eval(lua_script, 1, lock_key, self.node_id)
time.sleep(10)
def process_tasks(self):
# 处理任务逻辑
pass
4.3 防止缓存击穿
def get_cache_with_lock(key):
# 1. 查询缓存
value = redis.get(key)
if value:
return value
# 2. 获取锁
lock_key = f"lock:{key}"
if redis.set(lock_key, "1", nx=True, ex=10):
try:
# 3. 双重检查
value = redis.get(key)
if value:
return value
# 4. 查询数据库
value = query_db(key)
# 5. 写入缓存
redis.setex(key, 300, value)
return value
finally:
# 释放锁
redis.delete(lock_key)
else:
# 6. 等待后重试
time.sleep(0.1)
return get_cache_with_lock(key)
五、常见问题与优化
5.1 锁误删问题
问题:
客户端 A 获取锁(超时 30s)
客户端 A 业务执行慢(40s)
锁自动释放
客户端 B 获取锁
客户端 A 执行完,删除锁(实际是 B 的锁)
解决:
# 使用唯一标识
token = uuid.uuid4()
SET lock_key token NX EX 30
# 释放时检查
if redis.get(lock_key) == token:
DEL lock_key
5.2 锁超时问题
问题:
业务执行时间 > 锁超时时间
锁自动释放,其他客户端进入
解决:
# 1. 使用看门狗续期
# 2. 设置合理的超时时间
# 3. 业务幂等性设计
5.3 性能优化
# 1. 批量操作
# 使用 Pipeline 减少网络往返
# 2. 锁粒度
# 细粒度锁 vs 粗粒度锁
lock:user:1001 # 细粒度
lock:users # 粗粒度
# 3. 锁等待
# 使用重试机制
def acquire_with_retry(lock, max_retries=5):
for i in range(max_retries):
if lock.acquire():
return True
time.sleep(0.1 * (2 ** i)) # 指数退避
return False
5.4 红锁争议
Redlock 争议:
Martin Kleppmann vs Antirez 的辩论
问题:
1. 时钟跳跃可能导致锁失效
2. GC 停顿可能导致并发
建议:
1. 使用 ZooKeeper/etcd 实现强一致性锁
2. Redis 锁用于性能要求高、允许短暂并发的场景
六、最佳实践总结
6.1 实现要点
# ✅ 推荐
SET lock_key unique_value NX EX timeout # 原子操作
DEL with Lua script # 原子释放
Watchdog for long tasks # 看门狗续期
# ❌ 避免
SETNX + EXPIRE separately # 非原子
DEL without check # 可能误删
Infinite timeout # 可能死锁
6.2 配置建议
| 参数 | 推荐值 | 说明 |
|---|---|---|
| timeout | 10-30s | 根据业务复杂度 |
| renew_interval | timeout/3 | 续期间隔 |
| max_retries | 3-5 | 最大重试次数 |
| retry_delay | 指数退避 | 重试间隔 |
6.3 监控指标
# 监控锁状态
lock_info = {
'acquire_success': counter,
'acquire_fail': counter,
'lock_timeout': counter,
'lock_duration': histogram,
}
# 告警
if lock_timeout_rate > 0.1:
alert("锁超时率过高")
总结
Redis 分布式锁核心要点:
| 实现方式 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| SET NX EX | 简单、高性能 | 单点故障 | 一般场景 |
| Redlock | 高可用 | 复杂、有时钟问题 | 高可用要求 |
| ZooKeeper | 强一致 | 性能较低 | 强一致要求 |
最佳实践:
- 使用 SET NX EX 原子操作
- 释放锁使用 Lua 脚本
- 长任务使用看门狗续期
- 设置合理的超时时间
- 重要场景使用 Redlock 或 ZooKeeper
掌握 Redis 分布式锁,构建可靠的分布式系统!