Skip to content
清晨的一缕阳光
返回

Redis List 数据类型详解

Redis List 数据类型详解

List 是 Redis 最常用的线性数据结构,支持两端高效插入和删除。本文将深入 List 的底层快速列表结构,掌握消息队列等核心应用场景。

一、List 基础概念

1.1 什么是 List

List 是有序列表,元素可重复:

# 添加元素
LPUSH tasks "task1" "task2"
RPUSH tasks "task3" "task4"

# 获取元素
LRANGE tasks 0 -1
# 1) "task2"
# 2) "task1"
# 3) "task3"
# 4) "task4"

# 获取长度
LLEN tasks  # 4

1.2 应用场景

场景说明
消息队列生产者 - 消费者模式
时间线最新动态、Feed 流
栈/队列LIFO/FIFO 数据结构
限流滑动窗口计数

二、底层数据结构

2.1 演进历史

Redis 1.x-2.x: ziplist / linkedlist
Redis 3.2+:    quicklist (快速列表)
Redis 7.0+:    listpack (更紧凑)

2.2 快速列表(quicklist)

结构特点

quicklist = 双向链表 + ziplist

┌─────────┐    ┌─────────┐    ┌─────────┐
│ ziplist │ ←→ │ ziplist │ ←→ │ ziplist │
│ (节点 1) │    │ (节点 2) │    │ (节点 3) │
│ 10 个元素 │    │ 10 个元素 │    │ 5 个元素  │
└─────────┘    └─────────┘    └─────────┘

每个 ziplist 包含多个元素
节点之间通过双向链表连接

优势

  1. 内存紧凑(ziplist 连续存储)
  2. 操作高效(链表两端 O(1))
  3. 平衡内存与性能

2.3 配置参数

# Redis 配置
list-max-ziplist-size -2    # 每个 ziplist 最大大小
# -5: 每个节点 <= 64KB
# -4: 每个节点 <= 32KB
# -3: 每个节点 <= 16KB
# -2: 每个节点 <= 8KB  (默认)
# -1: 每个节点 <= 4KB
#  正数:元素个数

list-compress-depth 0       # 压缩深度
# 0: 不压缩
# 1: 压缩 1 个节点
# 2: 压缩 2 个节点

三、核心命令详解

3.1 添加元素

# 左侧添加(头插)
LPUSH key value [value ...]

# 右侧添加(尾插)
RPUSH key value [value ...]

# 示例
LPUSH tasks "task1"
RPUSH tasks "task2" "task3"
LRANGE tasks 0 -1
# 1) "task1"
# 2) "task2"
# 3) "task3"

3.2 获取元素

# 获取范围
LRANGE key start stop
LINDEX key index
LLEN key

# 示例
LRANGE tasks 0 2      # 前 3 个
LRANGE tasks -3 -1    # 最后 3 个
LINDEX tasks 0        # 第 1 个
LLEN tasks            # 总数量

3.3 删除元素

# 弹出元素
LPOP key              # 左侧弹出
RPOP key              # 右侧弹出
LPOP key count        # 弹出多个

# 删除指定值
LREM key count value
# count > 0: 从左到右删除 count 个
# count < 0: 从右到左删除 |count| 个
# count = 0: 删除所有

# 修剪列表
LTRIM key start stop  # 保留范围

# 示例
LREM tasks 2 "task1"  # 删除 2 个 task1
LTRIM tasks 0 9       # 保留前 10 个

3.4 修改元素

# 设置元素
LSET key index value

# 插入元素
LINSERT key BEFORE|AFTER pivot value

# 示例
LSET tasks 0 "new_task"
LINSERT tasks AFTER "task1" "task1.1"

四、消息队列实现

4.1 基础队列

# 生产者
RPUSH queue "message1" "message2"

# 消费者
LPOP queue  # 阻塞?不阻塞

# 问题:队列为空时返回 nil

4.2 阻塞队列

# 阻塞弹出
BLPOP key [key ...] timeout
BRPOP key [key ...] timeout

# 示例
BRPOP queue 0  # 阻塞直到有数据

# 多队列监听
BRPOP queue1 queue2 queue3 0
# 返回第一个有数据的队列

Python 实现

import redis

class MessageQueue:
    def __init__(self, redis_client, queue_name):
        self.redis = redis_client
        self.queue = queue_name
    
    def publish(self, message):
        """发布消息"""
        self.redis.rpush(self.queue, message)
    
    def consume(self, timeout=0):
        """消费消息(阻塞)"""
        result = self.redis.brpop(self.queue, timeout=timeout)
        if result:
            return result[1].decode()
        return None
    
    def consume_batch(self, count=10, timeout=0):
        """批量消费"""
        messages = []
        for _ in range(count):
            msg = self.consume(timeout=timeout if not messages else 0)
            if msg:
                messages.append(msg)
            else:
                break
        return messages

# 使用示例
mq = MessageQueue(redis, "task:queue")

# 生产者
mq.publish("task1")
mq.publish("task2")

# 消费者
while True:
    task = mq.consume(timeout=5)
    if task:
        process_task(task)

4.3 发布订阅模式

# 发布订阅(List 实现)
LPUSH notifications "msg1"
RPOPLPUSH notifications processing

