Skip to content
清晨的一缕阳光
返回

企业级 RAG 系统架构设计

企业级 RAG 系统架构设计

企业级 RAG 系统需要满足高可用、多租户、安全可控等要求。如何设计企业级 RAG 架构？如何保障系统稳定性？本文详解企业级 RAG 系统的完整架构设计。

一、架构概述

1.1 企业级要求

企业级 RAG 系统要求：

┌─────────────────────────────────────┐
│ 1. 高可用性                          │
│    - 99.9%+ 可用性                   │
│    - 故障自动恢复                    │
│    - 数据备份恢复                    │
├─────────────────────────────────────┤
│ 2. 多租户支持                        │
│    - 租户隔离                        │
│    - 资源配额                        │
│    - 独立配置                        │
├─────────────────────────────────────┤
│ 3. 安全可控                          │
│    - 权限控制                        │
│    - 数据加密                        │
│    - 审计日志                        │
├─────────────────────────────────────┤
│ 4. 可扩展性                          │
│    - 水平扩展                        │
│    - 模块化解耦                      │
│    - 弹性伸缩                        │
└─────────────────────────────────────┘

1.2 整体架构

graph TB
    subgraph 接入层
        A[API 网关]
        B[负载均衡]
    end
    
    subgraph 应用层
        C[RAG 服务集群]
        D[用户服务]
        E[权限服务]
    end
    
    subgraph 处理层
        F[检索服务]
        G[重排序服务]
        H[生成服务]
    end
    
    subgraph 数据层
        I[向量数据库]
        J[文档存储]
        K[缓存集群]
        L[关系数据库]
    end
    
    A --> B
    B --> C
    B --> D
    B --> E
    C --> F
    C --> G
    C --> H
    F --> I
    F --> J
    F --> K
    H --> K
    D --> L
    E --> L

二、多租户架构

2.1 租户隔离方案

# multi_tenancy.py
from typing import Dict, List, Optional
from enum import Enum

class IsolationLevel(Enum):
    """How strongly a tenant's resources are separated from other tenants."""
    SHARED = "shared"  # shared resources
    ISOLATED = "isolated"  # logical isolation
    DEDICATED = "dedicated"  # physical isolation


class Tenant:
    """A tenant: identity, isolation level, configuration, quota and members."""

    def __init__(
        self,
        tenant_id: str,
        name: str,
        isolation_level: IsolationLevel = IsolationLevel.ISOLATED
    ):
        self.tenant_id = tenant_id
        self.name = name
        self.isolation_level = isolation_level
        self.config: Dict = {}
        # resource type -> allowed amount (consulted by TenantManager.check_quota)
        self.quota: Dict = {}
        self.users: List[str] = []


class TenantManager:
    """In-memory registry of tenants with simple quota checks."""

    def __init__(self):
        # tenant_id -> Tenant
        self.tenants: Dict[str, Tenant] = {}

    def create_tenant(
        self,
        name: str,
        isolation_level: IsolationLevel,
        quota: Dict
    ) -> Tenant:
        """Create and register a tenant with a fresh UUID4 id.

        Args:
            name: Display name of the tenant.
            isolation_level: Requested isolation level.
            quota: Mapping of resource type -> allowed amount. The mapping is
                copied; ``None`` is treated as an empty quota.

        Returns:
            The newly registered ``Tenant``.
        """
        import uuid

        tenant = Tenant(
            tenant_id=str(uuid.uuid4()),
            name=name,
            isolation_level=isolation_level
        )
        # Copy rather than alias: callers commonly pass one template dict for
        # many tenants, and sharing it would make a quota change on one tenant
        # silently apply to all of them.
        tenant.quota = dict(quota or {})

        self.tenants[tenant.tenant_id] = tenant
        return tenant

    def get_tenant(self, tenant_id: str) -> Optional[Tenant]:
        """Return the tenant with ``tenant_id``, or None if it is unknown."""
        return self.tenants.get(tenant_id)

    def get_tenant_config(self, tenant_id: str) -> Dict:
        """Return the tenant's config dict itself (not a copy).

        Unknown tenants get a new empty dict.
        """
        tenant = self.get_tenant(tenant_id)
        if tenant:
            return tenant.config
        return {}

    def check_quota(
        self,
        tenant_id: str,
        resource_type: str,
        required: int
    ) -> bool:
        """Return True if ``required`` more units of ``resource_type`` fit the quota.

        Unknown tenants are rejected. A resource type missing from the quota
        has a limit of 0, so any positive request for it fails.
        """
        tenant = self.get_tenant(tenant_id)
        if not tenant:
            return False

        quota = tenant.quota.get(resource_type, 0)
        used = self._get_used_quota(tenant_id, resource_type)

        return used + required <= quota

    def _get_used_quota(self, tenant_id: str, resource_type: str) -> int:
        """Return the amount of ``resource_type`` already used by the tenant.

        Placeholder: usage tracking is not implemented, so this always returns 0.
        """
        return 0

