企业级 RAG 系统架构设计
企业级 RAG 系统需要满足高可用、多租户、安全可控等要求。如何设计企业级 RAG 架构？如何保障系统稳定性？本文详解企业级 RAG 系统的完整架构设计。
一、架构概述
1.1 企业级要求
企业级 RAG 系统要求:
┌─────────────────────────────────────┐
│ 1. 高可用性 │
│ - 99.9%+ 可用性 │
│ - 故障自动恢复 │
│ - 数据备份恢复 │
├─────────────────────────────────────┤
│ 2. 多租户支持 │
│ - 租户隔离 │
│ - 资源配额 │
│ - 独立配置 │
├─────────────────────────────────────┤
│ 3. 安全可控 │
│ - 权限控制 │
│ - 数据加密 │
│ - 审计日志 │
├─────────────────────────────────────┤
│ 4. 可扩展性 │
│ - 水平扩展 │
│ - 模块化解耦 │
│ - 弹性伸缩 │
└─────────────────────────────────────┘
1.2 整体架构
graph TB
subgraph 接入层
A[API 网关]
B[负载均衡]
end
subgraph 应用层
C[RAG 服务集群]
D[用户服务]
E[权限服务]
end
subgraph 处理层
F[检索服务]
G[重排序服务]
H[生成服务]
end
subgraph 数据层
I[向量数据库]
J[文档存储]
K[缓存集群]
L[关系数据库]
end
A --> B
B --> C
B --> D
B --> E
C --> F
C --> G
C --> H
F --> I
F --> J
F --> K
H --> K
D --> L
E --> L
二、多租户架构
2.1 租户隔离方案
# multi_tenancy.py
from typing import Dict, List, Optional
from enum import Enum
class IsolationLevel(Enum):
    """Degree of separation between one tenant's resources and another's."""

    # Resources are shared between tenants.
    SHARED = "shared"
    # Logical isolation.
    ISOLATED = "isolated"
    # Physical isolation.
    DEDICATED = "dedicated"
class Tenant:
    """A tenant record: identity, isolation level and per-tenant state."""

    def __init__(
        self,
        tenant_id: str,
        name: str,
        isolation_level: IsolationLevel = IsolationLevel.ISOLATED
    ):
        """Create a tenant with empty config, quota and user list.

        Args:
            tenant_id: Unique identifier of the tenant.
            name: Display name of the tenant.
            isolation_level: Isolation level; defaults to
                ``IsolationLevel.ISOLATED``.
        """
        self.tenant_id, self.name = tenant_id, name
        self.isolation_level = isolation_level
        # Per-tenant state starts empty and is filled in by the caller.
        self.config: Dict = dict()
        self.quota: Dict = dict()
        self.users: List[str] = list()
class TenantManager:
    """In-memory registry of tenants, keyed by tenant id."""

    def __init__(self):
        self.tenants: Dict[str, Tenant] = {}

    def create_tenant(
        self,
        name: str,
        isolation_level: IsolationLevel,
        quota: Dict
    ) -> Tenant:
        """Create a tenant under a fresh UUID4 id, register it and return it.

        Args:
            name: Display name of the tenant.
            isolation_level: Isolation level assigned to the tenant.
            quota: Quota mapping stored on the tenant as-is (not copied).

        Returns:
            The newly registered ``Tenant``.
        """
        import uuid

        new_id = str(uuid.uuid4())
        created = Tenant(
            tenant_id=new_id,
            name=name,
            isolation_level=isolation_level,
        )
        created.quota = quota
        self.tenants[new_id] = created
        return created

    def get_tenant(self, tenant_id: str) -> Optional[Tenant]:
        """Return the tenant registered under ``tenant_id``, or ``None``."""
        return self.tenants.get(tenant_id)

    def get_tenant_config(self, tenant_id: str) -> Dict:
        """Return the tenant's config dict, or a new empty dict if unknown."""
        found = self.get_tenant(tenant_id)
        return found.config if found else {}

    def check_quota(
        self,
        tenant_id: str,
        resource_type: str,
        required: int
    ) -> bool:
        """Tell whether ``required`` more units fit within the tenant's quota.

        An unknown tenant always yields ``False``; a resource type missing
        from the tenant's quota mapping counts as a quota of 0.
        """
        found = self.get_tenant(tenant_id)
        if not found:
            return False
        limit = found.quota.get(resource_type, 0)
        in_use = self._get_used_quota(tenant_id, resource_type)
        return in_use + required <= limit

    def _get_used_quota(self, tenant_id: str, resource_type: str) -> int:
        """Return the quota already consumed (placeholder: always 0)."""
        # Usage accounting is not implemented yet.
        return 0