# 处理完成后
LREM processing 1 "msg1"

# 如果处理失败
RPOPLPUSH processing notifications  # 放回队列

4.4 延迟队列

import time
import json

class DelayQueue:
    def __init__(self, redis_client, queue_name):
        self.redis = redis_client
        self.queue = queue_name
    
    def publish(self, message, delay_seconds):
        """发布延迟消息"""
        execute_time = int(time.time() * 1000) + delay_seconds * 1000
        task = {
            'message': message,
            'execute_time': execute_time
        }
        self.redis.zadd(self.queue, {json.dumps(task): execute_time})
    
    def consume(self):
        """消费到期的消息"""
        now = int(time.time() * 1000)
        
        # 获取到期任务
        tasks = self.redis.zrangebyscore(
            self.queue, 0, now, start=0, num=1
        )
        
        if tasks:
            task = tasks[0]
            # 删除任务
            self.redis.zrem(self.queue, task)
            return json.loads(task)['message']
        
        return None

# 使用示例
dq = DelayQueue(redis, "delay:queue")

# 发布 10 秒后执行的任务
dq.publish("send_email", 10)

# 消费
while True:
    task = dq.consume()
    if task:
        process(task)
    else:
        time.sleep(1)

五、高级应用

5.1 时间线 Feed 流

# 发布动态
LPUSH user:1001:feed "post1" "post2" "post3"

# 获取最新动态
LRANGE user:1001:feed 0 19  # 前 20 条

# 限制数量(保留最新 100 条)
LTRIM user:1001:feed 0 99

# 关注多用户
LPUSH user:1001:timeline "post_from_friend1"
LPUSH user:1001:timeline "post_from_friend2"

5.2 滑动窗口限流

import time

class RateLimiter:
    def __init__(self, redis_client, key, max_requests, window_seconds):
        self.redis = redis_client
        self.key = key
        self.max_requests = max_requests
        self.window = window_seconds
    
    def is_allowed(self):
        """检查是否允许请求"""
        now = time.time()
        pipe = self.redis.pipeline()
        
        # 1. 删除过期记录
        pipe.zremrangebyscore(self.key, 0, now - self.window)
        
        # 2. 添加当前请求
        pipe.zadd(self.key, {str(now): now})
        
        # 3. 设置过期时间
        pipe.expire(self.key, self.window)
        
        # 4. 统计请求数
        pipe.zcard(self.key)
        
        results = pipe.execute()
        count = results[3]
        
        return count <= self.max_requests

# 使用示例
limiter = RateLimiter(redis, "rate:user:1001", 100, 60)

if limiter.is_allowed():
    process_request()
else:
    return "Too many requests"

5.3 栈和队列

# 栈(LIFO)
LPUSH stack "a" "b" "c"
LPOP stack  # "c"
LPOP stack  # "b"

# 队列(FIFO)
RPUSH queue "a" "b" "c"
LPOP queue  # "a"
LPOP queue  # "b"

六、性能优化

6.1 批量操作

# ❌ 不推荐:逐个操作
LPUSH key "a"
LPUSH key "b"
LPUSH key "c"

# ✅ 推荐:批量操作
LPUSH key "a" "b" "c"

# 或使用 Pipeline
PIPELINE
  LPUSH key "a"
  LPUSH key "b"
  LPUSH key "c"
END

6.2 内存优化

# 控制列表长度
LTRIM key 0 999  # 保留前 1000 个

# 避免大 List
# ❌ 不推荐:单个 List 超过 10 万
# ✅ 推荐:分片存储
user:1001:feed:0
user:1001:feed:1
user:1001:feed:2

6.3 监控指标

# 查看 List 信息
LLEN key
MEMORY USAGE key

# 监控大 List
redis-cli --bigkeys | grep list

# 慢查询监控
SLOWLOG GET 10

七、常见问题

Q1: List 最大长度?

理论最大值:2^32 - 1(约 40 亿)
实际建议:< 10 万

Q2: 如何清空 List?

# 方式 1: 删除
DEL key

# 方式 2: 修剪
LTRIM key 1 0  # 保留空范围

# 性能:DEL 更快

Q3: List vs Stream?

List:
- 简单消息队列
- 无 ACK 机制
- 适用于任务队列

Stream:
- 专业消息队列
- 支持 ACK
- 支持消费者组
- 适用于复杂场景

Q4: 如何反转 List?

# 方式 1: 应用层
data = LRANGE key 0 -1
reversed = data[::-1]

# 方式 2: 新 List
LMOVE key newkey RIGHT LEFT

总结

Redis List 核心要点:

特性说明
底层结构quicklist(链表 + ziplist)
操作复杂度两端 O(1),中间 O(N)
最大长度2^32 - 1
典型应用消息队列、时间线、栈/队列

最佳实践

  1. 使用批量操作减少网络往返
  2. 控制 List 长度(LTRIM)
  3. 消息队列使用 BLPOP/BRPOP
  4. 大 List 考虑分片存储
  5. 定期监控大 Key

掌握 List,轻松实现消息队列、时间线等功能!

参考资料


分享这篇文章到:

上一篇文章
Prompt 质量评估体系构建
下一篇文章
MySQL CPU 与并发优化