Skip to content
清晨的一缕阳光
返回

Redis 排行榜实现方案

Redis 排行榜实现方案

排行榜是游戏、社交、电商等应用的常见功能。Redis ZSet 凭借有序的分数和 O(log N) 的查询性能,成为实现排行榜的最佳选择。本文将深入排行榜的各种实现方案。

一、基础排行榜

1.1 基本实现

import redis

class Leaderboard:
    def __init__(self, redis_client, name):
        self.redis = redis_client
        self.name = name
    
    def add_member(self, member_id, score):
        """添加/更新成员分数"""
        self.redis.zadd(self.name, {member_id: score})
    
    def add_members(self, members):
        """批量添加成员"""
        # members = {'member1': 100, 'member2': 200}
        self.redis.zadd(self.name, members)
    
    def get_member_rank(self, member_id):
        """获取成员排名(从低到高)"""
        rank = self.redis.zrevrank(self.name, member_id)
        return rank + 1 if rank is not None else None
    
    def get_member_score(self, member_id):
        """获取成员分数"""
        score = self.redis.zscore(self.name, member_id)
        return float(score) if score else None
    
    def get_top(self, count=10):
        """获取前 N 名"""
        return self.redis.zrevrange(self.name, 0, count - 1, withscores=True)
    
    def get_around(self, member_id, count=5):
        """获取成员附近排名(前后各 count 名)"""
        rank = self.redis.zrevrank(self.name, member_id)
        if rank is None:
            return []
        
        start = max(0, rank - count)
        end = rank + count
        
        return self.redis.zrevrange(self.name, start, end, withscores=True)

# 使用示例
lb = Leaderboard(redis, "game:leaderboard")

# 添加分数
lb.add_member("player1", 1000)
lb.add_member("player2", 2000)
lb.add_member("player3", 1500)

# 获取排名
rank = lb.get_member_rank("player1")  # 3

# 获取前 10 名
top10 = lb.get_top(10)

# 获取附近排名
around = lb.get_around("player1", count=5)

1.2 完整示例

class GameLeaderboard:
    def __init__(self, redis_client, game_id):
        self.redis = redis_client
        self.key = f"game:{game_id}:leaderboard"
        self.user_info_key = f"game:{game_id}:users"
    
    def update_score(self, user_id, score, user_info=None):
        """更新分数"""
        pipe = self.redis.pipeline()
        
        # 更新 ZSet
        pipe.zadd(self.key, {user_id: score})
        
        # 存储用户信息
        if user_info:
            pipe.hset(self.user_info_key, user_id, json.dumps(user_info))
        
        pipe.execute()
    
    def get_leaderboard(self, page=1, page_size=10):
        """获取排行榜(分页)"""
        start = (page - 1) * page_size
        end = start + page_size - 1
        
        # 获取排名和分数
        leaderboard = self.redis.zrevrange(
            self.key, start, end, withscores=True
        )
        
        # 获取用户信息
        user_ids = [user_id for user_id, _ in leaderboard]
        user_infos = self.redis.hmget(self.user_info_key, *user_ids)
        
        # 组装结果
        result = []
        for i, (user_id, score) in enumerate(leaderboard):
            user_info = json.loads(user_infos[i]) if user_infos[i] else {}
            result.append({
                'rank': start + i + 1,
                'user_id': user_id.decode() if isinstance(user_id, bytes) else user_id,
                'score': score,
                'user_info': user_info
            })
        
        return result
    
    def get_user_rank(self, user_id):
        """获取用户排名"""
        rank = self.redis.zrevrank(self.key, user_id)
        if rank is None:
            return None
        
        score = self.redis.zscore(self.key, user_id)
        
        # 获取附近排名
        around = self.get_around(user_id, count=2)
        
        return {
            'rank': rank + 1,
            'score': float(score) if score else 0,
            'around': around
        }

# 使用示例
game_lb = GameLeaderboard(redis, game_id=1001)

# 更新分数
game_lb.update_score("user1001", 5000, {'username': 'Alice'})

# 获取排行榜
leaderboard = game_lb.get_leaderboard(page=1, page_size=10)

# 获取用户排名
user_rank = game_lb.get_user_rank("user1001")

二、分页查询

2.1 游标分页

