Skip to content
清晨的一缕阳光
返回

Redis 分布式锁实战

Redis 分布式锁实战

分布式锁是分布式系统中协调多进程访问共享资源的关键技术。Redis 凭借高性能和原子操作,成为实现分布式锁的热门选择。本文将深入 Redis 分布式锁的实现原理和最佳实践。

一、分布式锁概述

1.1 为什么需要分布式锁

场景:秒杀系统
┌─────────┐  ┌─────────┐  ┌─────────┐
│ Service │  │ Service │  │ Service │
│    A    │  │    B    │  │    C    │
└────┬────┘  └────┬────┘  └────┬────┘
     │            │            │
     └────────────┼────────────┘

         ┌─────────────────┐
         │   Redis/DB      │
         │  库存:100      │
         └─────────────────┘

问题:多个服务同时扣减库存,可能超卖
解决:分布式锁保证同一时间只有一个服务操作

1.2 锁的特性要求

特性说明
互斥性同一时间只有一个客户端持有锁
防死锁锁超时自动释放
容错性Redis 故障不影响系统可用性
安全性只能释放自己持有的锁

二、基础实现

2.1 SETNX 实现

# 获取锁
SETNX lock_key unique_value
# 返回 1 = 成功,0 = 失败

# 释放锁
DEL lock_key

问题

1. 没有超时时间,可能死锁
2. 客户端故障,锁无法释放

2.2 SETNX + EXPIRE

# 获取锁
SETNX lock_key unique_value
EXPIRE lock_key 30

# 释放锁
DEL lock_key

问题

原子性问题:SETNX 和 EXPIRE 不是原子操作
可能 SETNX 成功,EXPIRE 失败,导致死锁

2.3 SET NX EX(推荐)

# Redis 2.6.12+ 支持
SET lock_key unique_value NX EX 30

# 参数说明
# NX = Not Exists(仅当不存在)
# EX = EXpire(过期时间,秒)
# PX = 过期时间(毫秒)

# 成功返回
OK

# 失败返回
(nil)

完整实现

import redis
import uuid
import time

class RedisLock:
    def __init__(self, redis_client, key, timeout=30):
        self.redis = redis_client
        self.key = key
        self.timeout = timeout
        self.token = str(uuid.uuid4())
    
    def acquire(self):
        """获取锁"""
        return self.redis.set(
            self.key, 
            self.token, 
            nx=True,      # NX
            ex=self.timeout  # EX
        )
    
    def release(self):
        """释放锁(Lua 脚本保证原子性)"""
        lua_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        return self.redis.eval(lua_script, 1, self.key, self.token)
    
    def __enter__(self):
        if not self.acquire():
            raise Exception("获取锁失败")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

# 使用示例
with RedisLock(redis, "lock:order:1001", timeout=30):
    # 临界区代码
    process_order()

三、锁续期

3.1 看门狗机制

问题:业务执行时间超过锁超时时间
解决:后台线程定期续期

实现

import threading
import time

class RedisLockWithWatchdog(RedisLock):
    def __init__(self, redis_client, key, timeout=30, renew_interval=10):
        super().__init__(redis_client, key, timeout)
        self.renew_interval = renew_interval
        self._stop_renew = threading.Event()
    
    def acquire(self):
        if super().acquire():
            # 启动看门狗
            self._start_watchdog()
            return True
        return False
    
    def _start_watchdog(self):
        """启动后台续期线程"""
        def renew_loop():
            while not self._stop_renew.wait(self.renew_interval):
                try:
                    # 续期
                    if self.redis.get(self.key) == self.token:
                        self.redis.expire(self.key, self.timeout)
                except:
                    break
        
        thread = threading.Thread(target=renew_loop, daemon=True)
        thread.start()
    
    def release(self):
        # 停止续期
        self._stop_renew.set()
        super().release()

3.2 Redlock 算法

多 Redis 实例的分布式锁

┌─────────┐  ┌─────────┐  ┌─────────┐
│ Redis 1 │  │ Redis 2 │  │ Redis 3 │
└────┬────┘  └────┬────┘  └────┬────┘
     │            │            │
     └────────────┼────────────┘

         ┌─────────────────┐
         │    Client       │
         │  获取多数节点锁  │
         └─────────────────┘

算法步骤

1. 记录当前时间
2. 依次向 N 个 Redis 实例请求锁
3. 计算获取锁耗时
4. 如果成功获取 >= N/2+1 个锁,且耗时 < 超时时间,则成功
5. 锁的有效时间 = 超时时间 - 耗时

Python 实现

import redis
import time
import uuid

