Skip to content
清晨的一缕阳光
返回

Redis 会话管理实战

Redis 会话管理实战

会话管理是 Web 应用的核心功能。Redis 凭借高性能和过期机制,成为会话存储的理想选择。本文将深入 Redis 会话管理的实现方案。

一、Session 存储

1.1 基础实现

import redis
import uuid
import json

class SessionManager:
    def __init__(self, redis_client, prefix="session", ttl=3600):
        self.redis = redis_client
        self.prefix = prefix
        self.ttl = ttl
    
    def create(self, user_id, data=None):
        """创建会话"""
        session_id = str(uuid.uuid4())
        key = f"{self.prefix}:{session_id}"
        
        session_data = {
            'user_id': user_id,
            'data': data or {},
            'created_at': time.time()
        }
        
        self.redis.setex(key, self.ttl, json.dumps(session_data))
        return session_id
    
    def get(self, session_id):
        """获取会话"""
        key = f"{self.prefix}:{session_id}"
        data = self.redis.get(key)
        
        if data:
            session = json.loads(data)
            # 续期
            self.redis.expire(key, self.ttl)
            return session
        return None
    
    def update(self, session_id, data):
        """更新会话"""
        session = self.get(session_id)
        if session:
            session['data'].update(data)
            session['updated_at'] = time.time()
            key = f"{self.prefix}:{session_id}"
            self.redis.setex(key, self.ttl, json.dumps(session))
            return True
        return False
    
    def delete(self, session_id):
        """删除会话"""
        key = f"{self.prefix}:{session_id}"
        return self.redis.delete(key)
    
    def refresh(self, session_id):
        """刷新会话(续期)"""
        key = f"{self.prefix}:{session_id}"
        return self.redis.expire(key, self.ttl)

# 使用示例
session_mgr = SessionManager(redis, ttl=7200)

# 创建会话
session_id = session_mgr.create(user_id=1001, data={'role': 'admin'})

# 获取会话
session = session_mgr.get(session_id)

# 更新会话
session_mgr.update(session_id, {'last_login': time.time()})

# 删除会话(登出)
session_mgr.delete(session_id)

1.2 Flask 集成

from flask import Flask, session
from flask_session import Session

app = Flask(__name__)

# 配置 Redis Session
app.config['SECRET_KEY'] = 'your_secret_key'
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_PERMANENT'] = True
app.config['SESSION_USE_SIGNER'] = True
app.config['PERMANENT_SESSION_LIFETIME'] = 3600
app.config['SESSION_REDIS'] = redis.Redis(host='localhost', port=6379)

Session(app)

@app.route('/login', methods=['POST'])
def login():
    # 验证用户
    user = authenticate(request.form['username'], request.form['password'])
    if user:
        session['user_id'] = user['id']
        session['username'] = user['username']
        return {'status': 'success'}
    return {'status': 'error'}, 401

@app.route('/profile')
def profile():
    if 'user_id' not in session:
        return {'status': 'unauthorized'}, 401
    
    user_id = session['user_id']
    return {'user_id': user_id}

@app.route('/logout', methods=['POST'])
def logout():
    session.clear()
    return {'status': 'success'}

1.3 Django 集成

# settings.py
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'redis'
SESSION_COOKIE_AGE = 3600
SESSION_SAVE_EVERY_REQUEST = True

CACHES = {
    'redis': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': 'redis://127.0.0.1:6379/1',
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.DefaultClient',
            'PASSWORD': 'your_password',
        }
    }
}

# views.py
from django.contrib.sessions.models import Session

def login_view(request):
    if request.method == 'POST':
        user = authenticate(request, **request.POST)
        if user:
            login(request, user)
            request.session['user_id'] = user.id
            return redirect('profile')
    return render(request, 'login.html')

def logout_view(request):
    logout(request)
    return redirect('login')

二、Token 管理

2.1 JWT + Redis

import jwt
import time
from datetime import datetime, timedelta

