Skip to content
清晨的一缕阳光
返回

AI 应用性能优化实战案例

AI 应用性能优化实战案例

本文通过多个真实案例,详解 AI 应用性能优化的实战方法和经验教训。

一、案例一:RAG 系统延迟优化

1.1 问题背景

优化前指标:
┌─────────────────────────────────────┐
│ P50 延迟:3.5 秒                     │
│ P95 延迟:8.2 秒                     │
│ P99 延迟:15.3 秒                    │
│ QPS: 50                             │
│ 用户投诉率高                        │
└─────────────────────────────────────┘

优化目标:
┌─────────────────────────────────────┐
│ P50 延迟:< 1 秒                      │
│ P95 延迟:< 3 秒                     │
│ QPS: > 200                          │
└─────────────────────────────────────┘

1.2 性能分析

# performance_analysis.py
import cProfile
import pstats
from typing import Dict, List

class RAGPerformanceAnalyzer:
    """RAG 性能分析器"""
    
    def __init__(self):
        self.profiler = cProfile.Profile()
    
    def profile_query(self, rag_system, query: str) -> Dict:
        """分析查询性能"""
        # 启动性能分析
        self.profiler.enable()
        
        result = rag_system.query(query)
        
        self.profiler.disable()
        
        # 分析结果
        stats = pstats.Stats(self.profiler)
        stats.sort_stats('cumulative')
        
        # 提取关键信息
        bottlenecks = self._identify_bottlenecks(stats)
        
        return {
            'latency_ms': result.get('latency_ms', 0),
            'bottlenecks': bottlenecks,
            'recommendations': self._generate_recommendations(bottlenecks)
        }
    
    def _identify_bottlenecks(self, stats) -> List[Dict]:
        """识别瓶颈"""
        bottlenecks = []
        
        # 分析调用统计
        for func, (cc, nc, tt, ct, callers) in stats.stats.items():
            if ct > 0.5:  # 累计时间超过 0.5 秒
                bottlenecks.append({
                    'function': func[2],
                    'cumulative_time': ct,
                    'call_count': nc,
                    'avg_time': ct / nc if nc > 0 else 0
                })
        
        return sorted(
            bottlenecks,
            key=lambda x: x['cumulative_time'],
            reverse=True
        )[:10]
    
    def _generate_recommendations(
        self,
        bottlenecks: List[Dict]
    ) -> List[str]:
        """生成优化建议"""
        recommendations = []
        
        for bn in bottlenecks:
            if 'embedding' in bn['function']:
                recommendations.append(
                    '考虑批量计算 Embedding 或使用缓存'
                )
            elif 'retrieve' in bn['function']:
                recommendations.append(
                    '优化向量检索,使用近似检索或增加索引'
                )
            elif 'generate' in bn['function']:
                recommendations.append(
                    '优化 Prompt 长度或使用流式输出'
                )
        
        return recommendations

# 分析结果
"""
性能瓶颈分析:

1. Embedding 计算(35%)
   - 每次查询计算 10 个文档的 Embedding
   - 串行计算,总耗时 1.2 秒

2. 向量检索(25%)
   - 暴力搜索,未使用索引
   - 检索 10000 个向量,耗时 0.8 秒

3. LLM 生成(30%)
   - Prompt 过长(3000 Token)
   - 等待完整响应,耗时 1.0 秒

4. 其他(10%)
   - 网络 IO、序列化等
"""

1.3 优化方案

# optimization_implementation.py
from typing import Dict, List
import asyncio