class CursorLeaderboard:
    def __init__(self, redis_client, name):
        self.redis = redis_client
        self.name = name
    
    def get_page(self, cursor=None, page_size=10):
        """游标分页"""
        if cursor is None:
            # 第一页
            start = 0
            end = page_size - 1
        else:
            # 后续页
            start = cursor
            end = start + page_size - 1
        
        leaderboard = self.redis.zrevrange(
            self.name, start, end, withscores=True
        )
        
        if not leaderboard:
            return {'data': [], 'next_cursor': None}
        
        # 计算下次游标
        next_cursor = start + len(leaderboard)
        total = self.redis.zcard(self.name)
        
        return {
            'data': [
                {
                    'rank': start + i + 1,
                    'user_id': user_id.decode() if isinstance(user_id, bytes) else user_id,
                    'score': score
                }
                for i, (user_id, score) in enumerate(leaderboard)
            ],
            'next_cursor': next_cursor if next_cursor < total else None,
            'has_more': next_cursor < total
        }

# 使用示例
clb = CursorLeaderboard(redis, "game:leaderboard")

# 第一页
page1 = clb.get_page(page_size=10)

# 第二页
page2 = clb.get_page(cursor=page1['next_cursor'], page_size=10)

2.2 分数范围分页

class ScoreRangeLeaderboard:
    def __init__(self, redis_client, name):
        self.redis = redis_client
        self.name = name
    
    def get_by_score_range(self, min_score, max_score, page=1, page_size=10):
        """按分数范围查询"""
        start = (page - 1) * page_size
        end = start + page_size - 1
        
        # 按分数范围查询
        leaderboard = self.redis.zrevrangebyscore(
            self.name, max_score, min_score,
            start=start, num=page_size,
            withscores=True
        )
        
        return [
            {
                'user_id': user_id.decode() if isinstance(user_id, bytes) else user_id,
                'score': score
            }
            for user_id, score in leaderboard
        ]

# 使用示例
srlb = ScoreRangeLeaderboard(redis, "game:leaderboard")

# 查询 1000-2000 分的用户
players = srlb.get_by_score_range(1000, 2000, page=1, page_size=10)

三、实时更新

3.1 增量更新

class RealtimeLeaderboard:
    def __init__(self, redis_client, name):
        self.redis = redis_client
        self.name = name
    
    def increment_score(self, user_id, increment):
        """增量更新分数"""
        return self.redis.zincrby(self.name, increment, user_id)
    
    def batch_increment(self, increments):
        """批量增量更新"""
        # increments = {'user1': 10, 'user2': 20}
        pipe = self.redis.pipeline()
        
        for user_id, increment in increments.items():
            pipe.zincrby(self.name, increment, user_id)
        
        pipe.execute()

# 使用示例
rlb = RealtimeLeaderboard(redis, "game:leaderboard")

# 增加分数
new_score = rlb.increment_score("player1", 100)

# 批量增加
rlb.batch_increment({'player1': 50, 'player2': 100})

3.2 异步更新

import asyncio
from aioredis import from_url

class AsyncLeaderboard:
    def __init__(self, redis_url, name):
        self.redis = None
        self.redis_url = redis_url
        self.name = name
    
    async def connect(self):
        self.redis = await from_url(self.redis_url)
    
    async def update_score(self, user_id, score):
        """异步更新分数"""
        await self.redis.zadd(self.name, {user_id: score})
    
    async def get_rank(self, user_id):
        """异步获取排名"""
        rank = await self.redis.zrevrank(self.name, user_id)
        return rank + 1 if rank is not None else None
    
    async def get_top(self, count=10):
        """异步获取前 N 名"""
        return await self.redis.zrevrange(self.name, 0, count - 1, withscores=True)
    
    async def close(self):
        if self.redis:
            await self.redis.close()

# 使用示例
async def main():
    alb = AsyncLeaderboard("redis://localhost", "game:leaderboard")
    await alb.connect()
    
    # 更新分数
    await alb.update_score("player1", 1000)
    
    # 获取排名
    rank = await alb.get_rank("player1")
    
    # 获取前 10 名
    top10 = await alb.get_top(10)
    
    await alb.close()

asyncio.run(main())

四、多维度排行

4.1 多排行榜

