Skip to content
清晨的一缕阳光
返回

Redis 消息队列实现方案

Redis 消息队列实现方案

Redis 可以用作轻量级消息队列,支持 List、Pub/Sub、Stream 三种方案。本文将深入对比这三种方案,帮助你选择合适的实现方式。

一、List 实现消息队列

1.1 基础实现

# 生产者
LPUSH queue "message1"
LPUSH queue "message2"
LPUSH queue "message3"

# 消费者
RPOP queue  # 阻塞?不阻塞

问题:队列为空时返回 nil,需要轮询。

1.2 阻塞队列

# 阻塞弹出(推荐)
BRPOP queue 0  # 阻塞直到有数据
BLPOP queue 0  # 左侧弹出

# 多队列监听
BRPOP queue1 queue2 queue3 0
# 返回第一个有数据的队列

1.3 Python 实现

import redis
import json
import time

class MessageQueue:
    def __init__(self, redis_client, queue_name):
        self.redis = redis_client
        self.queue = queue_name
    
    def publish(self, message):
        """发布消息"""
        self.redis.rpush(self.queue, json.dumps(message))
    
    def consume(self, timeout=0):
        """消费消息(阻塞)"""
        result = self.redis.brpop(self.queue, timeout=timeout)
        if result:
            return json.loads(result[1])
        return None
    
    def consume_batch(self, count=10, timeout=0):
        """批量消费"""
        messages = []
        for _ in range(count):
            msg = self.consume(timeout=timeout if not messages else 0)
            if msg:
                messages.append(msg)
            else:
                break
        return messages

# 使用示例
mq = MessageQueue(redis, "task:queue")

# 生产者
mq.publish({"task_id": 1, "action": "send_email"})
mq.publish({"task_id": 2, "action": "send_sms"})

# 消费者
while True:
    task = mq.consume(timeout=5)
    if task:
        process_task(task)
    else:
        print("No tasks")

1.4 优缺点

优点

缺点

适用场景

二、Pub/Sub 实现消息队列

2.1 基础实现

# 订阅频道
SUBSCRIBE channel1

# 发布消息
PUBLISH channel1 "message"

2.2 Python 实现

import redis
import json

class PubSubQueue:
    def __init__(self, redis_client, channel):
        self.redis = redis_client
        self.channel = channel
    
    def publish(self, message):
        """发布消息"""
        self.redis.publish(self.channel, json.dumps(message))
    
    def subscribe(self):
        """订阅消息"""
        pubsub = self.redis.pubsub()
        pubsub.subscribe(self.channel)
        
        for message in pubsub.listen():
            if message['type'] == 'message':
                yield json.loads(message['data'])

# 使用示例
# 生产者
ps = PubSubQueue(redis, "notifications")
ps.publish({"type": "order_created", "order_id": 1001})

# 消费者
ps = PubSubQueue(redis, "notifications")
for message in ps.subscribe():
    print(f"Received: {message}")
    process_message(message)

2.3 模式订阅

# 订阅多个频道
PSUBSCRIBE channel:*

# 发布
PUBLISH channel:user:1001 "message"
PUBLISH channel:order:1002 "message"

2.4 优缺点

优点

缺点

适用场景

三、Stream 实现消息队列

3.1 基础概念

Stream 是 Redis 5.0+ 的专业消息队列:
- 消息持久化
- 支持 ACK
- 消费者组
- 消息回溯

3.2 基础操作

# 添加消息
XADD mystream * field1 value1 field2 value2

# 读取消息
XREAD COUNT 2 STREAMS mystream 0

# 阻塞读取
XREAD BLOCK 5000 COUNT 2 STREAMS mystream 0

# 删除消息
XDEL mystream message_id

# 截取 Stream
XTRIM mystream MAXLEN 1000

3.3 消费者组

# 创建消费者组
XGROUP CREATE mystream group1 0

# 消费消息
XREADGROUP GROUP group1 consumer1 COUNT 1 BLOCK 5000 STREAMS mystream >

# 确认消息
XACK mystream group1 message_id

# 查看待处理消息
XPENDING mystream group1

# 获取待处理消息详情
XPENDING mystream group1 - + 10 consumer1

3.4 Python 实现

import redis
import json
import time