2.2 资源隔离
# resource_isolation.py
from typing import Dict, Optional
class ResourceIsolator:
    """Keeps track of which resources have been allocated to which tenant."""

    def __init__(self):
        # tenant_id -> {resource_type: resource}
        self.tenant_resources: Dict[str, Dict] = {}

    def allocate_resources(
        self,
        tenant_id: str,
        resources: Dict
    ) -> bool:
        """Merge ``resources`` into the tenant's allocation if available.

        Args:
            tenant_id: Tenant receiving the resources.
            resources: Mapping of resource type to resource; entries with an
                already-allocated resource type overwrite the old value.

        Returns:
            ``True`` when the resources were recorded, ``False`` when the
            availability check rejected them. A rejected request leaves
            ``tenant_resources`` untouched.
        """
        # Check availability before touching any state so that a rejected
        # request does not leave an empty entry behind for the tenant.
        if not self._check_resource_availability(resources):
            return False
        self.tenant_resources.setdefault(tenant_id, {}).update(resources)
        return True

    def get_isolated_pool(
        self,
        tenant_id: str,
        resource_type: str
    ) -> Optional[object]:
        """Return the tenant's resource of ``resource_type``, or ``None``.

        ``None`` is returned both for an unknown tenant and for a resource
        type the tenant has not been allocated.
        """
        allocated = self.tenant_resources.get(tenant_id)
        if allocated is None:
            return None
        return allocated.get(resource_type)

    def _check_resource_availability(self, resources: Dict) -> bool:
        """Tell whether ``resources`` can be allocated (placeholder: True)."""
        # Real availability checking is not implemented yet.
        return True
# Usage example
isolator = ResourceIsolator()
# Allocate resources for one tenant; judging by the value names, the vector
# DB and cache entries are dedicated while the LLM pool entry is shared.
isolator.allocate_resources(
    "tenant_001",
    {
        'vector_db': 'dedicated_cluster_1',
        'cache': 'dedicated_cache_1',
        'llm_pool': 'shared_pool',
    },
)
三、权限控制
3.1 RBAC 权限模型
# rbac_permission.py
from typing import Dict, List, Set
from enum import Enum
class PermissionType(Enum):
    """Kinds of permission that can be granted on a resource type."""

    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"
class ResourceType(Enum):
    """Kinds of resource that permissions are attached to."""

    DOCUMENT = "document"
    KNOWLEDGE_BASE = "knowledge_base"
    AGENT = "agent"
    CONFIG = "config"
class Role:
    """A named role holding a set of permissions per resource type."""

    def __init__(self, role_id: str, name: str):
        self.role_id, self.name = role_id, name
        # resource type -> permission types granted on it
        self.permissions: Dict[ResourceType, Set[PermissionType]] = dict()

    def add_permission(
        self,
        resource_type: ResourceType,
        permission_type: PermissionType
    ):
        """Grant ``permission_type`` on ``resource_type`` (idempotent)."""
        self.permissions.setdefault(resource_type, set()).add(permission_type)

    def has_permission(
        self,
        resource_type: ResourceType,
        permission_type: PermissionType
    ) -> bool:
        """Tell whether exactly this permission was granted on the resource.

        Only an exact match counts: no permission type implies another.
        """
        return permission_type in self.permissions.get(resource_type, ())
class PermissionManager:
    """Registry of roles plus the role assignments of each user."""

    def __init__(self):
        self.roles: Dict[str, Role] = dict()
        # user_id -> ids of the roles assigned to that user
        self.user_roles: Dict[str, List[str]] = dict()

    def create_role(self, role_id: str, name: str) -> Role:
        """Create a role, register it under ``role_id`` and return it.

        An existing role with the same id is replaced.
        """
        new_role = self.roles[role_id] = Role(role_id, name)
        return new_role

    def assign_role(self, user_id: str, role_id: str):
        """Assign ``role_id`` to the user unless it is already assigned.

        The role id is recorded without checking that such a role exists.
        """
        assigned = self.user_roles.setdefault(user_id, [])
        if role_id not in assigned:
            assigned.append(role_id)

    def check_permission(
        self,
        user_id: str,
        resource_type: ResourceType,
        permission_type: PermissionType
    ) -> bool:
        """Tell whether any of the user's roles grants the permission.

        Assigned role ids that are not registered are skipped; a user with
        no assignments has no permissions.
        """
        candidate_roles = map(self.roles.get, self.user_roles.get(user_id, []))
        return any(
            role and role.has_permission(resource_type, permission_type)
            for role in candidate_roles
        )