class TokenManager:
    def __init__(self, redis_client, secret_key, prefix="token", ttl=7200):
        self.redis = redis_client
        self.secret_key = secret_key
        self.prefix = prefix
        self.ttl = ttl
    
    def generate(self, user_id, extra_data=None):
        """生成 Token"""
        payload = {
            'user_id': user_id,
            'exp': datetime.utcnow() + timedelta(seconds=self.ttl),
            'iat': datetime.utcnow(),
            **(extra_data or {})
        }
        
        token = jwt.encode(payload, self.secret_key, algorithm='HS256')
        
        # 存储 Token(支持注销)
        key = f"{self.prefix}:{token}"
        self.redis.setex(key, self.ttl, user_id)
        
        return token
    
    def verify(self, token):
        """验证 Token"""
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
            
            # 检查 Token 是否在黑名单
            key = f"{self.prefix}:{token}"
            if not self.redis.exists(key):
                return None
            
            return payload
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None
    
    def revoke(self, token):
        """注销 Token"""
        key = f"{self.prefix}:{token}"
        self.redis.delete(key)
    
    def revoke_all(self, user_id):
        """注销用户所有 Token"""
        # 查找用户所有 Token(需要额外存储映射)
        key = f"user_tokens:{user_id}"
        tokens = self.redis.smembers(key)
        for token in tokens:
            self.revoke(token.decode())
        self.redis.delete(key)

# 使用示例
token_mgr = TokenManager(redis, secret_key='your_secret_key')

# 生成 Token
token = token_mgr.generate(user_id=1001, extra_data={'role': 'admin'})

# 验证 Token
payload = token_mgr.verify(token)
if payload:
    user_id = payload['user_id']

# 注销 Token(登出)
token_mgr.revoke(token)

# 注销所有 Token(强制下线)
token_mgr.revoke_all(user_id=1001)

2.2 刷新 Token

class RefreshTokenManager:
    def __init__(self, redis_client, secret_key, access_ttl=3600, refresh_ttl=604800):
        self.redis = redis_client
        self.secret_key = secret_key
        self.access_ttl = access_ttl
        self.refresh_ttl = refresh_ttl
    
    def generate_tokens(self, user_id):
        """生成访问 Token 和刷新 Token"""
        # 访问 Token
        access_payload = {
            'user_id': user_id,
            'type': 'access',
            'exp': datetime.utcnow() + timedelta(seconds=self.access_ttl)
        }
        access_token = jwt.encode(access_payload, self.secret_key, algorithm='HS256')
        
        # 刷新 Token
        refresh_payload = {
            'user_id': user_id,
            'type': 'refresh',
            'exp': datetime.utcnow() + timedelta(seconds=self.refresh_ttl)
        }
        refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm='HS256')
        
        # 存储刷新 Token
        key = f"refresh_token:{refresh_token}"
        self.redis.setex(key, self.refresh_ttl, user_id)
        
        return {
            'access_token': access_token,
            'refresh_token': refresh_token
        }
    
    def refresh_access_token(self, refresh_token):
        """刷新访问 Token"""
        # 验证刷新 Token
        try:
            payload = jwt.decode(refresh_token, self.secret_key, algorithms=['HS256'])
            
            if payload.get('type') != 'refresh':
                return None
            
            # 检查刷新 Token 是否有效
            key = f"refresh_token:{refresh_token}"
            user_id = self.redis.get(key)
            
            if not user_id:
                return None
            
            # 生成新的访问 Token
            new_access_payload = {
                'user_id': user_id.decode(),
                'type': 'access',
                'exp': datetime.utcnow() + timedelta(seconds=self.access_ttl)
            }
            new_access_token = jwt.encode(new_access_payload, self.secret_key, algorithm='HS256')
            
            return {'access_token': new_access_token}
        except:
            return None

# 使用示例
refresh_mgr = RefreshTokenManager(redis, secret_key='your_secret_key')

# 登录
tokens = refresh_mgr.generate_tokens(user_id=1001)

# 刷新 Token
new_tokens = refresh_mgr.refresh_access_token(tokens['refresh_token'])

