Redis 凭借其高性能和丰富的数据结构,成为现代架构中不可或缺的中间件。本文将详细介绍 Redis 的核心应用场景,并提供与其他中间件的选型对比,帮助你在实际项目中做出合理的技术决策。
一、核心应用场景
1.1 缓存系统
场景特点:
- 读多写少,访问频率高
- 数据可丢失,允许短暂不一致
- 需要快速响应,降低数据库压力
典型用法:
import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0)
def get_user_info(user_id):
# 1. 先查缓存
cache_key = f"user:{user_id}"
cached = r.get(cache_key)
if cached:
return json.loads(cached)
# 2. 缓存未命中,查数据库
user = db.query("SELECT * FROM users WHERE id = ?", user_id)
# 3. 写入缓存,设置过期时间
if user:
r.setex(cache_key, 300, json.dumps(user)) # 5 分钟过期
return user
最佳实践:
| 策略 | 说明 | 适用场景 |
|---|---|---|
| Cache-Aside | 先读缓存,未命中读 DB | 通用场景 |
| Read-Through | 缓存层自动读 DB | 框架集成 |
| Write-Through | 同时写缓存和 DB | 强一致性要求 |
| Write-Behind | 先写缓存,异步写 DB | 高吞吐写入 |
1.2 分布式锁
场景特点:
- 多实例并发控制
- 需要互斥访问共享资源
- 防止死锁,需要超时机制
实现方案对比:
graph TB
subgraph 方案对比
A[SETNX] --> B[基础实现]
C[Redlock] --> D[多 Redis 实例]
E[Redisson] --> F[看门狗机制]
end
B --> G[单点故障风险]
D --> H[性能开销大]
F --> I[推荐生产使用]
基础实现:
def acquire_lock(lock_key, timeout=10):
"""获取分布式锁"""
identifier = str(uuid.uuid4())
end_time = time.time() + timeout
while time.time() < end_time:
# SETNX + EXPIRE 原子操作
if r.set(lock_key, identifier, nx=True, ex=timeout):
return identifier
time.sleep(0.01)
return None
def release_lock(lock_key, identifier):
"""释放分布式锁(Lua 脚本保证原子性)"""
lua_script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
return r.eval(lua_script, 1, lock_key, identifier)
生产推荐 - Redisson:
// Java Redisson 示例
RLock lock = redisson.getLock("myLock");
// 自动续期(看门狗机制)
lock.lock();
try {
// 业务逻辑
doSomething();
} finally {
lock.unlock();
}
// 带超时的锁
if (lock.tryLock(1, 10, TimeUnit.SECONDS)) {
try {
// 最多等待 1 秒,锁定 10 秒后自动释放
} finally {
lock.unlock();
}
}
1.3 消息队列
场景特点:
- 异步处理,削峰填谷
- 解耦生产者和消费者
- 允许消息丢失(部分场景)
三种实现方案对比:
| 方案 | 命令 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|---|
| List | LPUSH/BRPOP | 简单、高性能 | 无 ACK 机制 | 简单任务队列 |
| Pub/Sub | PUBLISH/SUBSCRIBE | 实时广播 | 消息不持久化 | 实时通知 |
| Stream | XADD/XREAD | 持久化、ACK、消费组 | 复杂、内存占用大 | 可靠消息队列 |
List 实现任务队列:
# 生产者
def produce_task(task_data):
r.lpush("task:queue", json.dumps(task_data))
# 消费者
def consume_task():
# 阻塞式弹出,超时 5 秒
result = r.brpop("task:queue", timeout=5)
if result:
return json.loads(result[1])
return None
Stream 实现可靠消息队列:
# 生产者 - 发送消息
message_id = r.xadd("stream:orders", {
"order_id": "12345",
"amount": "99.00",
"timestamp": str(time.time())
})
# 消费者组 - 创建组
r.xgroup_create("stream:orders", "order-processors", id="0", mkstream=True)
# 消费者 - 读取消息
messages = r.xreadgroup(
groupname="order-processors",
consumername="worker-1",
streams={"stream:orders": ">"},
count=10,
block=5000
)
# 处理完成后 ACK
r.xack("stream:orders", "order-processors", message_id)
1.4 会话存储
场景特点:
- 用户登录状态管理
- 需要过期时间
- 分布式环境共享 Session
实现方案:
# 存储 Session
def create_session(user_id):
session_id = str(uuid.uuid4())
key = f"session:{session_id}"
r.hmset(key, {
"user_id": user_id,
"username": username,
"login_time": str(time.time())
})
r.expire(key, 7200) # 2 小时过期
return session_id
# 验证 Session
def verify_session(session_id):
key = f"session:{session_id}"
session_data = r.hgetall(key)
if session_data:
# 续期
r.expire(key, 7200)
return session_data
return None
Spring Session 集成:
<!-- Maven 依赖 -->
<dependency>
<groupId>org.springframework.session</groupId>
<artifactId>spring-session-data-redis</artifactId>
</dependency>
# application.yml
spring:
session:
store-type: redis
redis:
host: localhost
port: 6379
1.5 排行榜系统
场景特点:
- 实时排名更新
- 支持分页查询
- 高分优先排序
ZSet 实现:
# 更新分数
def update_score(user_id, score):
r.zadd("leaderboard:global", {user_id: score})
# 获取用户排名
def get_user_rank(user_id):
rank = r.zrevrank("leaderboard:global", user_id)
return rank + 1 if rank is not None else None
# 获取 Top N
def get_top_n(n=100):
return r.zrevrange("leaderboard:global", 0, n-1, withscores=True)
# 获取用户周围排名(分段查询)
def get_user_around_rank(user_id, range_size=5):
user_rank = r.zrevrank("leaderboard:global", user_id)
if user_rank is None:
return []
start = max(0, user_rank - range_size)
end = user_rank + range_size
return r.zrevrange("leaderboard:global", start, end, withscores=True)
游戏排行榜完整示例:
class GameLeaderboard:
def __init__(self, game_id):
self.key = f"leaderboard:{game_id}"
def add_score(self, user_id, score, member_id=None):
"""添加分数(支持多次累加)"""
if member_id:
# 多字段排序:先按分数,再按时间
member_key = f"{user_id}:{member_id}"
else:
member_key = user_id
return r.zincrby(self.key, score, member_key)
def get_personal_rank(self, user_id):
"""获取个人排名"""
rank = r.zrevrank(self.key, user_id)
score = r.zscore(self.key, user_id)
return {
"user_id": user_id,
"rank": (rank + 1) if rank is not None else None,
"score": float(score) if score else 0
}
def get_top_players(self, start=0, count=10):
"""获取排行榜前 N 名"""
results = r.zrevrange(self.key, start, start+count-1, withscores=True)
return [
{"user_id": member, "score": float(score)}
for member, score in results
]
1.6 计数器与限流
场景特点:
- 高并发计数
- 原子操作
- 频率限制
计数器实现:
# 原子自增
def increment_counter(key, delta=1):
return r.incrby(key, delta)
# 文章阅读量统计
def increment_article_views(article_id):
key = f"article:views:{article_id}"
return r.incr(key)
# 日活统计
def increment_dau(date_str, user_id):
key = f"dau:{date_str}"
return r.pfadd(key, user_id) # HyperLogLog
# 获取总阅读量
def get_total_views():
keys = r.keys("article:views:*")
if not keys:
return 0
# 使用 Pipeline 批量获取
pipe = r.pipeline()
for key in keys:
pipe.get(key)
results = pipe.execute()
return sum(int(v) for v in results if v)
限流实现:
# 固定窗口限流
def rate_limit_fixed_window(user_id, limit=100, window=60):
key = f"rate:fixed:{user_id}"
current = r.incr(key)
if current == 1:
r.expire(key, window)
return current <= limit
# 滑动窗口限流(推荐)
def rate_limit_sliding_window(user_id, limit=100, window=60):
key = f"rate:sliding:{user_id}"
now = time.time()
window_start = now - window
# 删除窗口外的数据
r.zremrangebyscore(key, 0, window_start)
# 添加当前请求
r.zadd(key, {str(uuid.uuid4()): now})
# 统计窗口内请求数
count = r.zcard(key)
r.expire(key, window)
return count <= limit
# 令牌桶限流
def rate_limit_token_bucket(user_id, capacity=100, rate=10):
key = f"rate:bucket:{user_id}"
now = time.time()
# Lua 脚本保证原子性
lua_script = """
local bucket = redis.call('HMGET', KEYS[1], 'tokens', 'last_time')
local tokens = tonumber(bucket[1]) or ARGV[1]
local last_time = tonumber(bucket[2]) or ARGV[2]
-- 计算新增令牌
local now = tonumber(ARGV[3])
local delta = now - last_time
local new_tokens = math.min(ARGV[1], tokens + delta * ARGV[4])
if new_tokens >= 1 then
redis.call('HMSET', KEYS[1], 'tokens', new_tokens - 1, 'last_time', now)
return 1
else
return 0
end
"""
return r.eval(lua_script, 1, key, capacity, now, now, rate)
二、技术选型对比
2.1 缓存中间件对比
性能与功能对比:
graph LR
subgraph 高性能
A[本地缓存] --> B[Redis]
B --> C[Tair]
C --> D[Memcached]
end
subgraph 功能丰富
E[Tair] --> F[Redis]
F --> G[Memcached]
G --> H[本地缓存]
end
详细对比表:
| 特性 | Redis | Memcached | Tair(阿里云) |
|---|---|---|---|
| 数据结构 | 丰富(String/List/Hash/Set/ZSet) | 仅 String | 丰富 + 增强 |
| 持久化 | RDB + AOF | 不支持 | 支持 |
| 集群模式 | Redis Cluster | 客户端分片 | 原生支持 |
| 多线程 | IO 多线程(6.0+) | 多线程 | 优化更好 |
| 过期策略 | 精确到毫秒 | 精确到秒 | 精确到毫秒 |
| 事务 | 支持 | 不支持 | 支持 |
| 发布订阅 | 支持 | 不支持 | 支持 |
| 云服务能力 | 自建/托管 | 自建 | 阿里云深度集成 |
| 适用场景 | 通用缓存、复杂数据结构 | 简单缓存 | 企业级应用 |
选型建议:
| 场景 | 推荐方案 | 理由 |
|---|---|---|
| 简单 KV 缓存 | Memcached | 轻量、性能好 |
| 复杂数据结构 | Redis | 数据类型丰富 |
| 需要持久化 | Redis | RDB+AOF |
| 阿里云环境 | Tair | 深度集成、运维方便 |
| 超大规模集群 | Tair/Redis Cluster | 自动分片、高可用 |
2.2 消息队列对比
| 特性 | Redis Stream | Kafka | RabbitMQ | RocketMQ |
|---|---|---|---|---|
| 吞吐量 | 中(万级) | 高(十万级) | 中(万级) | 高(十万级) |
| 延迟 | 低(ms 级) | 中 | 低 | 中 |
| 可靠性 | 中 | 高 | 高 | 高 |
| 消息堆积 | 内存限制 | 磁盘存储 | 内存限制 | 磁盘存储 |
| 事务消息 | 不支持 | 不支持 | 不支持 | 支持 |
| 顺序消息 | 支持 | 支持 | 支持 | 支持 |
| 延迟消息 | 不支持 | 不支持 | 支持 | 支持 |
| 适用场景 | 轻量级队列 | 日志收集、大数据 | 复杂路由 | 电商、金融 |
选型建议:
flowchart TD
A[消息队列选型] --> B{消息量级?}
B -->|万级以下 | C[Redis Stream]
B -->|十万级以上 | D{消息类型?}
D -->|日志/大数据 | E[Kafka]
D -->|业务消息 | F{功能需求?}
F -->|事务/延迟 | G[RocketMQ]
F -->|复杂路由 | H[RabbitMQ]
三、生产实践建议
3.1 缓存设计原则
-
数据一致性
- 能接受最终一致性:Cache-Aside
- 强一致性要求:Write-Through 或 直接读 DB
-
缓存穿透防护
# 布隆过滤器 + 缓存空值 def get_with_bloom_filter(key): if not bloom_filter.contains(key): return None # 肯定不存在 cached = r.get(key) if cached == "NULL": return None # 缓存的空值 if cached: return cached # 查 DB data = db.query(key) if data: r.setex(key, 300, data) else: r.setex(key, 60, "NULL") # 缓存空值 return data -
缓存雪崩防护
# 随机过期时间 def set_with_jitter(key, value, base_ttl=300): jitter = random.randint(0, 300) r.setex(key, base_ttl + jitter, value) -
缓存击穿防护
# 互斥锁 def get_with_mutex(key): cached = r.get(key) if cached: return cached # 获取分布式锁 lock_key = f"lock:{key}" if r.set(lock_key, "1", nx=True, ex=10): try: # 双重检查 cached = r.get(key) if cached: return cached data = db.query(key) r.setex(key, 300, data) return data finally: r.delete(lock_key) else: time.sleep(0.05) return get_with_mutex(key) # 重试
3.2 Key 命名规范
推荐格式:业务名:表名:ID:字段
示例:
- user:info:1001 # 用户信息
- order:detail:2026040701 # 订单详情
- article:views:888 # 文章阅读量
- cart:items:user1001 # 购物车
3.3 内存优化策略
| 策略 | 说明 | 效果 |
|---|---|---|
| 使用 Hash 代替多个 String | 相关字段聚合存储 | 节省 30-50% 内存 |
| 开启内存压缩 | hash-max-ziplist-entries | 节省 20-30% 内存 |
| 设置合理过期时间 | 避免内存无限增长 | 防止 OOM |
| 定期清理冷数据 | 使用 SCAN 命令 | 释放内存 |
四、总结
场景选择速查表
| 需求 | 推荐方案 | 核心命令/数据结构 |
|---|---|---|
| 缓存 | String + 过期时间 | SETEX、GET |
| 分布式锁 | String + Lua 脚本 | SET NX EX、EVAL |
| 消息队列 | Stream/List | XADD/XREAD、LPUSH/BRPOP |
| Session 存储 | Hash + 过期时间 | HMSET、EXPIRE |
| 排行榜 | ZSet | ZADD、ZREVRANK |
| 计数器 | String | INCR、INCRBY |
| 限流 | ZSet/Lua 脚本 | ZADD、EVAL |
| 去重统计 | HyperLogLog | PFADD、PFCOUNT |
| 地理位置 | GEO | GEOADD、GEORADIUS |
| 实时消息 | Pub/Sub | PUBLISH、SUBSCRIBE |
选型决策树
flowchart LR
A[需求分析] --> B{需要持久化?}
B -->|是 | C[Redis]
B -->|否 | D{数据结构复杂度?}
D -->|简单 KV| E[Memcached]
D -->|复杂 | C
C --> F{消息队列需求?}
F -->|是 | G{可靠性要求?}
G -->|高 | H[Kafka/RocketMQ]
G -->|中 | I[Redis Stream]
F -->|否 | J[纯缓存场景]
参考资料
- Redis 官方文档 - Use Cases
- 《Redis 设计与实现》
- Redisson 官方文档
- 阿里云 Tair 技术文档