2.2 资源隔离

# resource_isolation.py
from typing import Dict, Optional

class ResourceIsolator:
    """Per-tenant registry of allocated resources (resource type -> handle)."""

    def __init__(self):
        # tenant_id -> {resource_type: resource handle}
        self.tenant_resources: Dict[str, Dict] = {}

    def allocate_resources(
        self,
        tenant_id: str,
        resources: Dict
    ) -> bool:
        """Record ``resources`` for ``tenant_id`` if they are available.

        Entries are merged into the tenant's existing resources; a repeated
        resource type overwrites the previous handle.

        Returns:
            True on success, False if the availability check rejects the request.
            A rejected request leaves ``tenant_resources`` untouched.
        """
        # Check first so a rejected allocation does not leave behind an empty
        # entry for the tenant.
        if not self._check_resource_availability(resources):
            return False

        self.tenant_resources.setdefault(tenant_id, {}).update(resources)
        return True

    def get_isolated_pool(
        self,
        tenant_id: str,
        resource_type: str
    ) -> Optional[object]:
        """Return the tenant's handle for ``resource_type``, or None if absent."""
        return self.tenant_resources.get(tenant_id, {}).get(resource_type)

    def _check_resource_availability(self, resources: Dict) -> bool:
        """Return True if ``resources`` can be allocated.

        Placeholder: always True; real capacity checks are not implemented.
        """
        return True

# Usage example: tenant_001 gets its own vector-DB cluster and cache,
# while its LLM pool stays shared.
isolator = ResourceIsolator()
isolator.allocate_resources(
    "tenant_001",
    dict(
        vector_db='dedicated_cluster_1',
        cache='dedicated_cache_1',
        llm_pool='shared_pool',
    ),
)

三、权限控制

3.1 RBAC 权限模型

# rbac_permission.py
from typing import Dict, List, Set
from enum import Enum

class PermissionType(Enum):
    """Kinds of operation a role can be granted on a resource type."""

    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"

class ResourceType(Enum):
    """Categories of resource that permissions are granted on."""

    DOCUMENT = "document"
    KNOWLEDGE_BASE = "knowledge_base"
    AGENT = "agent"
    CONFIG = "config"

class Role:
    """A named role: for each resource type, the set of permissions it grants."""

    def __init__(self, role_id: str, name: str):
        self.role_id, self.name = role_id, name
        # resource type -> permission types granted on it
        self.permissions: Dict[ResourceType, Set[PermissionType]] = {}

    def add_permission(self, resource_type: ResourceType, permission_type: PermissionType):
        """Grant ``permission_type`` on ``resource_type``; repeats are harmless (set)."""
        granted = self.permissions.setdefault(resource_type, set())
        granted.add(permission_type)

    def has_permission(self, resource_type: ResourceType, permission_type: PermissionType) -> bool:
        """Return True if exactly ``permission_type`` was granted on ``resource_type``.

        Membership is checked literally; no permission implies another here.
        """
        granted = self.permissions.get(resource_type)
        return granted is not None and permission_type in granted

