Redis List 数据类型详解
List 是 Redis 最常用的线性数据结构,支持两端高效插入和删除。本文将深入 List 的底层快速列表结构,掌握消息队列等核心应用场景。
一、List 基础概念
1.1 什么是 List
List 是有序列表,元素可重复:
- 按插入顺序排序
- 支持两端操作(头部、尾部)
- 基于索引访问
# 添加元素
LPUSH tasks "task1" "task2"
RPUSH tasks "task3" "task4"
# 获取元素
LRANGE tasks 0 -1
# 1) "task2"
# 2) "task1"
# 3) "task3"
# 4) "task4"
# 获取长度
LLEN tasks # 4
1.2 应用场景
| 场景 | 说明 |
|---|---|
| 消息队列 | 生产者 - 消费者模式 |
| 时间线 | 最新动态、Feed 流 |
| 栈/队列 | LIFO/FIFO 数据结构 |
| 限流 | 滑动窗口计数 |
二、底层数据结构
2.1 演进历史
Redis 1.x-2.x: ziplist / linkedlist
Redis 3.2+: quicklist (快速列表)
Redis 7.0+: listpack (更紧凑)
2.2 快速列表(quicklist)
结构特点:
quicklist = 双向链表 + ziplist
┌─────────┐ ┌─────────┐ ┌─────────┐
│ ziplist │ ←→ │ ziplist │ ←→ │ ziplist │
│ (节点 1) │ │ (节点 2) │ │ (节点 3) │
│ 10 个元素 │ │ 10 个元素 │ │ 5 个元素 │
└─────────┘ └─────────┘ └─────────┘
每个 ziplist 包含多个元素
节点之间通过双向链表连接
优势:
- 内存紧凑(ziplist 连续存储)
- 操作高效(链表两端 O(1))
- 平衡内存与性能
2.3 配置参数
# Redis 配置
list-max-ziplist-size -2 # 每个 ziplist 最大大小
# -5: 每个节点 <= 64KB
# -4: 每个节点 <= 32KB
# -3: 每个节点 <= 16KB
# -2: 每个节点 <= 8KB (默认)
# -1: 每个节点 <= 4KB
# 正数:元素个数
list-compress-depth 0 # 压缩深度
# 0: 不压缩
# 1: 压缩 1 个节点
# 2: 压缩 2 个节点
三、核心命令详解
3.1 添加元素
# 左侧添加(头插)
LPUSH key value [value ...]
# 右侧添加(尾插)
RPUSH key value [value ...]
# 示例
LPUSH tasks "task1"
RPUSH tasks "task2" "task3"
LRANGE tasks 0 -1
# 1) "task1"
# 2) "task2"
# 3) "task3"
3.2 获取元素
# 获取范围
LRANGE key start stop
LINDEX key index
LLEN key
# 示例
LRANGE tasks 0 2 # 前 3 个
LRANGE tasks -3 -1 # 最后 3 个
LINDEX tasks 0 # 第 1 个
LLEN tasks # 总数量
3.3 删除元素
# 弹出元素
LPOP key # 左侧弹出
RPOP key # 右侧弹出
LPOP key count # 弹出多个
# 删除指定值
LREM key count value
# count > 0: 从左到右删除 count 个
# count < 0: 从右到左删除 |count| 个
# count = 0: 删除所有
# 修剪列表
LTRIM key start stop # 保留范围
# 示例
LREM tasks 2 "task1" # 删除 2 个 task1
LTRIM tasks 0 9 # 保留前 10 个
3.4 修改元素
# 设置元素
LSET key index value
# 插入元素
LINSERT key BEFORE|AFTER pivot value
# 示例
LSET tasks 0 "new_task"
LINSERT tasks AFTER "task1" "task1.1"
四、消息队列实现
4.1 基础队列
# 生产者
RPUSH queue "message1" "message2"
# 消费者
LPOP queue # 阻塞?不阻塞
# 问题:队列为空时返回 nil
4.2 阻塞队列
# 阻塞弹出
BLPOP key [key ...] timeout
BRPOP key [key ...] timeout
# 示例
BRPOP queue 0 # 阻塞直到有数据
# 多队列监听
BRPOP queue1 queue2 queue3 0
# 返回第一个有数据的队列
Python 实现:
import redis
class MessageQueue:
def __init__(self, redis_client, queue_name):
self.redis = redis_client
self.queue = queue_name
def publish(self, message):
"""发布消息"""
self.redis.rpush(self.queue, message)
def consume(self, timeout=0):
"""消费消息(阻塞)"""
result = self.redis.brpop(self.queue, timeout=timeout)
if result:
return result[1].decode()
return None
def consume_batch(self, count=10, timeout=0):
"""批量消费"""
messages = []
for _ in range(count):
msg = self.consume(timeout=timeout if not messages else 0)
if msg:
messages.append(msg)
else:
break
return messages
# 使用示例
mq = MessageQueue(redis, "task:queue")
# 生产者
mq.publish("task1")
mq.publish("task2")
# 消费者
while True:
task = mq.consume(timeout=5)
if task:
process_task(task)
4.3 发布订阅模式
# 发布订阅(List 实现)
LPUSH notifications "msg1"
RPOPLPUSH notifications processing
# 处理完成后
LREM processing 1 "msg1"
# 如果处理失败
RPOPLPUSH processing notifications # 放回队列
4.4 延迟队列
import time
import json
class DelayQueue:
def __init__(self, redis_client, queue_name):
self.redis = redis_client
self.queue = queue_name
def publish(self, message, delay_seconds):
"""发布延迟消息"""
execute_time = int(time.time() * 1000) + delay_seconds * 1000
task = {
'message': message,
'execute_time': execute_time
}
self.redis.zadd(self.queue, {json.dumps(task): execute_time})
def consume(self):
"""消费到期的消息"""
now = int(time.time() * 1000)
# 获取到期任务
tasks = self.redis.zrangebyscore(
self.queue, 0, now, start=0, num=1
)
if tasks:
task = tasks[0]
# 删除任务
self.redis.zrem(self.queue, task)
return json.loads(task)['message']
return None
# 使用示例
dq = DelayQueue(redis, "delay:queue")
# 发布 10 秒后执行的任务
dq.publish("send_email", 10)
# 消费
while True:
task = dq.consume()
if task:
process(task)
else:
time.sleep(1)
五、高级应用
5.1 时间线 Feed 流
# 发布动态
LPUSH user:1001:feed "post1" "post2" "post3"
# 获取最新动态
LRANGE user:1001:feed 0 19 # 前 20 条
# 限制数量(保留最新 100 条)
LTRIM user:1001:feed 0 99
# 关注多用户
LPUSH user:1001:timeline "post_from_friend1"
LPUSH user:1001:timeline "post_from_friend2"
5.2 滑动窗口限流
import time
class RateLimiter:
def __init__(self, redis_client, key, max_requests, window_seconds):
self.redis = redis_client
self.key = key
self.max_requests = max_requests
self.window = window_seconds
def is_allowed(self):
"""检查是否允许请求"""
now = time.time()
pipe = self.redis.pipeline()
# 1. 删除过期记录
pipe.zremrangebyscore(self.key, 0, now - self.window)
# 2. 添加当前请求
pipe.zadd(self.key, {str(now): now})
# 3. 设置过期时间
pipe.expire(self.key, self.window)
# 4. 统计请求数
pipe.zcard(self.key)
results = pipe.execute()
count = results[3]
return count <= self.max_requests
# 使用示例
limiter = RateLimiter(redis, "rate:user:1001", 100, 60)
if limiter.is_allowed():
process_request()
else:
return "Too many requests"
5.3 栈和队列
# 栈(LIFO)
LPUSH stack "a" "b" "c"
LPOP stack # "c"
LPOP stack # "b"
# 队列(FIFO)
RPUSH queue "a" "b" "c"
LPOP queue # "a"
LPOP queue # "b"
六、性能优化
6.1 批量操作
# ❌ 不推荐:逐个操作
LPUSH key "a"
LPUSH key "b"
LPUSH key "c"
# ✅ 推荐:批量操作
LPUSH key "a" "b" "c"
# 或使用 Pipeline
PIPELINE
LPUSH key "a"
LPUSH key "b"
LPUSH key "c"
END
6.2 内存优化
# 控制列表长度
LTRIM key 0 999 # 保留前 1000 个
# 避免大 List
# ❌ 不推荐:单个 List 超过 10 万
# ✅ 推荐:分片存储
user:1001:feed:0
user:1001:feed:1
user:1001:feed:2
6.3 监控指标
# 查看 List 信息
LLEN key
MEMORY USAGE key
# 监控大 List
redis-cli --bigkeys | grep list
# 慢查询监控
SLOWLOG GET 10
七、常见问题
Q1: List 最大长度?
理论最大值:2^32 - 1(约 40 亿)
实际建议:< 10 万
Q2: 如何清空 List?
# 方式 1: 删除
DEL key
# 方式 2: 修剪
LTRIM key 1 0 # 保留空范围
# 性能:DEL 更快
Q3: List vs Stream?
List:
- 简单消息队列
- 无 ACK 机制
- 适用于任务队列
Stream:
- 专业消息队列
- 支持 ACK
- 支持消费者组
- 适用于复杂场景
Q4: 如何反转 List?
# 方式 1: 应用层
data = LRANGE key 0 -1
reversed = data[::-1]
# 方式 2: 新 List
LMOVE key newkey RIGHT LEFT
总结
Redis List 核心要点:
| 特性 | 说明 |
|---|---|
| 底层结构 | quicklist(链表 + ziplist) |
| 操作复杂度 | 两端 O(1),中间 O(N) |
| 最大长度 | 2^32 - 1 |
| 典型应用 | 消息队列、时间线、栈/队列 |
最佳实践:
- 使用批量操作减少网络往返
- 控制 List 长度(LTRIM)
- 消息队列使用 BLPOP/BRPOP
- 大 List 考虑分片存储
- 定期监控大 Key
掌握 List,轻松实现消息队列、时间线等功能!