3.2 数据权限
# data_permission.py
from typing import Dict, List, Optional
class DataPermission:
    """Per-document access grants for individual users."""

    def __init__(self):
        # "<document_id>:<permission_type>" -> ids of users holding that grant
        self.document_permissions: Dict[str, List[str]] = dict()
        # Knowledge-base grants; not read or written by any method here.
        self.kb_permissions: Dict[str, List[str]] = dict()

    @staticmethod
    def _grant_key(document_id: str, permission_type: str) -> str:
        """Build the ``document_permissions`` key for a document/permission."""
        return f"{document_id}:{permission_type}"

    def grant_document_access(
        self,
        document_id: str,
        user_id: str,
        permission_type: str = "read"
    ):
        """Give ``user_id`` the permission on the document (idempotent)."""
        holders = self.document_permissions.setdefault(
            self._grant_key(document_id, permission_type), []
        )
        if user_id not in holders:
            holders.append(user_id)

    def check_document_access(
        self,
        document_id: str,
        user_id: str,
        permission_type: str = "read"
    ) -> bool:
        """Tell whether ``user_id`` holds the permission on the document."""
        holders = self.document_permissions.get(
            self._grant_key(document_id, permission_type), []
        )
        return user_id in holders

    def filter_accessible_documents(
        self,
        user_id: str,
        document_ids: List[str],
        permission_type: str = "read"
    ) -> List[str]:
        """Return the ids from ``document_ids`` the user may access.

        The input order is preserved.
        """
        return [
            candidate
            for candidate in document_ids
            if self.check_document_access(candidate, user_id, permission_type)
        ]
四、高可用设计
4.1 集群部署
# cluster_deployment.py
from typing import Dict, List, Optional
from enum import Enum
class NodeStatus(Enum):
    """Health state of a cluster node."""

    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    MAINTENANCE = "maintenance"
class ClusterNode:
    """One cluster node: its address, health status and current load."""

    def __init__(self, node_id: str, host: str, port: int):
        self.node_id, self.host, self.port = node_id, host, port
        # A new node starts out healthy with zero load.
        self.status = NodeStatus.HEALTHY
        self.load: float = 0.0
class ClusterManager:
    """Tracks cluster nodes, selects a node per request and keeps a primary."""

    def __init__(self):
        self.nodes: Dict[str, ClusterNode] = {}
        # Id of the current primary node, or None when there is none.
        self.primary_node: Optional[str] = None
        # Number of "round_robin" selections made so far; used to rotate
        # through the healthy nodes.
        self._round_robin_cursor: int = 0

    def add_node(self, node: ClusterNode):
        """Register ``node``; it becomes primary if no primary is set."""
        self.nodes[node.node_id] = node
        if not self.primary_node:
            self.primary_node = node.node_id

    def remove_node(self, node_id: str):
        """Remove a node; elect a new primary if the primary was removed.

        Unknown node ids are ignored.
        """
        if node_id not in self.nodes:
            return
        del self.nodes[node_id]
        if self.primary_node == node_id:
            self._elect_new_primary()

    def get_healthy_nodes(self) -> List[ClusterNode]:
        """Return the nodes whose status is ``NodeStatus.HEALTHY``."""
        return [
            node for node in self.nodes.values()
            if node.status == NodeStatus.HEALTHY
        ]

    def select_node(self, strategy: str = "load_balance") -> Optional[ClusterNode]:
        """Pick a healthy node according to ``strategy``.

        Args:
            strategy: ``"load_balance"`` picks the healthy node with the
                lowest ``load``; ``"round_robin"`` rotates through the
                healthy nodes on successive calls; any other value falls
                back to the first healthy node.

        Returns:
            The selected node, or ``None`` when no node is healthy.
        """
        healthy_nodes = self.get_healthy_nodes()
        if not healthy_nodes:
            return None
        if strategy == "load_balance":
            return min(healthy_nodes, key=lambda n: n.load)
        if strategy == "round_robin":
            # The modulo keeps the cursor valid even when the number of
            # healthy nodes changes between calls.
            chosen = healthy_nodes[self._round_robin_cursor % len(healthy_nodes)]
            self._round_robin_cursor += 1
            return chosen
        return healthy_nodes[0]

    def _elect_new_primary(self):
        """Make the first healthy node primary, or clear the primary.

        Clearing matters: a stale id would point at a removed node and would
        also stop ``add_node`` from promoting the next node that joins.
        """
        healthy_nodes = self.get_healthy_nodes()
        self.primary_node = healthy_nodes[0].node_id if healthy_nodes else None