class MultiLeaderboard:
    def __init__(self, redis_client, game_id):
        self.redis = redis_client
        self.game_id = game_id
    
    def get_key(self, dimension):
        """获取排行榜 Key"""
        return f"game:{self.game_id}:leaderboard:{dimension}"
    
    def update_scores(self, user_id, scores):
        """更新多维度分数"""
        # scores = {'total': 1000, 'pve': 500, 'pvp': 300}
        pipe = self.redis.pipeline()
        
        for dimension, score in scores.items():
            key = self.get_key(dimension)
            pipe.zadd(key, {user_id: score})
        
        pipe.execute()
    
    def get_rank(self, user_id, dimension):
        """获取指定维度排名"""
        key = self.get_key(dimension)
        rank = self.redis.zrevrank(key, user_id)
        return rank + 1 if rank is not None else None
    
    def get_top(self, dimension, count=10):
        """获取指定维度前 N 名"""
        key = self.get_key(dimension)
        return self.redis.zrevrange(key, 0, count - 1, withscores=True)

# 使用示例
mlb = MultiLeaderboard(redis, game_id=1001)

# 更新多维度分数
mlb.update_scores("user1001", {
    'total': 1000,
    'pve': 500,
    'pvp': 300,
    'achievement': 200
})

# 获取总排名
total_rank = mlb.get_rank("user1001", 'total')

# 获取 PVP 前 10 名
pvp_top10 = mlb.get_top('pvp', count=10)

4.2 综合评分

class CompositeLeaderboard:
    def __init__(self, redis_client, name):
        self.redis = redis_client
        self.name = name
    
    def calculate_score(self, user_data):
        """计算综合分数"""
        # 权重配置
        weights = {
            'score': 0.5,
            'level': 0.3,
            'achievement': 0.2
        }
        
        # 综合分数 = score * 0.5 + level * 0.3 * 100 + achievement * 0.2 * 10
        composite_score = (
            user_data['score'] * weights['score'] +
            user_data['level'] * weights['level'] * 100 +
            user_data['achievement'] * weights['achievement'] * 10
        )
        
        return composite_score
    
    def update_composite_score(self, user_id, user_data):
        """更新综合分数"""
        score = self.calculate_score(user_data)
        self.redis.zadd(self.name, {user_id: score})
        return score

# 使用示例
clb = CompositeLeaderboard(redis, "game:composite_leaderboard")

# 更新综合分数
score = clb.update_composite_score("user1001", {
    'score': 1000,
    'level': 50,
    'achievement': 100
})

五、时间维度排行

5.1 日榜/周榜/月榜

class TimeLeaderboard:
    def __init__(self, redis_client, prefix):
        self.redis = redis_client
        self.prefix = prefix
    
    def get_key(self, period):
        """获取排行榜 Key"""
        from datetime import datetime
        now = datetime.now()
        
        if period == 'daily':
            return f"{self.prefix}:daily:{now.strftime('%Y%m%d')}"
        elif period == 'weekly':
            return f"{self.prefix}:weekly:{now.strftime('%Y%W')}"
        elif period == 'monthly':
            return f"{self.prefix}:monthly:{now.strftime('%Y%m')}"
        elif period == 'all':
            return f"{self.prefix}:all"
    
    def add_score(self, user_id, score, period='all'):
        """添加分数"""
        key = self.get_key(period)
        self.redis.zincrby(key, score, user_id)
        
        # 设置过期时间
        if period == 'daily':
            self.redis.expire(key, 86400 * 2)  # 保留 2 天
        elif period == 'weekly':
            self.redis.expire(key, 86400 * 8)  # 保留 8 天
        elif period == 'monthly':
            self.redis.expire(key, 86400 * 32)  # 保留 32 天
    
    def get_top(self, period='all', count=10):
        """获取指定时间段前 N 名"""
        key = self.get_key(period)
        return self.redis.zrevrange(key, 0, count - 1, withscores=True)

# 使用示例
tlb = TimeLeaderboard(redis, "game")

# 添加分数(同时更新总榜和日榜)
tlb.add_score("user1001", 100, period='daily')
tlb.add_score("user1001", 100, period='all')

# 获取日榜前 10
daily_top10 = tlb.get_top('daily', count=10)

# 获取总榜前 10
all_top10 = tlb.get_top('all', count=10)

5.2 历史排行榜