class StreamQueue:
    def __init__(self, redis_client, stream_name, group_name, consumer_name):
        self.redis = redis_client
        self.stream = stream_name
        self.group = group_name
        self.consumer = consumer_name
    
    def publish(self, message):
        """发布消息"""
        message_id = self.redis.xadd(
            self.stream,
            {"data": json.dumps(message)}
        )
        return message_id
    
    def create_group(self):
        """创建消费者组"""
        try:
            self.redis.xgroup_create(
                self.stream,
                self.group,
                id="0",
                mkstream=True
            )
        except redis.exceptions.ResponseError:
            # 组已存在
            pass
    
    def consume(self, count=1, block=5000):
        """消费消息"""
        messages = self.redis.xreadgroup(
            groupname=self.group,
            consumername=self.consumer,
            streams={self.stream: ">"},
            count=count,
            block=block
        )
        
        if messages:
            for stream, msgs in messages:
                for msg_id, msg in msgs:
                    yield {
                        'id': msg_id.decode(),
                        'data': json.loads(msg[b'data'])
                    }
    
    def ack(self, message_id):
        """确认消息"""
        self.redis.xack(self.stream, self.group, message_id)

# 使用示例
# 生产者
sq = StreamQueue(redis, "tasks", "workers", "worker1")
sq.publish({"task_id": 1, "action": "send_email"})

# 消费者
sq.create_group()
for message in sq.consume(count=1, block=5000):
    print(f"Received: {message}")
    process_task(message['data'])
    sq.ack(message['id'])

3.5 消息回溯

# 从指定位置读取
XREAD STREAMS mystream 1234567890-0

# 从头开始
XREAD STREAMS mystream 0

# 从最新消息
XREAD STREAMS mystream $

3.6 待处理消息

# 查看待处理消息
XPENDING mystream group1

# 获取待处理详情
XPENDING mystream group1 - + 10

# 认领待处理消息
XCLAIM mystream group1 consumer2 3600000 message_id

3.7 优缺点

优点

缺点

适用场景

四、方案对比

4.1 功能对比

特性ListPub/SubStream
持久化
ACK 机制
消费者组
消息回溯
阻塞读取
批量操作
Redis 版本所有所有5.0+

4.2 性能对比

# 基准测试
# List: ~100k ops/sec
# Pub/Sub: ~200k ops/sec
# Stream: ~50k ops/sec

性能排序

Pub/Sub > List > Stream

4.3 可靠性对比

可靠性排序

Stream > List > Pub/Sub

4.4 选择指南

简单任务队列 → List
实时通知 → Pub/Sub
可靠消息队列 → Stream

大数据量 → Stream
允许丢失 → List/Pub/Sub
需要 ACK → Stream
多消费者 → Stream

五、高级应用

5.1 延迟队列

import time

class DelayQueue:
    def __init__(self, redis_client, queue_name):
        self.redis = redis_client
        self.queue = queue_name
    
    def publish(self, message, delay_seconds):
        """发布延迟消息"""
        execute_time = int(time.time() * 1000) + delay_seconds * 1000
        task = {
            'message': json.dumps(message),
            'execute_time': execute_time
        }
        self.redis.zadd(self.queue, {json.dumps(task): execute_time})
    
    def consume(self):
        """消费到期的消息"""
        now = int(time.time() * 1000)
        
        tasks = self.redis.zrangebyscore(
            self.queue, 0, now, start=0, num=1
        )
        
        if tasks:
            task = tasks[0]
            self.redis.zrem(self.queue, task)
            return json.loads(task)['message']
        
        return None

# 使用示例
dq = DelayQueue(redis, "delay:queue")
dq.publish({"task": "send_reminder"}, 3600)  # 1 小时后

5.2 优先级队列

class PriorityQueue:
    def __init__(self, redis_client, queue_name):
        self.redis = redis_client
        self.queue = queue_name
    
    def publish(self, message, priority=10):
        """发布消息(优先级越小越优先)"""
        self.redis.zadd(self.queue, {json.dumps(message): priority})
    
    def consume(self):
        """消费最高优先级消息"""
        tasks = self.redis.zpopmin(self.queue, count=1)
        if tasks:
            return json.loads(tasks[0][0])
        return None

# 使用示例
pq = PriorityQueue(redis, "priority:queue")
pq.publish({"task": "normal"}, priority=10)
pq.publish({"task": "urgent"}, priority=1)  # 更高优先级

