Redis 消息队列实现方案
Redis 可以用作轻量级消息队列,支持 List、Pub/Sub、Stream 三种方案。本文将深入对比这三种方案,帮助你选择合适的实现方式。
一、List 实现消息队列
1.1 基础实现
# 生产者
LPUSH queue "message1"
LPUSH queue "message2"
LPUSH queue "message3"
# 消费者
RPOP queue # 阻塞?不阻塞
问题:队列为空时返回 nil,需要轮询。
1.2 阻塞队列
# 阻塞弹出(推荐)
BRPOP queue 0 # 阻塞直到有数据
BLPOP queue 0 # 左侧弹出
# 多队列监听
BRPOP queue1 queue2 queue3 0
# 返回第一个有数据的队列
1.3 Python 实现
import redis
import json
import time
class MessageQueue:
def __init__(self, redis_client, queue_name):
self.redis = redis_client
self.queue = queue_name
def publish(self, message):
"""发布消息"""
self.redis.rpush(self.queue, json.dumps(message))
def consume(self, timeout=0):
"""消费消息(阻塞)"""
result = self.redis.brpop(self.queue, timeout=timeout)
if result:
return json.loads(result[1])
return None
def consume_batch(self, count=10, timeout=0):
"""批量消费"""
messages = []
for _ in range(count):
msg = self.consume(timeout=timeout if not messages else 0)
if msg:
messages.append(msg)
else:
break
return messages
# 使用示例
mq = MessageQueue(redis, "task:queue")
# 生产者
mq.publish({"task_id": 1, "action": "send_email"})
mq.publish({"task_id": 2, "action": "send_sms"})
# 消费者
while True:
task = mq.consume(timeout=5)
if task:
process_task(task)
else:
print("No tasks")
1.4 优缺点
优点:
- 实现简单
- 支持阻塞
- 性能好
缺点:
- 无 ACK 机制
- 消息可能丢失
- 无消费者组
适用场景:
- 简单任务队列
- 允许消息丢失
- 单消费者
二、Pub/Sub 实现消息队列
2.1 基础实现
# 订阅频道
SUBSCRIBE channel1
# 发布消息
PUBLISH channel1 "message"
2.2 Python 实现
import redis
import json
class PubSubQueue:
def __init__(self, redis_client, channel):
self.redis = redis_client
self.channel = channel
def publish(self, message):
"""发布消息"""
self.redis.publish(self.channel, json.dumps(message))
def subscribe(self):
"""订阅消息"""
pubsub = self.redis.pubsub()
pubsub.subscribe(self.channel)
for message in pubsub.listen():
if message['type'] == 'message':
yield json.loads(message['data'])
# 使用示例
# 生产者
ps = PubSubQueue(redis, "notifications")
ps.publish({"type": "order_created", "order_id": 1001})
# 消费者
ps = PubSubQueue(redis, "notifications")
for message in ps.subscribe():
print(f"Received: {message}")
process_message(message)
2.3 模式订阅
# 订阅多个频道
PSUBSCRIBE channel:*
# 发布
PUBLISH channel:user:1001 "message"
PUBLISH channel:order:1002 "message"
2.4 优缺点
优点:
- 发布订阅解耦
- 支持多订阅者
- 实时性高
缺点:
- 消息不持久
- 无 ACK 机制
- 订阅者离线丢失消息
适用场景:
- 实时通知
- 聊天室
- 配置更新广播
三、Stream 实现消息队列
3.1 基础概念
Stream 是 Redis 5.0+ 的专业消息队列:
- 消息持久化
- 支持 ACK
- 消费者组
- 消息回溯
3.2 基础操作
# 添加消息
XADD mystream * field1 value1 field2 value2
# 读取消息
XREAD COUNT 2 STREAMS mystream 0
# 阻塞读取
XREAD BLOCK 5000 COUNT 2 STREAMS mystream 0
# 删除消息
XDEL mystream message_id
# 截取 Stream
XTRIM mystream MAXLEN 1000
3.3 消费者组
# 创建消费者组
XGROUP CREATE mystream group1 0
# 消费消息
XREADGROUP GROUP group1 consumer1 COUNT 1 BLOCK 5000 STREAMS mystream >
# 确认消息
XACK mystream group1 message_id
# 查看待处理消息
XPENDING mystream group1
# 获取待处理消息详情
XPENDING mystream group1 - + 10 consumer1
3.4 Python 实现
import redis
import json
import time
class StreamQueue:
def __init__(self, redis_client, stream_name, group_name, consumer_name):
self.redis = redis_client
self.stream = stream_name
self.group = group_name
self.consumer = consumer_name
def publish(self, message):
"""发布消息"""
message_id = self.redis.xadd(
self.stream,
{"data": json.dumps(message)}
)
return message_id
def create_group(self):
"""创建消费者组"""
try:
self.redis.xgroup_create(
self.stream,
self.group,
id="0",
mkstream=True
)
except redis.exceptions.ResponseError:
# 组已存在
pass
def consume(self, count=1, block=5000):
"""消费消息"""
messages = self.redis.xreadgroup(
groupname=self.group,
consumername=self.consumer,
streams={self.stream: ">"},
count=count,
block=block
)
if messages:
for stream, msgs in messages:
for msg_id, msg in msgs:
yield {
'id': msg_id.decode(),
'data': json.loads(msg[b'data'])
}
def ack(self, message_id):
"""确认消息"""
self.redis.xack(self.stream, self.group, message_id)
# 使用示例
# 生产者
sq = StreamQueue(redis, "tasks", "workers", "worker1")
sq.publish({"task_id": 1, "action": "send_email"})
# 消费者
sq.create_group()
for message in sq.consume(count=1, block=5000):
print(f"Received: {message}")
process_task(message['data'])
sq.ack(message['id'])
3.5 消息回溯
# 从指定位置读取
XREAD STREAMS mystream 1234567890-0
# 从头开始
XREAD STREAMS mystream 0
# 从最新消息
XREAD STREAMS mystream $
3.6 待处理消息
# 查看待处理消息
XPENDING mystream group1
# 获取待处理详情
XPENDING mystream group1 - + 10
# 认领待处理消息
XCLAIM mystream group1 consumer2 3600000 message_id
3.7 优缺点
优点:
- 消息持久化
- 支持 ACK 机制
- 消费者组
- 消息回溯
- 高可靠性
缺点:
- Redis 5.0+ 才支持
- 内存占用较大
- 实现复杂
适用场景:
- 可靠消息队列
- 需要 ACK
- 多消费者
- 消息回溯需求
四、方案对比
4.1 功能对比
| 特性 | List | Pub/Sub | Stream |
|---|---|---|---|
| 持久化 | ✅ | ❌ | ✅ |
| ACK 机制 | ❌ | ❌ | ✅ |
| 消费者组 | ❌ | ❌ | ✅ |
| 消息回溯 | ❌ | ❌ | ✅ |
| 阻塞读取 | ✅ | ✅ | ✅ |
| 批量操作 | ✅ | ❌ | ✅ |
| Redis 版本 | 所有 | 所有 | 5.0+ |
4.2 性能对比
# 基准测试
# List: ~100k ops/sec
# Pub/Sub: ~200k ops/sec
# Stream: ~50k ops/sec
性能排序:
Pub/Sub > List > Stream
4.3 可靠性对比
可靠性排序:
Stream > List > Pub/Sub
4.4 选择指南
简单任务队列 → List
实时通知 → Pub/Sub
可靠消息队列 → Stream
大数据量 → Stream
允许丢失 → List/Pub/Sub
需要 ACK → Stream
多消费者 → Stream
五、高级应用
5.1 延迟队列
import time
class DelayQueue:
def __init__(self, redis_client, queue_name):
self.redis = redis_client
self.queue = queue_name
def publish(self, message, delay_seconds):
"""发布延迟消息"""
execute_time = int(time.time() * 1000) + delay_seconds * 1000
task = {
'message': json.dumps(message),
'execute_time': execute_time
}
self.redis.zadd(self.queue, {json.dumps(task): execute_time})
def consume(self):
"""消费到期的消息"""
now = int(time.time() * 1000)
tasks = self.redis.zrangebyscore(
self.queue, 0, now, start=0, num=1
)
if tasks:
task = tasks[0]
self.redis.zrem(self.queue, task)
return json.loads(task)['message']
return None
# 使用示例
dq = DelayQueue(redis, "delay:queue")
dq.publish({"task": "send_reminder"}, 3600) # 1 小时后
5.2 优先级队列
class PriorityQueue:
def __init__(self, redis_client, queue_name):
self.redis = redis_client
self.queue = queue_name
def publish(self, message, priority=10):
"""发布消息(优先级越小越优先)"""
self.redis.zadd(self.queue, {json.dumps(message): priority})
def consume(self):
"""消费最高优先级消息"""
tasks = self.redis.zpopmin(self.queue, count=1)
if tasks:
return json.loads(tasks[0][0])
return None
# 使用示例
pq = PriorityQueue(redis, "priority:queue")
pq.publish({"task": "normal"}, priority=10)
pq.publish({"task": "urgent"}, priority=1) # 更高优先级
task = pq.consume() # 先消费 urgent
5.3 死信队列
class DeadLetterQueue:
def __init__(self, redis_client, queue_name, dlq_name, max_retries=3):
self.redis = redis_client
self.queue = queue_name
self.dlq = dlq_name
self.max_retries = max_retries
def publish(self, message):
"""发布消息"""
msg = {
'data': json.dumps(message),
'retries': 0,
'last_error': ''
}
self.redis.lpush(self.queue, json.dumps(msg))
def consume(self, timeout=0):
"""消费消息"""
result = self.redis.brpop(self.queue, timeout=timeout)
if result:
return json.loads(result[1])
return None
def retry(self, message, error):
"""重试消息"""
retries = message.get('retries', 0) + 1
if retries >= self.max_retries:
# 移到死信队列
message['last_error'] = str(error)
self.redis.lpush(self.dlq, json.dumps(message))
else:
# 重新入队
message['retries'] = retries
self.redis.lpush(self.queue, json.dumps(message))
def get_dlq_messages(self):
"""获取死信队列消息"""
messages = self.redis.lrange(self.dlq, 0, -1)
return [json.loads(msg) for msg in messages]
# 使用示例
dlq = DeadLetterQueue(redis, "tasks", "tasks:dlq")
# 消费失败
try:
msg = dlq.consume(timeout=5)
process(msg)
except Exception as e:
dlq.retry(msg, e) # 重试或移到死信队列
六、最佳实践
6.1 List 最佳实践
# ✅ 推荐
# 1. 使用 BRPOP 阻塞消费
result = redis.brpop(queue, timeout=5)
# 2. 批量消费
pipe = redis.pipeline()
for _ in range(10):
pipe.rpop(queue)
messages = pipe.execute()
# 3. 设置超时
result = redis.brpop(queue, timeout=30)
# ❌ 避免
# 1. 轮询
while True:
result = redis.rpop(queue) # 空转 CPU
if result:
break
# 2. 不处理空值
result = redis.rpop(queue)
process(result) # 可能为 None
6.2 Pub/Sub 最佳实践
# ✅ 推荐
# 1. 使用模式订阅
pubsub.psubscribe("channel:*")
# 2. 处理重连
while True:
try:
for message in pubsub.listen():
process(message)
except redis.ConnectionError:
time.sleep(1)
pubsub.connect()
# ❌ 避免
# 1. 依赖消息不丢失
# 2. 不处理断线重连
6.3 Stream 最佳实践
# ✅ 推荐
# 1. 使用消费者组
redis.xgroup_create(stream, group, id="0", mkstream=True)
# 2. 确认消息
redis.xack(stream, group, message_id)
# 3. 处理待处理消息
pending = redis.xpending(stream, group)
if pending:
# 认领超时消息
redis.xclaim(stream, group, consumer, min_idle_time, message_id)
# 4. 限制 Stream 大小
redis.xtrim(stream, maxlen=10000)
# ❌ 避免
# 1. 不确认消息
# 2. 不处理待处理消息
# 3. 不限制 Stream 大小
七、常见问题
Q1: 消息丢失怎么办?
List: 使用 RPOPLPUSH 保证原子性
Stream: 使用 ACK 机制
Pub/Sub: 无法避免(设计如此)
Q2: 消费者故障怎么办?
List: 消息会重新入队
Stream: XCLAIM 认领待处理消息
Pub/Sub: 消息丢失
Q3: 如何保证消息顺序?
List: 天然有序
Stream: 消息 ID 有序
Pub/Sub: 不保证顺序
Q4: 如何限流?
# 使用信号量
semaphore = redis.Semaphore("limit", max_value=100)
with semaphore:
process_message()
总结
Redis 消息队列方案对比:
| 方案 | 适用场景 | 可靠性 | 性能 |
|---|---|---|---|
| List | 简单任务队列 | 中 | 高 |
| Pub/Sub | 实时通知 | 低 | 最高 |
| Stream | 可靠消息队列 | 高 | 中 |
选择建议:
- 简单场景 → List
- 实时通知 → Pub/Sub
- 可靠消息 → Stream
- 需要 ACK → Stream
- 多消费者 → Stream
掌握三种方案,灵活选择消息队列实现!