class PermissionManager:
    """Holds roles and user-to-role assignments and answers permission checks."""

    def __init__(self):
        # role_id -> Role
        self.roles: Dict[str, Role] = {}
        # user_id -> role_ids, in assignment order, without duplicates
        self.user_roles: Dict[str, List[str]] = {}

    def create_role(self, role_id: str, name: str) -> Role:
        """Create a role, register it under ``role_id`` (replacing any previous one), and return it."""
        self.roles[role_id] = new_role = Role(role_id, name)
        return new_role

    def assign_role(self, user_id: str, role_id: str):
        """Give ``user_id`` the role ``role_id``; assigning twice is a no-op."""
        assigned = self.user_roles.setdefault(user_id, [])
        if role_id not in assigned:
            assigned.append(role_id)

    def check_permission(self, user_id: str, resource_type: ResourceType, permission_type: PermissionType) -> bool:
        """Return True if any of the user's registered roles grants the permission.

        Role ids with no registered role are skipped.
        """
        candidate_roles = (self.roles.get(rid) for rid in self.user_roles.get(user_id, []))
        return any(
            role.has_permission(resource_type, permission_type)
            for role in candidate_roles
            if role
        )

3.2 数据权限

# data_permission.py
from typing import Dict, List, Optional

class DataPermission:
    """Per-document access lists, keyed by document id and permission type."""

    def __init__(self):
        # "<document_id>:<permission_type>" -> ids of users holding that permission
        self.document_permissions: Dict[str, List[str]] = {}
        # kb_id -> user_ids; not read or written by any method of this class
        self.kb_permissions: Dict[str, List[str]] = {}

    @staticmethod
    def _permission_key(document_id: str, permission_type: str) -> str:
        """Build the ``document_permissions`` key for a document/permission pair."""
        return f"{document_id}:{permission_type}"

    def grant_document_access(
        self,
        document_id: str,
        user_id: str,
        permission_type: str = "read"
    ):
        """Allow ``user_id`` the ``permission_type`` on ``document_id`` (idempotent)."""
        holders = self.document_permissions.setdefault(
            self._permission_key(document_id, permission_type), []
        )
        if user_id not in holders:
            holders.append(user_id)

    def check_document_access(
        self,
        document_id: str,
        user_id: str,
        permission_type: str = "read"
    ) -> bool:
        """Return True if ``user_id`` was granted ``permission_type`` on ``document_id``."""
        holders = self.document_permissions.get(
            self._permission_key(document_id, permission_type), []
        )
        return user_id in holders

    def filter_accessible_documents(
        self,
        user_id: str,
        document_ids: List[str],
        permission_type: str = "read"
    ) -> List[str]:
        """Return the subset of ``document_ids`` the user may access, order preserved."""
        return [
            doc_id
            for doc_id in document_ids
            if self.check_document_access(doc_id, user_id, permission_type)
        ]

四、高可用设计

4.1 集群部署

# cluster_deployment.py
from typing import Dict, List, Optional
from enum import Enum

class NodeStatus(Enum):
    """Health state of a cluster node."""
    HEALTHY = "healthy"
    UNHEALTHY = "unhealthy"
    MAINTENANCE = "maintenance"


class ClusterNode:
    """One service node in the cluster."""

    def __init__(self, node_id: str, host: str, port: int):
        self.node_id = node_id
        self.host = host
        self.port = port
        # New nodes start as healthy with zero load.
        self.status = NodeStatus.HEALTHY
        self.load: float = 0.0


class ClusterManager:
    """Tracks cluster nodes, picks a node per request, and keeps a primary."""

    def __init__(self):
        # node_id -> ClusterNode, in insertion order
        self.nodes: Dict[str, ClusterNode] = {}
        self.primary_node: Optional[str] = None
        # Position of the next pick for the "round_robin" strategy.
        self._rr_index = 0

    def add_node(self, node: ClusterNode):
        """Register ``node``; it becomes primary if there is no primary yet."""
        self.nodes[node.node_id] = node

        if not self.primary_node:
            self.primary_node = node.node_id

    def remove_node(self, node_id: str):
        """Unregister ``node_id`` (no-op if unknown); re-elect if it was primary."""
        self.nodes.pop(node_id, None)

        if self.primary_node == node_id:
            self._elect_new_primary()

    def get_healthy_nodes(self) -> List[ClusterNode]:
        """Return nodes whose status is HEALTHY, in insertion order."""
        return [
            node for node in self.nodes.values()
            if node.status == NodeStatus.HEALTHY
        ]

    def select_node(self, strategy: str = "load_balance") -> Optional[ClusterNode]:
        """Pick a healthy node, or None if there is none.

        Strategies:
            "load_balance": the healthy node with the lowest ``load``.
            "round_robin": cycles through the healthy nodes on successive calls.
            anything else: the first healthy node.
        """
        healthy_nodes = self.get_healthy_nodes()

        if not healthy_nodes:
            return None

        if strategy == "load_balance":
            return min(healthy_nodes, key=lambda n: n.load)
        if strategy == "round_robin":
            # Modulo keeps the cursor valid when the healthy set shrinks.
            index = self._rr_index % len(healthy_nodes)
            self._rr_index = index + 1
            return healthy_nodes[index]

        return healthy_nodes[0]

    def _elect_new_primary(self):
        """Make the first healthy node primary, or clear it if none is healthy."""
        healthy_nodes = self.get_healthy_nodes()
        # Clear rather than keep a stale id pointing at a removed node.
        self.primary_node = healthy_nodes[0].node_id if healthy_nodes else None