三、分布式会话

3.1 多节点共享 Session

┌─────────┐    ┌─────────┐    ┌─────────┐
│  App 1  │    │  App 2  │    │  App 3  │
│ 192.168 │    │ 192.168 │    │ 192.168 │
│  .1.1   │    │  .1.2   │    │  .1.3   │
└────┬────┘    └────┬────┘    └────┬────┘
     │              │              │
     └──────────────┼──────────────┘

         ┌─────────────────┐
         │  Redis Cluster  │
         │  或 Sentinel    │
         └─────────────────┘

3.2 配置示例

# Flask-Session 配置
app.config['SESSION_REDIS'] = redis.Redis(
    host='redis-cluster.example.com',
    port=6379,
    password='your_password',
    db=0,
    socket_connect_timeout=5,
    socket_keepalive=True
)

# Django 配置
CACHES = {
    'redis': {
        'BACKEND': 'django_redis.cache.RedisCache',
        'LOCATION': [
            'redis://redis1:6379/1',
            'redis://redis2:6379/1',
            'redis://redis3:6379/1',
        ],
        'OPTIONS': {
            'CLIENT_CLASS': 'django_redis.client.ShardClient',
            'PASSWORD': 'your_password',
        }
    }
}

3.3 Session 绑定

# 基于用户 ID 绑定 Session 到特定 Redis 节点
class ShardedSessionManager:
    def __init__(self, redis_clients, prefix="session"):
        self.redis_clients = redis_clients
        self.prefix = prefix
    
    def get_redis(self, user_id):
        """根据用户 ID 获取 Redis 客户端"""
        index = hash(str(user_id)) % len(self.redis_clients)
        return self.redis_clients[index]
    
    def create(self, user_id, data=None):
        """创建会话"""
        redis_client = self.get_redis(user_id)
        session_id = str(uuid.uuid4())
        key = f"{self.prefix}:{session_id}"
        
        session_data = {
            'user_id': user_id,
            'data': data or {}
        }
        
        redis_client.setex(key, 3600, json.dumps(session_data))
        return session_id
    
    def get(self, session_id):
        """获取会话"""
        # 需要从所有节点查找(或额外存储映射)
        for redis_client in self.redis_clients:
            key = f"{self.prefix}:{session_id}"
            data = redis_client.get(key)
            if data:
                return json.loads(data)
        return None

四、安全加固

4.1 Session 固定攻击防护

def login(request):
    # 登录前生成新 Session
    old_session_id = request.session.session_key
    if old_session_id:
        # 迁移 Session 数据
        old_data = dict(request.session)
        request.session.flush()  # 删除旧 Session
    
    # 验证用户
    user = authenticate(**request.POST)
    if user:
        # 创建新 Session
        request.session['user_id'] = user.id
        request.session['username'] = user.username
        request.session.cycle_key()  # 生成新 Session ID
        return redirect('profile')

4.2 CSRF 防护

from flask_wtf import CSRFProtect

app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'

csrf = CSRFProtect(app)

@app.route('/transfer', methods=['POST'])
def transfer():
    # 自动验证 CSRF Token
    amount = request.form['amount']
    # ...

4.3 会话劫持防护

class SecureSessionManager:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def create(self, request, user_id):
        """创建安全会话"""
        session_id = str(uuid.uuid4())
        key = f"session:{session_id}"
        
        # 存储额外信息用于验证
        session_data = {
            'user_id': user_id,
            'ip_address': request.remote_addr,
            'user_agent': request.headers.get('User-Agent'),
            'created_at': time.time()
        }
        
        self.redis.setex(key, 3600, json.dumps(session_data))
        return session_id
    
    def verify(self, request, session_id):
        """验证会话"""
        key = f"session:{session_id}"
        data = self.redis.get(key)
        
        if not data:
            return None
        
        session = json.loads(data)
        
        # 验证 IP 地址(可选,可能因网络变化失效)
        # if session['ip_address'] != request.remote_addr:
        #     return None
        
        # 验证 User-Agent
        if session['user_agent'] != request.headers.get('User-Agent'):
            return None
        
        # 续期
        self.redis.expire(key, 3600)
        return session