task = pq.consume()  # 先消费 urgent

5.3 死信队列

class DeadLetterQueue:
    def __init__(self, redis_client, queue_name, dlq_name, max_retries=3):
        self.redis = redis_client
        self.queue = queue_name
        self.dlq = dlq_name
        self.max_retries = max_retries
    
    def publish(self, message):
        """发布消息"""
        msg = {
            'data': json.dumps(message),
            'retries': 0,
            'last_error': ''
        }
        self.redis.lpush(self.queue, json.dumps(msg))
    
    def consume(self, timeout=0):
        """消费消息"""
        result = self.redis.brpop(self.queue, timeout=timeout)
        if result:
            return json.loads(result[1])
        return None
    
    def retry(self, message, error):
        """重试消息"""
        retries = message.get('retries', 0) + 1
        if retries >= self.max_retries:
            # 移到死信队列
            message['last_error'] = str(error)
            self.redis.lpush(self.dlq, json.dumps(message))
        else:
            # 重新入队
            message['retries'] = retries
            self.redis.lpush(self.queue, json.dumps(message))
    
    def get_dlq_messages(self):
        """获取死信队列消息"""
        messages = self.redis.lrange(self.dlq, 0, -1)
        return [json.loads(msg) for msg in messages]

# 使用示例
dlq = DeadLetterQueue(redis, "tasks", "tasks:dlq")

# 消费失败
try:
    msg = dlq.consume(timeout=5)
    process(msg)
except Exception as e:
    dlq.retry(msg, e)  # 重试或移到死信队列

六、最佳实践

6.1 List 最佳实践

# ✅ 推荐
# 1. 使用 BRPOP 阻塞消费
result = redis.brpop(queue, timeout=5)

# 2. 批量消费
pipe = redis.pipeline()
for _ in range(10):
    pipe.rpop(queue)
messages = pipe.execute()

# 3. 设置超时
result = redis.brpop(queue, timeout=30)

# ❌ 避免
# 1. 轮询
while True:
    result = redis.rpop(queue)  # 空转 CPU
    if result:
        break

# 2. 不处理空值
result = redis.rpop(queue)
process(result)  # 可能为 None

6.2 Pub/Sub 最佳实践

# ✅ 推荐
# 1. 使用模式订阅
pubsub.psubscribe("channel:*")

# 2. 处理重连
while True:
    try:
        for message in pubsub.listen():
            process(message)
    except redis.ConnectionError:
        time.sleep(1)
        pubsub.connect()

# ❌ 避免
# 1. 依赖消息不丢失
# 2. 不处理断线重连

6.3 Stream 最佳实践

# ✅ 推荐
# 1. 使用消费者组
redis.xgroup_create(stream, group, id="0", mkstream=True)

# 2. 确认消息
redis.xack(stream, group, message_id)

# 3. 处理待处理消息
pending = redis.xpending(stream, group)
if pending:
    # 认领超时消息
    redis.xclaim(stream, group, consumer, min_idle_time, message_id)

# 4. 限制 Stream 大小
redis.xtrim(stream, maxlen=10000)

# ❌ 避免
# 1. 不确认消息
# 2. 不处理待处理消息
# 3. 不限制 Stream 大小

七、常见问题

Q1: 消息丢失怎么办?

List: 使用 RPOPLPUSH 保证原子性
Stream: 使用 ACK 机制
Pub/Sub: 无法避免(设计如此)

Q2: 消费者故障怎么办?

List: 消息会重新入队
Stream: XCLAIM 认领待处理消息
Pub/Sub: 消息丢失

Q3: 如何保证消息顺序?

List: 天然有序
Stream: 消息 ID 有序
Pub/Sub: 不保证顺序

Q4: 如何限流?

# 使用信号量
semaphore = redis.Semaphore("limit", max_value=100)

with semaphore:
    process_message()

总结

Redis 消息队列方案对比:

方案适用场景可靠性性能
List简单任务队列
Pub/Sub实时通知最高
Stream可靠消息队列

选择建议

掌握三种方案,灵活选择消息队列实现!

参考资料


分享这篇文章到:

上一篇文章
Redis Cluster 原理详解
下一篇文章
Agent 反思与自修正机制