4.2 故障恢复

# failover.py
from typing import Dict, Optional
import time

class FailoverManager:
    """Counts consecutive health-check failures per node and triggers failover."""

    def __init__(self):
        self.health_check_interval = 10  # seconds; not read by this class
        self.failover_threshold = 3  # consecutive failures before failover
        # node_id -> current count of consecutive failed checks
        self.node_failures: Dict[str, int] = {}

    def check_health(self, node_id: str) -> bool:
        """Run one health check for ``node_id`` and update its failure streak.

        A probe that raises counts as a failed check; the exception is logged
        rather than propagated.

        Returns:
            True if the node is healthy, False otherwise.
        """
        try:
            success = self._perform_health_check(node_id)
        except Exception:
            # Best-effort probe: treat errors as failure, but keep the cause.
            import logging
            logging.getLogger(__name__).warning(
                "Health check for node %s raised an exception", node_id, exc_info=True
            )
            success = False

        if success:
            self.node_failures[node_id] = 0
            return True

        self._record_failure(node_id)
        return False

    def _record_failure(self, node_id: str):
        """Increment the node's failure streak; trigger failover at the threshold."""
        count = self.node_failures.get(node_id, 0) + 1
        self.node_failures[node_id] = count

        # Fire once per streak, when the count first reaches the threshold;
        # later failures in the same streak must not re-trigger failover and
        # alerts. A non-positive threshold behaves like 1.
        if count == max(self.failover_threshold, 1):
            self._trigger_failover(node_id)

    def _trigger_failover(self, failed_node_id: str):
        """Fail over away from ``failed_node_id``.

        Not implemented. Intended steps:
            1. mark the node unhealthy
            2. choose a standby node
            3. switch traffic
            4. send an alert
        """
        pass

    def _perform_health_check(self, node_id: str) -> bool:
        """Probe ``node_id``. Placeholder: always reports healthy."""
        return True

五、性能优化

5.1 缓存策略

# caching_strategy.py
from typing import Dict, Optional, List
from datetime import datetime, timedelta

class CacheStrategy:
    """Tenant-aware key/value cache with per-entry expiry (TTL)."""

    def __init__(self):
        # full key -> {'value', 'expires_at', 'created_at'}
        self.cache: Dict[str, Dict] = {}
        self.default_ttl = timedelta(hours=1)

    def get(
        self,
        key: str,
        tenant_id: str = None
    ) -> Optional[object]:
        """Return the cached value, or None if missing or expired.

        Expired entries are removed when they are read.
        """
        full_key = self._make_key(key, tenant_id)
        entry = self.cache.get(full_key)
        if entry is None:
            return None

        if datetime.now() < entry['expires_at']:
            return entry['value']

        del self.cache[full_key]
        return None

    def set(
        self,
        key: str,
        value: object,
        ttl: timedelta = None,
        tenant_id: str = None
    ):
        """Store ``value`` for ``ttl`` (``default_ttl`` when ttl is None)."""
        full_key = self._make_key(key, tenant_id)
        # Test against None, not truthiness: timedelta(0) is falsy but means
        # "expire immediately", not "use the default".
        lifetime = self.default_ttl if ttl is None else ttl
        now = datetime.now()  # one clock read so created_at + ttl == expires_at

        self.cache[full_key] = {
            'value': value,
            'expires_at': now + lifetime,
            'created_at': now
        }

    def invalidate(self, key: str, tenant_id: str = None):
        """Remove the entry if present; unknown keys are ignored."""
        self.cache.pop(self._make_key(key, tenant_id), None)

    def _make_key(self, key: str, tenant_id: str = None) -> str:
        """Prefix ``key`` with the tenant id when one is given.

        NOTE(review): an unscoped key such as "t1:x" maps to the same entry
        as key "x" for tenant "t1". Keep unscoped keys free of ':' or scope
        every key by tenant.
        """
        if tenant_id:
            return f"{tenant_id}:{key}"
        return key


