Skip to content
清晨的一缕阳光
返回

Redis 应用场景与选型指南

Redis 凭借其高性能和丰富的数据结构,成为现代架构中不可或缺的中间件。本文将详细介绍 Redis 的核心应用场景,并提供与其他中间件的选型对比,帮助你在实际项目中做出合理的技术决策。

一、核心应用场景

1.1 缓存系统

场景特点

典型用法

import redis
import json

r = redis.Redis(host='localhost', port=6379, db=0)

def get_user_info(user_id):
    # 1. 先查缓存
    cache_key = f"user:{user_id}"
    cached = r.get(cache_key)
    
    if cached:
        return json.loads(cached)
    
    # 2. 缓存未命中,查数据库
    user = db.query("SELECT * FROM users WHERE id = ?", user_id)
    
    # 3. 写入缓存,设置过期时间
    if user:
        r.setex(cache_key, 300, json.dumps(user))  # 5 分钟过期
    
    return user

最佳实践

策略说明适用场景
Cache-Aside先读缓存,未命中读 DB通用场景
Read-Through缓存层自动读 DB框架集成
Write-Through同时写缓存和 DB强一致性要求
Write-Behind先写缓存,异步写 DB高吞吐写入

1.2 分布式锁

场景特点

实现方案对比

graph TB
    subgraph 方案对比
        A[SETNX] --> B[基础实现]
        C[Redlock] --> D[多 Redis 实例]
        E[Redisson] --> F[看门狗机制]
    end
    
    B --> G[单点故障风险]
    D --> H[性能开销大]
    F --> I[推荐生产使用]

基础实现

def acquire_lock(lock_key, timeout=10):
    """获取分布式锁"""
    identifier = str(uuid.uuid4())
    end_time = time.time() + timeout
    
    while time.time() < end_time:
        # SETNX + EXPIRE 原子操作
        if r.set(lock_key, identifier, nx=True, ex=timeout):
            return identifier
        time.sleep(0.01)
    
    return None