4.2 故障恢复
# failover.py
from typing import Dict, Optional
import time
class FailoverManager:
    """Counts consecutive health-check failures and triggers failover."""

    def __init__(self):
        self.health_check_interval = 10  # seconds
        self.failover_threshold = 3  # failure count that triggers failover
        # node_id -> failures recorded since the last successful check
        self.node_failures: Dict[str, int] = {}

    def check_health(self, node_id: str) -> bool:
        """Run one health check for ``node_id`` and update its failure count.

        A check that returns a falsy value or raises counts as one failure.
        A successful check resets the node's failure count to 0.

        Returns:
            ``True`` if the node passed the check, ``False`` otherwise.
        """
        # Only the probe itself is guarded: an error raised while handling
        # the failure (e.g. inside failover) must not be caught here and
        # counted as a second failure of the same check.
        try:
            passed = self._perform_health_check(node_id)
        except Exception:
            passed = False
        if passed:
            self.node_failures[node_id] = 0
            return True
        self._record_failure(node_id)
        return False

    def _record_failure(self, node_id: str):
        """Increment the node's failure count; trigger failover at threshold.

        Failover is triggered on every failure once the count has reached
        ``failover_threshold``, not only the first time.
        """
        failures = self.node_failures.get(node_id, 0) + 1
        self.node_failures[node_id] = failures
        if failures >= self.failover_threshold:
            self._trigger_failover(node_id)

    def _trigger_failover(self, failed_node_id: str):
        """Fail over away from ``failed_node_id`` (placeholder, does nothing).

        Intended steps:
            1. Mark the node as unhealthy.
            2. Select a standby node.
            3. Switch traffic.
            4. Send an alert.
        """
        pass

    def _perform_health_check(self, node_id: str) -> bool:
        """Probe the node (placeholder: always reports healthy)."""
        # Real health-check logic is not implemented yet.
        return True
五、性能优化
5.1 缓存策略
# caching_strategy.py
from typing import Dict, Optional, List
from datetime import datetime, timedelta
class CacheStrategy:
    """In-memory TTL cache whose keys can be namespaced by tenant."""

    def __init__(self):
        # full key -> {'value', 'expires_at', 'created_at'}
        self.cache: Dict[str, Dict] = {}
        self.default_ttl = timedelta(hours=1)

    def get(
        self,
        key: str,
        tenant_id: Optional[str] = None
    ) -> Optional[object]:
        """Return the cached value, or ``None`` if absent or expired.

        An expired entry is removed from the cache on access.
        """
        full_key = self._make_key(key, tenant_id)
        entry = self.cache.get(full_key)
        if entry is None:
            return None
        if datetime.now() < entry['expires_at']:
            return entry['value']
        # Expired: drop the stale entry.
        del self.cache[full_key]
        return None

    def set(
        self,
        key: str,
        value: object,
        ttl: Optional[timedelta] = None,
        tenant_id: Optional[str] = None
    ):
        """Store ``value`` under ``key`` for ``ttl``.

        Args:
            key: Cache key.
            value: Value to store.
            ttl: Lifetime of the entry; ``None`` means ``default_ttl``. An
                explicit zero ``timedelta`` is honoured, so the entry is
                already expired when next read.
            tenant_id: Optional tenant used to namespace the key.
        """
        # Compare against None: timedelta(0) is falsy, so `ttl or default`
        # would silently replace an explicit zero TTL with the default.
        lifetime = self.default_ttl if ttl is None else ttl
        # One clock reading so expires_at - created_at equals the TTL.
        now = datetime.now()
        self.cache[self._make_key(key, tenant_id)] = {
            'value': value,
            'expires_at': now + lifetime,
            'created_at': now
        }

    def invalidate(self, key: str, tenant_id: Optional[str] = None):
        """Remove the entry for ``key`` if present; otherwise do nothing."""
        self.cache.pop(self._make_key(key, tenant_id), None)

    def _make_key(self, key: str, tenant_id: Optional[str] = None) -> str:
        """Return ``"<tenant_id>:<key>"``, or ``key`` alone without a tenant.

        NOTE(review): an un-namespaced key that itself starts with
        ``"<tenant_id>:"`` maps to the same entry as that tenant's key.
        Confirm callers never use such keys.
        """
        if tenant_id:
            return f"{tenant_id}:{key}"
        return key