# Multi-level cache strategy
class MultiLevelCache:
    """Two-level cache: a plain in-process dict (L1) in front of a CacheStrategy (L2)."""

    def __init__(self):
        # L1: local dict. NOTE(review): entries never expire and the dict is unbounded.
        self.l1_cache = {}
        # L2: labelled the distributed tier in the design; a CacheStrategy here.
        self.l2_cache = CacheStrategy()

    def get(self, key: str) -> Optional[object]:
        """Look up L1, then L2 (back-filling L1 on an L2 hit); None on a miss."""
        if key in self.l1_cache:
            return self.l1_cache[key]

        value = self.l2_cache.get(key)
        # `is not None` so falsy cached values (0, "", []) still count as hits.
        if value is not None:
            self.l1_cache[key] = value
        return value

    def set(self, key: str, value: object):
        """Write ``value`` to both levels (L2 uses its default TTL)."""
        self.l1_cache[key] = value
        self.l2_cache.set(key, value)

    def invalidate(self, key: str):
        """Remove ``key`` from both levels; unknown keys are ignored."""
        self.l1_cache.pop(key, None)
        self.l2_cache.invalidate(key)

5.2 查询优化

# query_optimization.py
from typing import Dict, List, Optional

class QueryOptimizer:
    """Prepares a retrieval query: cache lookup, rewrite rules, filters, params."""

    def __init__(self):
        # query text -> result dict; nothing in this class writes to it
        self.query_cache = {}
        # objects exposing apply(str) -> str, applied in list order
        self.rewrite_rules = []

    def optimize_query(
        self,
        query: str,
        context: Dict
    ) -> Dict:
        """Return {'query': ..., 'params': ...} for ``query``.

        A truthy cached result for the raw query text is returned as-is.

        Raises:
            ValueError: if ``context['tenant_id']`` contains a quote or backslash.
        """
        # 1. Cache lookup
        cached_result = self._check_cache(query)
        if cached_result:
            return cached_result

        # 2. Query rewriting
        rewritten_query = self._rewrite_query(query)

        # 3. Append filter conditions
        filtered_query = self._add_filters(rewritten_query, context)

        # 4. Retrieval parameters
        optimized_params = self._optimize_params(filtered_query)

        return {
            'query': filtered_query,
            'params': optimized_params
        }

    def _check_cache(self, query: str) -> Optional[Dict]:
        """Return the cached result for ``query``, or None.

        NOTE(review): keyed on the query text only. A hit skips the tenant and
        permission filters from ``context``, so results must not be cached
        across tenants.
        """
        return self.query_cache.get(query)

    def _rewrite_query(self, query: str) -> str:
        """Apply every rewrite rule in order and return the result."""
        rewritten = query

        for rule in self.rewrite_rules:
            rewritten = rule.apply(rewritten)

        return rewritten

    def _add_filters(self, query: str, context: Dict) -> str:
        """Append permission and tenant filter clauses taken from ``context``."""
        # Permission filter (the list is rendered with Python's repr)
        if 'permissions' in context:
            query += f" AND permissions IN {context['permissions']}"

        # Tenant filter. The id is embedded in a quoted literal, so reject
        # characters that could close the literal and inject extra conditions.
        if 'tenant_id' in context:
            tenant_id = str(context['tenant_id'])
            if any(ch in tenant_id for ch in "'\"\\"):
                raise ValueError(
                    f"tenant_id must not contain quotes or backslashes: {tenant_id!r}"
                )
            query += f" AND tenant_id = '{tenant_id}'"

        return query

    def _optimize_params(self, query: str) -> Dict:
        """Return retrieval parameters. Currently fixed values; ``query`` is unused."""
        return {
            'top_k': 10,
            'use_cache': True,
            'parallel': True
        }