class HistoryLeaderboard:
    def __init__(self, redis_client, prefix):
        self.redis = redis_client
        self.prefix = prefix
    
    def archive_leaderboard(self, period, date):
        """归档排行榜"""
        source_key = f"{self.prefix}:{period}:{date}"
        archive_key = f"{self.prefix}:archive:{period}:{date}"
        
        # 复制排行榜
        self.redis.zunionstore(archive_key, [source_key])
        
        # 设置长期过期
        self.redis.expire(archive_key, 86400 * 365)
    
    def get_history(self, period, date, count=10):
        """获取历史排行榜"""
        archive_key = f"{self.prefix}:archive:{period}:{date}"
        return self.redis.zrevrange(archive_key, 0, count - 1, withscores=True)

# 使用示例
hlb = HistoryLeaderboard(redis, "game")

# 归档昨日排行榜
hlb.archive_leaderboard('daily', '20240101')

# 获取历史排行榜
history = hlb.get_history('daily', '20240101', count=10)

六、性能优化

6.1 缓存优化

class CachedLeaderboard:
    def __init__(self, redis_client, name, cache_ttl=60):
        self.redis = redis_client
        self.name = name
        self.cache_ttl = cache_ttl
        self.cache_key = f"{name}:cache"
    
    def get_top_cached(self, count=10):
        """获取缓存的排行榜"""
        # 尝试从缓存获取
        cached = self.redis.get(self.cache_key)
        if cached:
            return json.loads(cached)
        
        # 从 ZSet 获取
        leaderboard = self.redis.zrevrange(
            self.name, 0, count - 1, withscores=True
        )
        
        # 转换为可序列化格式
        data = [
            {'user_id': user_id.decode(), 'score': score}
            for user_id, score in leaderboard
        ]
        
        # 缓存
        self.redis.setex(
            self.cache_key,
            self.cache_ttl,
            json.dumps(data)
        )
        
        return data

# 使用示例
clb = CachedLeaderboard(redis, "game:leaderboard", cache_ttl=60)

# 获取缓存的排行榜(60 秒内不查 ZSet)
top10 = clb.get_top_cached(count=10)

6.2 分批更新

def batch_update_leaderboard(updates, batch_size=100):
    """分批更新排行榜"""
    pipe = redis.pipeline()
    
    for i, (user_id, score) in enumerate(updates.items()):
        pipe.zadd("game:leaderboard", {user_id: score})
        
        # 每 batch_size 条执行一次
        if (i + 1) % batch_size == 0:
            pipe.execute()
            pipe = redis.pipeline()
    
    # 执行剩余
    if pipe.command_stack:
        pipe.execute()

七、最佳实践

7.1 配置建议

ZSet 配置:
- zset-max-ziplist-entries: 128
- zset-max-ziplist-value: 64

缓存配置:
- 热门排行榜缓存 60 秒
- 个人排名缓存 30 秒

过期策略:
- 日榜:2 天
- 周榜:8 天
- 月榜:32 天

7.2 监控指标

# 监控排行榜大小
def get_leaderboard_size(name):
    return redis.zcard(name)

# 监控查询延迟
import time

def timed_get_top(name, count=10):
    start = time.time()
    result = redis.zrevrange(name, 0, count - 1, withscores=True)
    elapsed = time.time() - start
    
    # 记录指标
    metrics.record('leaderboard.query.time', elapsed)
    
    return result

7.3 常见问题

问题解决方案
大排行榜性能慢分页查询、缓存结果
实时更新延迟异步更新、批量处理
内存占用高设置过期时间、定期清理
排名并列使用小数分数(如 100.1)

总结

Redis 排行榜核心要点:

功能命令复杂度
添加分数ZADD/ZINCRBYO(log N)
获取排名ZREVRANKO(log N)
获取范围ZREVRANGEO(log N + M)
分页查询ZREVRANGE+LIMITO(log N + M)

最佳实践

  1. 使用 ZSet 存储排行榜
  2. 分页查询避免全量加载
  3. 缓存热门排行榜
  4. 设置合理的过期时间
  5. 多维度使用多个 ZSet
  6. 异步更新提高性能

掌握排行榜实现,构建实时排名系统!

参考资料


分享这篇文章到:

上一篇文章
Prompt 测试框架详解
下一篇文章
以日为鉴:全民考公热的潮起与镜鉴