class OptimizedRAGSystem:
    """优化后的 RAG 系统"""
    
    def __init__(self, config):
        # 1. 启用 Embedding 缓存
        self.embedding_cache = SemanticCache(
            embedding_model=config['embedding_model'],
            similarity_threshold=0.99
        )
        
        # 2. 使用 HNSW 索引
        self.vector_index = HNSWIndex(
            dimension=config['embedding_dimension'],
            M=32,
            ef_construction=200
        )
        
        # 3. 启用多级缓存
        self.response_cache = MultiLevelCache(
            l1_max_size=1000,
            l1_ttl_minutes=5,
            l2_ttl_hours=24
        )
        
        # 4. 流式输出
        self.enable_streaming = True
    
    async def query(self, query: str) -> Dict:
        """优化后的查询"""
        start_time = datetime.now()
        
        # 1. 检查响应缓存
        cached = self.response_cache.get(query)
        if cached:
            return {
                **cached,
                'from_cache': True,
                'latency_ms': (datetime.now() - start_time).total_seconds() * 1000
            }
        
        # 2. 并行计算 Embedding
        query_embedding = await self._get_embedding_cached(query)
        
        # 3. 使用 HNSW 索引检索
        retrieved = await self._retrieve_fast(query_embedding, top_k=10)
        
        # 4. 优化上下文
        context = self._optimize_context(retrieved, query)
        
        # 5. 流式生成
        if self.enable_streaming:
            result = await self._generate_streaming(query, context)
        else:
            result = await self._generate(query, context)
        
        # 6. 缓存结果
        self.response_cache.set(query, result)
        
        result['latency_ms'] = (
            datetime.now() - start_time
        ).total_seconds() * 1000
        
        return result
    
    async def _get_embedding_cached(self, text: str) -> List[float]:
        """获取缓存的 Embedding"""
        cached = self.embedding_cache.get(text)
        if cached:
            return cached
        
        embedding = await self.embedding_model.encode([text])[0]
        self.embedding_cache.set(text, embedding)
        
        return embedding
    
    async def _retrieve_fast(
        self,
        query_embedding: List[float],
        top_k: int
    ) -> List[Dict]:
        """快速检索"""
        # 使用 HNSW 索引,大幅降低检索时间
        indices, distances = self.vector_index.search(
            [query_embedding],
            k=top_k
        )
        
        return [
            {'id': idx, 'distance': dist}
            for idx, dist in zip(indices[0], distances[0])
        ]
    
    def _optimize_context(
        self,
        retrieved: List[Dict],
        query: str
    ) -> str:
        """优化上下文"""
        # 只保留最相关的 5 个文档
        # 压缩每个文档到 200 Token
        # ...
        pass
    
    async def _generate_streaming(
        self,
        query: str,
        context: str
    ) -> Dict:
        """流式生成"""
        # 实现流式输出,降低感知延迟
        # ...
        pass

# 优化效果
"""
优化后指标:
┌─────────────────────────────────────┐
│ P50 延迟:0.8 秒(-77%)              │
│ P95 延迟:2.1 秒(-74%)              │
│ P99 延迟:4.5 秒(-71%)              │
│ QPS: 220(+340%)                    │
│ 用户投诉率:降低 85%                  │
└─────────────────────────────────────┘

优化措施贡献度:
┌─────────────────────────────────────┐
│ Embedding 缓存:30%                  │
│ HNSW 索引:25%                       │
│ 响应缓存:20%                        │
│ 上下文优化:15%                      │
│ 流式输出:10%                        │
└─────────────────────────────────────┘
"""

二、案例二:Agent 系统吞吐量提升

2.1 问题背景

优化前指标:
┌─────────────────────────────────────┐
│ 并发用户:100                        │
│ 请求成功率:95%                      │
│ 平均响应时间:2.5 秒                  │
│ 系统资源利用率:CPU 80%, 内存 70%     │
│ 高峰期请求失败率:15%                │
└─────────────────────────────────────┘

优化目标:
┌─────────────────────────────────────┐
│ 并发用户:> 500                      │
│ 请求成功率:> 99%                    │
│ 平均响应时间:< 1.5 秒                │
│ 高峰期请求失败率:< 2%               │
└─────────────────────────────────────┘

2.2 性能瓶颈

# bottleneck_analysis.py
from typing import Dict, List

class AgentBottleneckAnalyzer:
    """Agent 系统瓶颈分析器"""
    
    def analyze(self, metrics: Dict) -> Dict:
        """分析系统瓶颈"""
        bottlenecks = []
        
        # 检查 LLM 调用瓶颈
        if metrics.get('llm_queue_length', 0) > 50:
            bottlenecks.append({
                'component': 'LLM Pool',
                'issue': 'LLM 调用队列过长',
                'severity': 'high',
                'solution': '增加 LLM 并发或启用批处理'
            })
        
        # 检查内存瓶颈
        if metrics.get('memory_usage', 0) > 0.8:
            bottlenecks.append({
                'component': 'Memory',
                'issue': '内存使用率过高',
                'severity': 'high',
                'solution': '优化上下文长度或增加实例'
            })
        
        # 检查 Agent 实例瓶颈
        if metrics.get('agent_utilization', 0) > 0.9:
            bottlenecks.append({
                'component': 'Agent Workers',
                'issue': 'Agent 实例饱和',
                'severity': 'critical',
                'solution': '水平扩展 Agent 实例'
            })
        
        return {
            'bottlenecks': bottlenecks,
            'priority_order': self._prioritize(bottlenecks)
        }
    
    def _prioritize(self, bottlenecks: List[Dict]) -> List[str]:
        """优先级排序"""
        severity_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}
        
        sorted_bottlenecks = sorted(
            bottlenecks,
            key=lambda x: severity_order.get(x['severity'], 99)
        )
        
        return [b['component'] for b in sorted_bottlenecks]