# 多级缓存策略
class MultiLevelCache:
    """Two-tier cache: a plain dict (L1) in front of a ``CacheStrategy`` (L2).

    NOTE(review): L1 entries are never expired or invalidated by this class,
    so a value back-filled into L1 can outlive its L2 TTL. Confirm this is
    acceptable for the intended use.
    """

    def __init__(self):
        self.l1_cache = {}  # L1: local cache
        self.l2_cache = CacheStrategy()  # L2: the distributed-cache tier

    def get(self, key: str) -> Optional[object]:
        """Return the value for ``key`` from L1, then L2, else ``None``.

        A value found only in L2 is copied into L1 before being returned.
        """
        if key in self.l1_cache:
            return self.l1_cache[key]
        value = self.l2_cache.get(key)
        # L2 signals a miss with None; falsy values such as 0, "" or []
        # are legitimate hits and must not be treated as misses.
        if value is None:
            return None
        self.l1_cache[key] = value
        return value

    def set(self, key: str, value: object):
        """Write ``value`` to both tiers."""
        self.l1_cache[key] = value
        self.l2_cache.set(key, value)
5.2 查询优化
# query_optimization.py
from typing import Dict, List, Optional
class QueryOptimizer:
    """Rewrites a query, appends access filters and picks retrieval params."""

    def __init__(self):
        # query text -> previously computed result. No method of this class
        # writes to it; any entries must come from outside.
        self.query_cache = {}
        # Objects exposing ``apply(query) -> query``, applied in order.
        self.rewrite_rules = []

    def optimize_query(
        self,
        query: str,
        context: Dict
    ) -> Dict:
        """Return the optimized form of ``query`` for the given ``context``.

        A truthy cache entry for the raw query text is returned as-is.
        Otherwise the query is rewritten, filters derived from ``context``
        are appended, and retrieval parameters are attached.

        Returns:
            The cached entry, or ``{'query': <filtered query>,
            'params': <retrieval parameters>}``.
        """
        # NOTE(review): the cache is keyed by query text only, while the
        # result depends on context (tenant, permissions). Confirm whoever
        # fills query_cache cannot mix results across tenants.
        hit = self._check_cache(query)
        if hit:
            return hit
        final_query = self._add_filters(self._rewrite_query(query), context)
        return {
            'query': final_query,
            'params': self._optimize_params(final_query)
        }

    def _check_cache(self, query: str) -> Optional[Dict]:
        """Return the cached result for ``query``, or ``None``."""
        return self.query_cache.get(query)

    def _rewrite_query(self, query: str) -> str:
        """Pass ``query`` through every rewrite rule, in order."""
        current = query
        for rewrite_rule in self.rewrite_rules:
            current = rewrite_rule.apply(current)
        return current

    def _add_filters(self, query: str, context: Dict) -> str:
        """Append permission and tenant filters found in ``context``.

        NOTE(review): both values are interpolated into the query text
        without escaping, and ``permissions`` is rendered with its Python
        repr (a list prints as ``['a', 'b']``). Confirm the target query
        syntax accepts this and that the values are trusted.
        """
        filtered = query
        # Permission filter
        if 'permissions' in context:
            filtered += f" AND permissions IN {context['permissions']}"
        # Tenant filter
        if 'tenant_id' in context:
            filtered += f" AND tenant_id = '{context['tenant_id']}'"
        return filtered

    def _optimize_params(self, query: str) -> Dict:
        """Return retrieval parameters (currently fixed, ``query`` unused)."""
        return dict(top_k=10, use_cache=True, parallel=True)