五、性能优化

5.1 批量操作

# 批量获取 Session
def batch_get_sessions(session_ids):
    keys = [f"session:{sid}" for sid in session_ids]
    pipe = redis.pipeline()
    
    for key in keys:
        pipe.get(key)
    
    results = pipe.execute()
    return [json.loads(data) if data else None for data in results]

5.2 过期策略

# 惰性删除 + 定期删除
# Redis 默认策略

# 手动清理过期 Session
def cleanup_expired_sessions():
    # 扫描过期 Session
    cursor = 0
    while True:
        cursor, keys = redis.scan(cursor, match="session:*", count=100)
        
        for key in keys:
            ttl = redis.ttl(key)
            if ttl == -2:  # 已过期
                redis.delete(key)
        
        if cursor == 0:
            break

5.3 内存优化

# 使用 Hash 存储 Session
def create_session_hash(session_id, user_id, data):
    key = f"session:{session_id}"
    pipe = redis.pipeline()
    
    pipe.hset(key, mapping={
        'user_id': user_id,
        'data': json.dumps(data),
        'created_at': time.time()
    })
    pipe.expire(key, 3600)
    pipe.execute()

# 优势:
# 1. 更紧凑的存储
# 2. 支持字段级操作
# 3. 减少内存占用

六、监控与诊断

6.1 监控指标

# 监控 Session 数量
def get_session_count():
    cursor = 0
    count = 0
    
    while True:
        cursor, keys = redis.scan(cursor, match="session:*", count=100)
        count += len(keys)
        
        if cursor == 0:
            break
    
    return count

# 监控活跃用户
def get_active_users():
    cursor = 0
    users = set()
    
    while True:
        cursor, keys = redis.scan(cursor, match="session:*", count=100)
        
        for key in keys:
            data = redis.get(key)
            if data:
                session = json.loads(data)
                users.add(session['user_id'])
        
        if cursor == 0:
            break
    
    return len(users)

6.2 日志记录

import logging

logger = logging.getLogger('session')

class LoggedSessionManager:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    def create(self, user_id):
        session_id = str(uuid.uuid4())
        logger.info(f"Session created: {session_id} for user {user_id}")
        # ...
        return session_id
    
    def delete(self, session_id):
        logger.info(f"Session deleted: {session_id}")
        # ...

七、最佳实践

7.1 配置建议

Session TTL:
- Web 应用:30 分钟 -2 小时
- 移动端:7-30 天
- 记住我:30 天

安全配置:
- 使用 HTTPS
- 启用 Session 加密
- 定期更换 Session ID

7.2 安全清单

✅ 推荐:
- 使用安全随机数生成 Session ID
- 启用 HTTPS
- 设置合理的过期时间
- 登录时更换 Session ID
- 实现 Token 注销机制

❌ 避免:
- Session ID 可预测
- 永久 Session
- 明文存储敏感信息
- 不验证 User-Agent/IP

7.3 性能检查

每日检查:
- [ ] Session 数量
- [ ] 内存使用
- [ ] 过期 Session 清理

每周检查:
- [ ] 活跃用户数
- [ ] Session 命中率
- [ ] Redis 性能指标

总结

Redis 会话管理核心要点:

方案适用场景安全性性能
SessionWeb 应用
JWTAPI/移动端最高
JWT+Redis高安全场景最高

最佳实践

  1. 使用 Redis 存储 Session
  2. 实现 Token 刷新机制
  3. 支持 Token 注销
  4. 分布式环境使用 Redis Cluster
  5. 监控 Session 指标
  6. 定期清理过期 Session

掌握会话管理,构建安全的用户认证系统!

参考资料


分享这篇文章到:

上一篇文章
Spring Cloud 微服务系列完整学习指南
下一篇文章
RocketMQ CommitLog 存储结构详解