class Redlock:
    def __init__(self, redis_instances, key, timeout=30):
        self.instances = redis_instances  # [Redis1, Redis2, Redis3]
        self.key = key
        self.timeout = timeout
        self.token = str(uuid.uuid4())
        self.validity = 0
    
    def acquire(self):
        start_time = time.time()
        acquired = 0
        
        # 1. 尝试获取所有实例的锁
        for redis_client in self.instances:
            try:
                if redis_client.set(
                    self.key, 
                    self.token, 
                    nx=True, 
                    ex=self.timeout,
                    socket_timeout=0.1
                ):
                    acquired += 1
            except:
                continue
        
        # 2. 计算耗时
        elapsed = time.time() - start_time
        self.validity = self.timeout - elapsed
        
        # 3. 检查是否成功(多数节点)
        if acquired >= (len(self.instances) // 2 + 1) and self.validity > 0:
            return True
        
        # 4. 失败则释放所有锁
        self.release()
        return False
    
    def release(self):
        lua_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        for redis_client in self.instances:
            try:
                redis_client.eval(lua_script, 1, self.key, self.token)
            except:
                pass
    
    def __enter__(self):
        if not self.acquire():
            raise Exception("获取 Redlock 失败")
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.release()

# 使用示例
redis_instances = [
    redis.Redis(host='redis1', port=6379),
    redis.Redis(host='redis2', port=6379),
    redis.Redis(host='redis3', port=6379),
]

with Redlock(redis_instances, "lock:resource", timeout=30):
    process_critical_section()

四、应用场景

4.1 秒杀系统

class SeckillService:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def seckill(self, product_id, user_id):
        lock_key = f"lock:seckill:{product_id}"
        
        with RedisLock(self.redis, lock_key, timeout=10):
            # 1. 检查库存
            stock = self.redis.get(f"product:{product_id}:stock")
            if not stock or int(stock) <= 0:
                return False
            
            # 2. 扣减库存
            self.redis.decr(f"product:{product_id}:stock")
            
            # 3. 创建订单
            self.create_order(product_id, user_id)
            
            return True
    
    def create_order(self, product_id, user_id):
        # 创建订单逻辑
        pass

4.2 分布式任务调度

class DistributedScheduler:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.node_id = uuid.uuid4()
    
    def schedule(self):
        while True:
            lock_key = "lock:scheduler:task_processor"
            
            if self.redis.set(lock_key, self.node_id, nx=True, ex=60):
                try:
                    # 执行任务
                    self.process_tasks()
                finally:
                    # 释放锁
                    lua_script = """
                    if redis.call('get', KEYS[1]) == ARGV[1] then
                        return redis.call('del', KEYS[1])
                    else
                        return 0
                    end
                    """
                    self.redis.eval(lua_script, 1, lock_key, self.node_id)
            
            time.sleep(10)
    
    def process_tasks(self):
        # 处理任务逻辑
        pass

4.3 防止缓存击穿

def get_cache_with_lock(key):
    # 1. 查询缓存
    value = redis.get(key)
    if value:
        return value
    
    # 2. 获取锁
    lock_key = f"lock:{key}"
    if redis.set(lock_key, "1", nx=True, ex=10):
        try:
            # 3. 双重检查
            value = redis.get(key)
            if value:
                return value
            
            # 4. 查询数据库
            value = query_db(key)
            
            # 5. 写入缓存
            redis.setex(key, 300, value)
            
            return value
        finally:
            # 释放锁
            redis.delete(lock_key)
    else:
        # 6. 等待后重试
        time.sleep(0.1)
        return get_cache_with_lock(key)

五、常见问题与优化

5.1 锁误删问题

问题

客户端 A 获取锁(超时 30s)
客户端 A 业务执行慢(40s)
锁自动释放
客户端 B 获取锁
客户端 A 执行完,删除锁(实际是 B 的锁)

解决

# 使用唯一标识
token = uuid.uuid4()
SET lock_key token NX EX 30

# 释放时检查
if redis.get(lock_key) == token:
    DEL lock_key

5.2 锁超时问题

问题

业务执行时间 > 锁超时时间
锁自动释放,其他客户端进入

解决

# 1. 使用看门狗续期
# 2. 设置合理的超时时间
# 3. 业务幂等性设计

5.3 性能优化

# 1. 批量操作
# 使用 Pipeline 减少网络往返

# 2. 锁粒度
# 细粒度锁 vs 粗粒度锁
lock:user:1001  # 细粒度
lock:users      # 粗粒度

# 3. 锁等待
# 使用重试机制
def acquire_with_retry(lock, max_retries=5):
    for i in range(max_retries):
        if lock.acquire():
            return True
        time.sleep(0.1 * (2 ** i))  # 指数退避
    return False

5.4 红锁争议

Redlock 争议

Martin Kleppmann vs Antirez 的辩论

问题:
1. 时钟跳跃可能导致锁失效
2. GC 停顿可能导致并发

建议:
1. 使用 ZooKeeper/etcd 实现强一致性锁
2. Redis 锁用于性能要求高、允许短暂并发的场景

六、最佳实践总结

6.1 实现要点

# ✅ 推荐
SET lock_key unique_value NX EX timeout  # 原子操作
DEL with Lua script                       # 原子释放
Watchdog for long tasks                  # 看门狗续期

# ❌ 避免
SETNX + EXPIRE separately                  # 非原子
DEL without check                          # 可能误删
Infinite timeout                          # 可能死锁

6.2 配置建议

参数推荐值说明
timeout10-30s根据业务复杂度
renew_intervaltimeout/3续期间隔
max_retries3-5最大重试次数
retry_delay指数退避重试间隔

6.3 监控指标

# 监控锁状态
lock_info = {
    'acquire_success': counter,
    'acquire_fail': counter,
    'lock_timeout': counter,
    'lock_duration': histogram,
}

# 告警
if lock_timeout_rate > 0.1:
    alert("锁超时率过高")

总结

Redis 分布式锁核心要点:

实现方式优点缺点适用场景
SET NX EX简单、高性能单点故障一般场景
Redlock高可用复杂、有时钟问题高可用要求
ZooKeeper强一致性能较低强一致要求

最佳实践

  1. 使用 SET NX EX 原子操作
  2. 释放锁使用 Lua 脚本
  3. 长任务使用看门狗续期
  4. 设置合理的超时时间
  5. 重要场景使用 Redlock 或 ZooKeeper

掌握 Redis 分布式锁,构建可靠的分布式系统!

参考资料


分享这篇文章到:

上一篇文章
Kafka 消费者组与重平衡机制详解
下一篇文章
2025年互联网行业 “寒潮” 持续:大厂裁员背后的6大核心原因?