Redis 会话管理实战
会话管理是 Web 应用的核心功能。Redis 凭借高性能和过期机制,成为会话存储的理想选择。本文将深入 Redis 会话管理的实现方案。
一、Session 存储
1.1 基础实现
import redis
import uuid
import json
class SessionManager:
def __init__(self, redis_client, prefix="session", ttl=3600):
self.redis = redis_client
self.prefix = prefix
self.ttl = ttl
def create(self, user_id, data=None):
"""创建会话"""
session_id = str(uuid.uuid4())
key = f"{self.prefix}:{session_id}"
session_data = {
'user_id': user_id,
'data': data or {},
'created_at': time.time()
}
self.redis.setex(key, self.ttl, json.dumps(session_data))
return session_id
def get(self, session_id):
"""获取会话"""
key = f"{self.prefix}:{session_id}"
data = self.redis.get(key)
if data:
session = json.loads(data)
# 续期
self.redis.expire(key, self.ttl)
return session
return None
def update(self, session_id, data):
"""更新会话"""
session = self.get(session_id)
if session:
session['data'].update(data)
session['updated_at'] = time.time()
key = f"{self.prefix}:{session_id}"
self.redis.setex(key, self.ttl, json.dumps(session))
return True
return False
def delete(self, session_id):
"""删除会话"""
key = f"{self.prefix}:{session_id}"
return self.redis.delete(key)
def refresh(self, session_id):
"""刷新会话(续期)"""
key = f"{self.prefix}:{session_id}"
return self.redis.expire(key, self.ttl)
# 使用示例
session_mgr = SessionManager(redis, ttl=7200)
# 创建会话
session_id = session_mgr.create(user_id=1001, data={'role': 'admin'})
# 获取会话
session = session_mgr.get(session_id)
# 更新会话
session_mgr.update(session_id, {'last_login': time.time()})
# 删除会话(登出)
session_mgr.delete(session_id)
1.2 Flask 集成
from flask import Flask, session
from flask_session import Session
app = Flask(__name__)
# 配置 Redis Session
app.config['SECRET_KEY'] = 'your_secret_key'
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_PERMANENT'] = True
app.config['SESSION_USE_SIGNER'] = True
app.config['PERMANENT_SESSION_LIFETIME'] = 3600
app.config['SESSION_REDIS'] = redis.Redis(host='localhost', port=6379)
Session(app)
@app.route('/login', methods=['POST'])
def login():
# 验证用户
user = authenticate(request.form['username'], request.form['password'])
if user:
session['user_id'] = user['id']
session['username'] = user['username']
return {'status': 'success'}
return {'status': 'error'}, 401
@app.route('/profile')
def profile():
if 'user_id' not in session:
return {'status': 'unauthorized'}, 401
user_id = session['user_id']
return {'user_id': user_id}
@app.route('/logout', methods=['POST'])
def logout():
session.clear()
return {'status': 'success'}
1.3 Django 集成
# settings.py
SESSION_ENGINE = 'django.contrib.sessions.backends.cache'
SESSION_CACHE_ALIAS = 'redis'
SESSION_COOKIE_AGE = 3600
SESSION_SAVE_EVERY_REQUEST = True
CACHES = {
'redis': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': 'redis://127.0.0.1:6379/1',
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
'PASSWORD': 'your_password',
}
}
}
# views.py
from django.contrib.sessions.models import Session
def login_view(request):
if request.method == 'POST':
user = authenticate(request, **request.POST)
if user:
login(request, user)
request.session['user_id'] = user.id
return redirect('profile')
return render(request, 'login.html')
def logout_view(request):
logout(request)
return redirect('login')
二、Token 管理
2.1 JWT + Redis
import jwt
import time
from datetime import datetime, timedelta
class TokenManager:
def __init__(self, redis_client, secret_key, prefix="token", ttl=7200):
self.redis = redis_client
self.secret_key = secret_key
self.prefix = prefix
self.ttl = ttl
def generate(self, user_id, extra_data=None):
"""生成 Token"""
payload = {
'user_id': user_id,
'exp': datetime.utcnow() + timedelta(seconds=self.ttl),
'iat': datetime.utcnow(),
**(extra_data or {})
}
token = jwt.encode(payload, self.secret_key, algorithm='HS256')
# 存储 Token(支持注销)
key = f"{self.prefix}:{token}"
self.redis.setex(key, self.ttl, user_id)
return token
def verify(self, token):
"""验证 Token"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
# 检查 Token 是否在黑名单
key = f"{self.prefix}:{token}"
if not self.redis.exists(key):
return None
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def revoke(self, token):
"""注销 Token"""
key = f"{self.prefix}:{token}"
self.redis.delete(key)
def revoke_all(self, user_id):
"""注销用户所有 Token"""
# 查找用户所有 Token(需要额外存储映射)
key = f"user_tokens:{user_id}"
tokens = self.redis.smembers(key)
for token in tokens:
self.revoke(token.decode())
self.redis.delete(key)
# 使用示例
token_mgr = TokenManager(redis, secret_key='your_secret_key')
# 生成 Token
token = token_mgr.generate(user_id=1001, extra_data={'role': 'admin'})
# 验证 Token
payload = token_mgr.verify(token)
if payload:
user_id = payload['user_id']
# 注销 Token(登出)
token_mgr.revoke(token)
# 注销所有 Token(强制下线)
token_mgr.revoke_all(user_id=1001)
2.2 刷新 Token
class RefreshTokenManager:
def __init__(self, redis_client, secret_key, access_ttl=3600, refresh_ttl=604800):
self.redis = redis_client
self.secret_key = secret_key
self.access_ttl = access_ttl
self.refresh_ttl = refresh_ttl
def generate_tokens(self, user_id):
"""生成访问 Token 和刷新 Token"""
# 访问 Token
access_payload = {
'user_id': user_id,
'type': 'access',
'exp': datetime.utcnow() + timedelta(seconds=self.access_ttl)
}
access_token = jwt.encode(access_payload, self.secret_key, algorithm='HS256')
# 刷新 Token
refresh_payload = {
'user_id': user_id,
'type': 'refresh',
'exp': datetime.utcnow() + timedelta(seconds=self.refresh_ttl)
}
refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm='HS256')
# 存储刷新 Token
key = f"refresh_token:{refresh_token}"
self.redis.setex(key, self.refresh_ttl, user_id)
return {
'access_token': access_token,
'refresh_token': refresh_token
}
def refresh_access_token(self, refresh_token):
"""刷新访问 Token"""
# 验证刷新 Token
try:
payload = jwt.decode(refresh_token, self.secret_key, algorithms=['HS256'])
if payload.get('type') != 'refresh':
return None
# 检查刷新 Token 是否有效
key = f"refresh_token:{refresh_token}"
user_id = self.redis.get(key)
if not user_id:
return None
# 生成新的访问 Token
new_access_payload = {
'user_id': user_id.decode(),
'type': 'access',
'exp': datetime.utcnow() + timedelta(seconds=self.access_ttl)
}
new_access_token = jwt.encode(new_access_payload, self.secret_key, algorithm='HS256')
return {'access_token': new_access_token}
except:
return None
# 使用示例
refresh_mgr = RefreshTokenManager(redis, secret_key='your_secret_key')
# 登录
tokens = refresh_mgr.generate_tokens(user_id=1001)
# 刷新 Token
new_tokens = refresh_mgr.refresh_access_token(tokens['refresh_token'])
三、分布式会话
3.1 多节点共享 Session
┌─────────┐ ┌─────────┐ ┌─────────┐
│ App 1 │ │ App 2 │ │ App 3 │
│ 192.168 │ │ 192.168 │ │ 192.168 │
│ .1.1 │ │ .1.2 │ │ .1.3 │
└────┬────┘ └────┬────┘ └────┬────┘
│ │ │
└──────────────┼──────────────┘
▼
┌─────────────────┐
│ Redis Cluster │
│ 或 Sentinel │
└─────────────────┘
3.2 配置示例
# Flask-Session 配置
app.config['SESSION_REDIS'] = redis.Redis(
host='redis-cluster.example.com',
port=6379,
password='your_password',
db=0,
socket_connect_timeout=5,
socket_keepalive=True
)
# Django 配置
CACHES = {
'redis': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': [
'redis://redis1:6379/1',
'redis://redis2:6379/1',
'redis://redis3:6379/1',
],
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.ShardClient',
'PASSWORD': 'your_password',
}
}
}
3.3 Session 绑定
# 基于用户 ID 绑定 Session 到特定 Redis 节点
class ShardedSessionManager:
def __init__(self, redis_clients, prefix="session"):
self.redis_clients = redis_clients
self.prefix = prefix
def get_redis(self, user_id):
"""根据用户 ID 获取 Redis 客户端"""
index = hash(str(user_id)) % len(self.redis_clients)
return self.redis_clients[index]
def create(self, user_id, data=None):
"""创建会话"""
redis_client = self.get_redis(user_id)
session_id = str(uuid.uuid4())
key = f"{self.prefix}:{session_id}"
session_data = {
'user_id': user_id,
'data': data or {}
}
redis_client.setex(key, 3600, json.dumps(session_data))
return session_id
def get(self, session_id):
"""获取会话"""
# 需要从所有节点查找(或额外存储映射)
for redis_client in self.redis_clients:
key = f"{self.prefix}:{session_id}"
data = redis_client.get(key)
if data:
return json.loads(data)
return None
四、安全加固
4.1 Session 固定攻击防护
def login(request):
# 登录前生成新 Session
old_session_id = request.session.session_key
if old_session_id:
# 迁移 Session 数据
old_data = dict(request.session)
request.session.flush() # 删除旧 Session
# 验证用户
user = authenticate(**request.POST)
if user:
# 创建新 Session
request.session['user_id'] = user.id
request.session['username'] = user.username
request.session.cycle_key() # 生成新 Session ID
return redirect('profile')
4.2 CSRF 防护
from flask_wtf import CSRFProtect
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'
csrf = CSRFProtect(app)
@app.route('/transfer', methods=['POST'])
def transfer():
# 自动验证 CSRF Token
amount = request.form['amount']
# ...
4.3 会话劫持防护
class SecureSessionManager:
def __init__(self, redis_client):
self.redis = redis_client
def create(self, request, user_id):
"""创建安全会话"""
session_id = str(uuid.uuid4())
key = f"session:{session_id}"
# 存储额外信息用于验证
session_data = {
'user_id': user_id,
'ip_address': request.remote_addr,
'user_agent': request.headers.get('User-Agent'),
'created_at': time.time()
}
self.redis.setex(key, 3600, json.dumps(session_data))
return session_id
def verify(self, request, session_id):
"""验证会话"""
key = f"session:{session_id}"
data = self.redis.get(key)
if not data:
return None
session = json.loads(data)
# 验证 IP 地址(可选,可能因网络变化失效)
# if session['ip_address'] != request.remote_addr:
# return None
# 验证 User-Agent
if session['user_agent'] != request.headers.get('User-Agent'):
return None
# 续期
self.redis.expire(key, 3600)
return session
五、性能优化
5.1 批量操作
# 批量获取 Session
def batch_get_sessions(session_ids):
keys = [f"session:{sid}" for sid in session_ids]
pipe = redis.pipeline()
for key in keys:
pipe.get(key)
results = pipe.execute()
return [json.loads(data) if data else None for data in results]
5.2 过期策略
# 惰性删除 + 定期删除
# Redis 默认策略
# 手动清理过期 Session
def cleanup_expired_sessions():
# 扫描过期 Session
cursor = 0
while True:
cursor, keys = redis.scan(cursor, match="session:*", count=100)
for key in keys:
ttl = redis.ttl(key)
if ttl == -2: # 已过期
redis.delete(key)
if cursor == 0:
break
5.3 内存优化
# 使用 Hash 存储 Session
def create_session_hash(session_id, user_id, data):
key = f"session:{session_id}"
pipe = redis.pipeline()
pipe.hset(key, mapping={
'user_id': user_id,
'data': json.dumps(data),
'created_at': time.time()
})
pipe.expire(key, 3600)
pipe.execute()
# 优势:
# 1. 更紧凑的存储
# 2. 支持字段级操作
# 3. 减少内存占用
六、监控与诊断
6.1 监控指标
# 监控 Session 数量
def get_session_count():
cursor = 0
count = 0
while True:
cursor, keys = redis.scan(cursor, match="session:*", count=100)
count += len(keys)
if cursor == 0:
break
return count
# 监控活跃用户
def get_active_users():
cursor = 0
users = set()
while True:
cursor, keys = redis.scan(cursor, match="session:*", count=100)
for key in keys:
data = redis.get(key)
if data:
session = json.loads(data)
users.add(session['user_id'])
if cursor == 0:
break
return len(users)
6.2 日志记录
import logging
logger = logging.getLogger('session')
class LoggedSessionManager:
def __init__(self, redis_client):
self.redis = redis_client
def create(self, user_id):
session_id = str(uuid.uuid4())
logger.info(f"Session created: {session_id} for user {user_id}")
# ...
return session_id
def delete(self, session_id):
logger.info(f"Session deleted: {session_id}")
# ...
七、最佳实践
7.1 配置建议
Session TTL:
- Web 应用:30 分钟 -2 小时
- 移动端:7-30 天
- 记住我:30 天
安全配置:
- 使用 HTTPS
- 启用 Session 加密
- 定期更换 Session ID
7.2 安全清单
✅ 推荐:
- 使用安全随机数生成 Session ID
- 启用 HTTPS
- 设置合理的过期时间
- 登录时更换 Session ID
- 实现 Token 注销机制
❌ 避免:
- Session ID 可预测
- 永久 Session
- 明文存储敏感信息
- 不验证 User-Agent/IP
7.3 性能检查
每日检查:
- [ ] Session 数量
- [ ] 内存使用
- [ ] 过期 Session 清理
每周检查:
- [ ] 活跃用户数
- [ ] Session 命中率
- [ ] Redis 性能指标
总结
Redis 会话管理核心要点:
| 方案 | 适用场景 | 安全性 | 性能 |
|---|---|---|---|
| Session | Web 应用 | 高 | 高 |
| JWT | API/移动端 | 中 | 最高 |
| JWT+Redis | 高安全场景 | 最高 | 高 |
最佳实践:
- 使用 Redis 存储 Session
- 实现 Token 刷新机制
- 支持 Token 注销
- 分布式环境使用 Redis Cluster
- 监控 Session 指标
- 定期清理过期 Session
掌握会话管理,构建安全的用户认证系统!