六、监控与运维

6.1 监控指标

# monitoring_metrics.py
from typing import Dict, List
from datetime import datetime

class MonitoringMetrics:
    """In-memory store of timestamped metric samples with summary statistics."""

    def __init__(self):
        # metric name -> samples, each {'value', 'tags', 'timestamp'} where
        # 'timestamp' is a naive local-time ISO-8601 string
        self.metrics: Dict[str, List[Dict]] = {}

    def record_metric(
        self,
        metric_name: str,
        value: float,
        tags: Dict = None
    ):
        """Append one sample for ``metric_name``, stamped with the current local time."""
        self.metrics.setdefault(metric_name, []).append({
            'value': value,
            'tags': tags or {},
            'timestamp': datetime.now().isoformat()
        })

    def get_metric_stats(
        self,
        metric_name: str,
        time_range: str = "1h"
    ) -> Dict:
        """Summarise the samples of ``metric_name`` recorded within ``time_range``.

        Args:
            metric_name: Metric to summarise.
            time_range: Look-back window "<int><unit>", unit one of s/m/h/d
                (e.g. "30m", "1h", "7d"). Pass None to use every sample.

        Returns:
            Dict with count, mean, min, max, p50, p95 and p99. An empty dict
            is returned when the metric is unknown or no sample is in the window.

        Raises:
            ValueError: if ``time_range`` is malformed.
        """
        import statistics

        samples = self.metrics.get(metric_name)
        if not samples:
            return {}

        if time_range is not None:
            cutoff = datetime.now() - self._parse_time_range(time_range)
            samples = [
                s for s in samples
                if datetime.fromisoformat(s['timestamp']) >= cutoff
            ]
            if not samples:
                return {}

        values = [s['value'] for s in samples]
        ordered = sorted(values)  # sort once for both percentiles
        n = len(ordered)

        return {
            'count': n,
            'mean': statistics.mean(values),
            'min': min(values),
            'max': max(values),
            'p50': statistics.median(values),
            # Nearest-rank percentiles; small samples fall back to the maximum.
            'p95': ordered[int(n * 0.95)] if n > 20 else max(values),
            'p99': ordered[int(n * 0.99)] if n > 100 else max(values)
        }

    @staticmethod
    def _parse_time_range(time_range: str):
        """Convert "<int><unit>" (unit s/m/h/d) into a ``timedelta``."""
        from datetime import timedelta

        units = {'s': 'seconds', 'm': 'minutes', 'h': 'hours', 'd': 'days'}
        amount, unit = time_range[:-1], time_range[-1:]
        if unit not in units or not amount.isdigit():
            raise ValueError(
                f"invalid time_range {time_range!r}; expected e.g. '30m', '1h', '7d'"
            )
        return timedelta(**{units[unit]: int(amount)})

# Key metrics to track.
KEY_METRICS = [
    'request_count',       # number of requests
    'response_latency',    # response latency
    'error_rate',          # error rate
    'cache_hit_rate',      # cache hit rate
    'retrieval_recall',    # retrieval recall
    'generation_quality',  # generation quality
    'tenant_isolation',    # tenant isolation
    'resource_usage',      # resource usage
]

七、总结

7.1 核心要点

  1. 多租户架构

    • 租户隔离
    • 资源配额
    • 独立配置
  2. 权限控制

    • RBAC 模型
    • 数据权限
    • 审计日志
  3. 高可用

    • 集群部署
    • 故障恢复
    • 负载均衡

7.2 实施建议

  1. 分阶段实施

    • 先核心功能
    • 后扩展功能
    • 持续优化
  2. 监控先行

    • 建立监控体系
    • 设置告警阈值
    • 定期演练
  3. 文档完善

    • 架构文档
    • 运维手册
    • 应急预案

参考资料


分享这篇文章到:

上一篇文章
RocketMQ Controller 控制器详解与高可用
下一篇文章
Redis 持久化性能优化