六、监控与运维
6.1 监控指标
# monitoring_metrics.py
from typing import Dict, List
from datetime import datetime
class MonitoringMetrics:
    """In-memory store of metric samples with summary statistics."""

    def __init__(self):
        # metric name -> samples in insertion order; each sample is a dict
        # with 'value', 'tags' and an ISO-format 'timestamp' string.
        self.metrics: Dict[str, List[Dict]] = {}

    def record_metric(
        self,
        metric_name: str,
        value: float,
        tags: Dict = None
    ):
        """Append one sample for ``metric_name`` stamped with the current time.

        Args:
            metric_name: Name of the metric.
            value: Sample value.
            tags: Optional tags stored with the sample; ``None`` or an empty
                dict is stored as a new empty dict.
        """
        self.metrics.setdefault(metric_name, []).append({
            'value': value,
            'tags': tags or {},
            'timestamp': datetime.now().isoformat()
        })

    def get_metric_stats(
        self,
        metric_name: str,
        time_range: str = "1h"
    ) -> Dict:
        """Summarize the samples of ``metric_name`` recorded in ``time_range``.

        Args:
            metric_name: Name of the metric.
            time_range: Look-back window written as a positive integer plus
                a unit: ``s``, ``m``, ``h`` or ``d`` (e.g. ``"30m"``,
                ``"1h"``). A value that cannot be parsed disables the
                window, so all samples are used.

        Returns:
            A dict with ``count``, ``mean``, ``min``, ``max``, ``p50``,
            ``p95`` and ``p99``; an empty dict when the metric is unknown or
            has no sample inside the window. ``p95`` / ``p99`` fall back to
            the maximum when there are at most 20 / 100 samples.
        """
        import statistics

        samples = self.metrics.get(metric_name)
        if not samples:
            return {}
        window = self._parse_time_range(time_range)
        if window is not None:
            cutoff = datetime.now() - window
            samples = [
                sample for sample in samples
                if datetime.fromisoformat(sample['timestamp']) >= cutoff
            ]
        # Guard the empty case: statistics.mean raises on an empty sequence.
        if not samples:
            return {}
        # Sort once and reuse the result for both percentiles.
        ordered = sorted(sample['value'] for sample in samples)
        count = len(ordered)
        highest = max(ordered)
        return {
            'count': count,
            'mean': statistics.mean(ordered),
            'min': min(ordered),
            'max': highest,
            'p50': statistics.median(ordered),
            'p95': ordered[int(count * 0.95)] if count > 20 else highest,
            'p99': ordered[int(count * 0.99)] if count > 100 else highest
        }

    @staticmethod
    def _parse_time_range(time_range):
        """Convert a string such as ``"15m"`` to a ``timedelta``.

        Returns ``None`` when ``time_range`` is not a positive integer
        followed by one of the units ``s``, ``m``, ``h``, ``d``.
        """
        from datetime import timedelta

        seconds_per_unit = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}
        if not isinstance(time_range, str) or len(time_range) < 2:
            return None
        unit = time_range[-1].lower()
        if unit not in seconds_per_unit:
            return None
        try:
            amount = int(time_range[:-1])
        except ValueError:
            return None
        if amount <= 0:
            return None
        return timedelta(seconds=amount * seconds_per_unit[unit])
# Key metrics
KEY_METRICS = [
    'request_count',       # number of requests
    'response_latency',    # response latency
    'error_rate',          # error rate
    'cache_hit_rate',      # cache hit rate
    'retrieval_recall',    # retrieval recall
    'generation_quality',  # generation quality
    'tenant_isolation',    # tenant isolation
    'resource_usage',      # resource usage
]
七、总结
7.1 核心要点
- 多租户架构
  - 租户隔离
  - 资源配额
  - 独立配置
- 权限控制
  - RBAC 模型
  - 数据权限
  - 审计日志
- 高可用
  - 集群部署
  - 故障恢复
  - 负载均衡
7.2 实施建议
- 分阶段实施
  - 先核心功能
  - 后扩展功能
  - 持续优化
- 监控先行
  - 建立监控体系
  - 设置告警阈值
  - 定期演练
- 文档完善
  - 架构文档
  - 运维手册
  - 应急预案
参考资料