# 分析结果
"""
瓶颈分析:

1. Agent 实例饱和(Critical)
   - 10 个 Agent 实例处理 100 并发
   - 每个实例处理能力:10 QPS
   - 利用率:95%

2. LLM 调用队列长(High)
   - LLM 并发限制:5
   - 平均等待时间:1.2 秒
   - 队列长度:50+

3. 内存压力大(High)
   - 平均每个会话占用:50MB
   - 总内存:8GB
   - 使用率:85%
"""

2.3 优化实施

# optimization_implementation.py
from typing import Dict, List
import asyncio

class OptimizedAgentSystem:
    """优化后的 Agent 系统"""
    
    def __init__(self, config):
        # 1. Agent 实例池化
        self.agent_pool = AgentPool(
            min_instances=10,
            max_instances=100,
            scale_up_threshold=0.8,
            scale_down_threshold=0.3
        )
        
        # 2. LLM 批处理
        self.llm_batcher = LLMBatchOptimizer(
            max_batch_size=20,
            max_wait_ms=100
        )
        
        # 3. 会话状态外部化
        self.session_store = RedisSessionStore(
            host=config['redis_host'],
            port=config['redis_port']
        )
        
        # 4. 请求限流
        self.rate_limiter = TokenBucketRateLimiter(
            rate=1000,  # 每秒 1000 请求
            capacity=2000
        )
    
    async def process(self, request: Dict) -> Dict:
        """处理请求"""
        # 1. 限流检查
        if not self.rate_limiter.allow():
            return {
                'error': 'Rate limit exceeded',
                'retry_after': 1
            }
        
        # 2. 获取 Agent 实例
        agent = await self.agent_pool.acquire()
        
        try:
            # 3. 加载会话状态
            session_state = await self.session_store.get(
                request['session_id']
            )
            
            # 4. 处理请求
            result = await agent.process(
                request['query'],
                session_state
            )
            
            # 5. 保存会话状态
            await self.session_store.set(
                request['session_id'],
                agent.state
            )
            
            return result
        
        finally:
            # 6. 释放 Agent 实例
            await self.agent_pool.release(agent)
    
    async def batch_llm_calls(
        self,
        prompts: List[str]
    ) -> List[str]:
        """批量 LLM 调用"""
        return await self.llm_batcher.submit_batch(prompts)

class AgentPool:
    """Agent 实例池"""
    
    def __init__(
        self,
        min_instances: int,
        max_instances: int,
        scale_up_threshold: float,
        scale_down_threshold: float
    ):
        self.min_instances = min_instances
        self.max_instances = max_instances
        self.scale_up_threshold = scale_up_threshold
        self.scale_down_threshold = scale_down_threshold
        
        self.instances: List = []
        self.in_use: set = set()
        
        # 初始化最小实例数
        for i in range(min_instances):
            self.instances.append(self._create_agent())
    
    async def acquire(self):
        """获取实例"""
        # 查找空闲实例
        for i, instance in enumerate(self.instances):
            if i not in self.in_use:
                self.in_use.add(i)
                return instance
        
        # 没有空闲实例,尝试扩展
        if len(self.instances) < self.max_instances:
            new_instance = self._create_agent()
            self.instances.append(new_instance)
            self.in_use.add(len(self.instances) - 1)
            return new_instance
        
        # 等待实例释放
        while True:
            await asyncio.sleep(0.1)
            for i, instance in enumerate(self.instances):
                if i not in self.in_use:
                    self.in_use.add(i)
                    return instance
    
    async def release(self, instance):
        """释放实例"""
        idx = self.instances.index(instance)
        self.in_use.discard(idx)
        
        # 检查是否可以缩容
        if (
            len(self.instances) > self.min_instances and
            len(self.in_use) / len(self.instances) < self.scale_down_threshold
        ):
            # 缩容逻辑
            pass
    
    def _create_agent(self):
        """创建 Agent 实例"""
        return Agent()