def release_lock(lock_key, identifier):
    """释放分布式锁(Lua 脚本保证原子性)"""
    lua_script = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """
    return r.eval(lua_script, 1, lock_key, identifier)

生产推荐 - Redisson

// Java Redisson 示例
RLock lock = redisson.getLock("myLock");

// 自动续期(看门狗机制)
lock.lock();

try {
    // 业务逻辑
    doSomething();
} finally {
    lock.unlock();
}

// 带超时的锁
if (lock.tryLock(1, 10, TimeUnit.SECONDS)) {
    try {
        // 最多等待 1 秒,锁定 10 秒后自动释放
    } finally {
        lock.unlock();
    }
}

1.3 消息队列

场景特点

三种实现方案对比

方案命令优点缺点适用场景
ListLPUSH/BRPOP简单、高性能无 ACK 机制简单任务队列
Pub/SubPUBLISH/SUBSCRIBE实时广播消息不持久化实时通知
StreamXADD/XREAD持久化、ACK、消费组复杂、内存占用大可靠消息队列

List 实现任务队列

# 生产者
def produce_task(task_data):
    r.lpush("task:queue", json.dumps(task_data))

# 消费者
def consume_task():
    # 阻塞式弹出,超时 5 秒
    result = r.brpop("task:queue", timeout=5)
    if result:
        return json.loads(result[1])
    return None

Stream 实现可靠消息队列

# 生产者 - 发送消息
message_id = r.xadd("stream:orders", {
    "order_id": "12345",
    "amount": "99.00",
    "timestamp": str(time.time())
})

# 消费者组 - 创建组
r.xgroup_create("stream:orders", "order-processors", id="0", mkstream=True)

# 消费者 - 读取消息
messages = r.xreadgroup(
    groupname="order-processors",
    consumername="worker-1",
    streams={"stream:orders": ">"},
    count=10,
    block=5000
)

# 处理完成后 ACK
r.xack("stream:orders", "order-processors", message_id)

1.4 会话存储

场景特点

实现方案

# 存储 Session
def create_session(user_id):
    session_id = str(uuid.uuid4())
    key = f"session:{session_id}"
    
    r.hmset(key, {
        "user_id": user_id,
        "username": username,
        "login_time": str(time.time())
    })
    r.expire(key, 7200)  # 2 小时过期
    
    return session_id

# 验证 Session
def verify_session(session_id):
    key = f"session:{session_id}"
    session_data = r.hgetall(key)
    
    if session_data:
        # 续期
        r.expire(key, 7200)
        return session_data
    return None

Spring Session 集成

<!-- Maven 依赖 -->
<dependency>
    <groupId>org.springframework.session</groupId>
    <artifactId>spring-session-data-redis</artifactId>
</dependency>
# application.yml
spring:
  session:
    store-type: redis
  redis:
    host: localhost
    port: 6379

1.5 排行榜系统

场景特点

ZSet 实现

# 更新分数
def update_score(user_id, score):
    r.zadd("leaderboard:global", {user_id: score})

# 获取用户排名
def get_user_rank(user_id):
    rank = r.zrevrank("leaderboard:global", user_id)
    return rank + 1 if rank is not None else None

# 获取 Top N
def get_top_n(n=100):
    return r.zrevrange("leaderboard:global", 0, n-1, withscores=True)

# 获取用户周围排名(分段查询)
def get_user_around_rank(user_id, range_size=5):
    user_rank = r.zrevrank("leaderboard:global", user_id)
    if user_rank is None:
        return []
    
    start = max(0, user_rank - range_size)
    end = user_rank + range_size
    
    return r.zrevrange("leaderboard:global", start, end, withscores=True)

游戏排行榜完整示例

class GameLeaderboard:
    def __init__(self, game_id):
        self.key = f"leaderboard:{game_id}"
    
    def add_score(self, user_id, score, member_id=None):
        """添加分数(支持多次累加)"""
        if member_id:
            # 多字段排序:先按分数,再按时间
            member_key = f"{user_id}:{member_id}"
        else:
            member_key = user_id
        
        return r.zincrby(self.key, score, member_key)
    
    def get_personal_rank(self, user_id):
        """获取个人排名"""
        rank = r.zrevrank(self.key, user_id)
        score = r.zscore(self.key, user_id)
        
        return {
            "user_id": user_id,
            "rank": (rank + 1) if rank is not None else None,
            "score": float(score) if score else 0
        }
    
    def get_top_players(self, start=0, count=10):
        """获取排行榜前 N 名"""
        results = r.zrevrange(self.key, start, start+count-1, withscores=True)
        
        return [
            {"user_id": member, "score": float(score)}
            for member, score in results
        ]

1.6 计数器与限流

场景特点

计数器实现

# 原子自增
def increment_counter(key, delta=1):
    return r.incrby(key, delta)

# 文章阅读量统计
def increment_article_views(article_id):
    key = f"article:views:{article_id}"
    return r.incr(key)

# 日活统计
def increment_dau(date_str, user_id):
    key = f"dau:{date_str}"
    return r.pfadd(key, user_id)  # HyperLogLog

# 获取总阅读量
def get_total_views():
    keys = r.keys("article:views:*")
    if not keys:
        return 0
    
    # 使用 Pipeline 批量获取
    pipe = r.pipeline()
    for key in keys:
        pipe.get(key)
    
    results = pipe.execute()
    return sum(int(v) for v in results if v)

限流实现

# 固定窗口限流
def rate_limit_fixed_window(user_id, limit=100, window=60):
    key = f"rate:fixed:{user_id}"
    current = r.incr(key)
    
    if current == 1:
        r.expire(key, window)
    
    return current <= limit

# 滑动窗口限流(推荐)
def rate_limit_sliding_window(user_id, limit=100, window=60):
    key = f"rate:sliding:{user_id}"
    now = time.time()
    window_start = now - window
    
    # 删除窗口外的数据
    r.zremrangebyscore(key, 0, window_start)
    
    # 添加当前请求
    r.zadd(key, {str(uuid.uuid4()): now})
    
    # 统计窗口内请求数
    count = r.zcard(key)
    r.expire(key, window)
    
    return count <= limit

# 令牌桶限流
def rate_limit_token_bucket(user_id, capacity=100, rate=10):
    key = f"rate:bucket:{user_id}"
    now = time.time()
    
    # Lua 脚本保证原子性
    lua_script = """
    local bucket = redis.call('HMGET', KEYS[1], 'tokens', 'last_time')
    local tokens = tonumber(bucket[1]) or ARGV[1]
    local last_time = tonumber(bucket[2]) or ARGV[2]
    
    -- 计算新增令牌
    local now = tonumber(ARGV[3])
    local delta = now - last_time
    local new_tokens = math.min(ARGV[1], tokens + delta * ARGV[4])
    
    if new_tokens >= 1 then
        redis.call('HMSET', KEYS[1], 'tokens', new_tokens - 1, 'last_time', now)
        return 1
    else
        return 0
    end
    """
    
    return r.eval(lua_script, 1, key, capacity, now, now, rate)

二、技术选型对比

2.1 缓存中间件对比

性能与功能对比

graph LR
    subgraph 高性能
        A[本地缓存] --> B[Redis]
        B --> C[Tair]
        C --> D[Memcached]
    end
    
    subgraph 功能丰富
        E[Tair] --> F[Redis]
        F --> G[Memcached]
        G --> H[本地缓存]
    end

详细对比表

特性RedisMemcachedTair(阿里云)
数据结构丰富(String/List/Hash/Set/ZSet)仅 String丰富 + 增强
持久化RDB + AOF不支持支持
集群模式Redis Cluster客户端分片原生支持
多线程IO 多线程(6.0+)多线程优化更好
过期策略精确到毫秒精确到秒精确到毫秒
事务支持不支持支持
发布订阅支持不支持支持
云服务能力自建/托管自建阿里云深度集成
适用场景通用缓存、复杂数据结构简单缓存企业级应用

选型建议

场景推荐方案理由
简单 KV 缓存Memcached轻量、性能好
复杂数据结构Redis数据类型丰富
需要持久化RedisRDB+AOF
阿里云环境Tair深度集成、运维方便
超大规模集群Tair/Redis Cluster自动分片、高可用

2.2 消息队列对比

特性Redis StreamKafkaRabbitMQRocketMQ
吞吐量中(万级)高(十万级)中(万级)高(十万级)
延迟低(ms 级)
可靠性
消息堆积内存限制磁盘存储内存限制磁盘存储
事务消息不支持不支持不支持支持
顺序消息支持支持支持支持
延迟消息不支持不支持支持支持
适用场景轻量级队列日志收集、大数据复杂路由电商、金融

选型建议

flowchart TD
    A[消息队列选型] --> B{消息量级?}
    B -->|万级以下 | C[Redis Stream]
    B -->|十万级以上 | D{消息类型?}
    D -->|日志/大数据 | E[Kafka]
    D -->|业务消息 | F{功能需求?}
    F -->|事务/延迟 | G[RocketMQ]
    F -->|复杂路由 | H[RabbitMQ]

三、生产实践建议

3.1 缓存设计原则

  1. 数据一致性

    • 能接受最终一致性:Cache-Aside
    • 强一致性要求:Write-Through 或 直接读 DB
  2. 缓存穿透防护

    # 布隆过滤器 + 缓存空值
    def get_with_bloom_filter(key):
        if not bloom_filter.contains(key):
            return None  # 肯定不存在
        
        cached = r.get(key)
        if cached == "NULL":
            return None  # 缓存的空值
        
        if cached:
            return cached
        
        # 查 DB
        data = db.query(key)
        if data:
            r.setex(key, 300, data)
        else:
            r.setex(key, 60, "NULL")  # 缓存空值
        
        return data
  3. 缓存雪崩防护

    # 随机过期时间
    def set_with_jitter(key, value, base_ttl=300):
        jitter = random.randint(0, 300)
        r.setex(key, base_ttl + jitter, value)
  4. 缓存击穿防护

    # 互斥锁
    def get_with_mutex(key):
        cached = r.get(key)
        if cached:
            return cached
        
        # 获取分布式锁
        lock_key = f"lock:{key}"
        if r.set(lock_key, "1", nx=True, ex=10):
            try:
                # 双重检查
                cached = r.get(key)
                if cached:
                    return cached
                
                data = db.query(key)
                r.setex(key, 300, data)
                return data
            finally:
                r.delete(lock_key)
        else:
            time.sleep(0.05)
            return get_with_mutex(key)  # 重试

3.2 Key 命名规范

推荐格式:业务名:表名:ID:字段
示例:
- user:info:1001          # 用户信息
- order:detail:2026040701 # 订单详情
- article:views:888       # 文章阅读量
- cart:items:user1001     # 购物车

3.3 内存优化策略

策略说明效果
使用 Hash 代替多个 String相关字段聚合存储节省 30-50% 内存
开启内存压缩hash-max-ziplist-entries节省 20-30% 内存
设置合理过期时间避免内存无限增长防止 OOM
定期清理冷数据使用 SCAN 命令释放内存

四、总结

场景选择速查表

需求推荐方案核心命令/数据结构
缓存String + 过期时间SETEX、GET
分布式锁String + Lua 脚本SET NX EX、EVAL
消息队列Stream/ListXADD/XREAD、LPUSH/BRPOP
Session 存储Hash + 过期时间HMSET、EXPIRE
排行榜ZSetZADD、ZREVRANK
计数器StringINCR、INCRBY
限流ZSet/Lua 脚本ZADD、EVAL
去重统计HyperLogLogPFADD、PFCOUNT
地理位置GEOGEOADD、GEORADIUS
实时消息Pub/SubPUBLISH、SUBSCRIBE

选型决策树

flowchart LR
    A[需求分析] --> B{需要持久化?}
    B -->|是 | C[Redis]
    B -->|否 | D{数据结构复杂度?}
    D -->|简单 KV| E[Memcached]
    D -->|复杂 | C
    C --> F{消息队列需求?}
    F -->|是 | G{可靠性要求?}
    G -->|高 | H[Kafka/RocketMQ]
    G -->|中 | I[Redis Stream]
    F -->|否 | J[纯缓存场景]

参考资料


分享这篇文章到:

上一篇文章
Redis 快速入门与实战指南
下一篇文章
Redis 架构设计与核心概念