# 优化效果
"""
优化后指标:
┌─────────────────────────────────────┐
│ 并发用户:550(+450%)               │
│ 请求成功率:99.5%(+4.5%)           │
│ 平均响应时间:1.2 秒(-52%)          │
│ 高峰期请求失败率:1.2%(-92%)       │
│ 资源利用率:CPU 65%, 内存 60%         │
└─────────────────────────────────────┘

优化措施贡献度:
┌─────────────────────────────────────┐
│ Agent 池化:40%                      │
│ LLM 批处理:25%                      │
│ 状态外部化:20%                      │
│ 请求限流:15%                        │
└─────────────────────────────────────┘
"""

三、案例三:多租户系统资源优化

3.1 问题背景

优化前指标:
┌─────────────────────────────────────┐
│ 租户数:50                           │
│ 资源争用严重                         │
│ 大租户占用 80% 资源                   │
│ 小租户响应慢                         │
│ 资源利用率:40%                      │
└─────────────────────────────────────┘

优化目标:
┌─────────────────────────────────────┐
│ 租户隔离                             │
│ 公平资源分配                         │
│ 资源利用率:> 70%                    │
└─────────────────────────────────────┘

3.2 优化方案

# resource_isolation.py
from typing import Dict, List

class ResourceIsolator:
    """资源隔离器"""
    
    def __init__(self):
        self.tenant_quotas: Dict[str, Dict] = {}
        self.tenant_usage: Dict[str, Dict] = {}
    
    def set_tenant_quota(
        self,
        tenant_id: str,
        quota: Dict
    ):
        """设置租户配额"""
        self.tenant_quotas[tenant_id] = quota
        self.tenant_usage[tenant_id] = {
            'tokens': 0,
            'requests': 0,
            'memory_mb': 0
        }
    
    def check_quota(
        self,
        tenant_id: str,
        resource_type: str,
        required: int
    ) -> bool:
        """检查配额"""
        quota = self.tenant_quotas.get(tenant_id, {})
        usage = self.tenant_usage.get(tenant_id, {})
        
        quota_limit = quota.get(resource_type, float('inf'))
        current_usage = usage.get(resource_type, 0)
        
        return current_usage + required <= quota_limit
    
    def record_usage(
        self,
        tenant_id: str,
        resource_type: str,
        amount: int
    ):
        """记录使用量"""
        if tenant_id not in self.tenant_usage:
            self.tenant_usage[tenant_id] = {}
        
        current = self.tenant_usage[tenant_id].get(resource_type, 0)
        self.tenant_usage[tenant_id][resource_type] = current + amount
    
    def get_isolated_pool(
        self,
        tenant_id: str,
        resource_type: str
    ):
        """获取隔离资源池"""
        # 根据租户等级返回不同的资源池
        tier = self.tenant_quotas.get(tenant_id, {}).get('tier', 'basic')
        
        if tier == 'enterprise':
            return DedicatedResourcePool(tenant_id)
        elif tier == 'business':
            return SharedResourcePool(tenant_id, max_share=0.3)
        else:
            return SharedResourcePool(tenant_id, max_share=0.1)

# 优化效果
"""
优化后指标:
┌─────────────────────────────────────┐
│ 租户数:50(支持 100+)              │
│ 资源争用:基本消除                   │
│ 大租户资源占比:30%(-50%)          │
│ 小租户响应时间:降低 70%              │
│ 资源利用率:75%(+35%)              │
└─────────────────────────────────────┘
"""

四、总结

4.1 优化方法论

性能优化流程:

1. 建立基线
   - 收集当前指标
   - 识别瓶颈
   - 设定目标

2. 分析瓶颈
   - 性能剖析
   - 资源监控
   - 用户反馈

3. 实施优化
   - 优先处理关键瓶颈
   - 小步快跑
   - 持续验证

4. 验证效果
   - 对比优化前后
   - 监控副作用
   - 用户验收

4.2 关键经验

  1. 数据驱动

    • 基于指标做决策
    • 避免过早优化
    • 持续监控
  2. 系统化思维

    • 考虑整体影响
    • 避免局部优化
    • 平衡各方面
  3. 渐进式改进

    • 小步验证
    • 快速回滚
    • 持续迭代

参考资料


分享这篇文章到:

上一篇文章
Redis 哨兵集群实战
下一篇文章
RocketMQ 架构设计与核心原理