AI 应用性能优化实战案例
本文通过多个真实案例,详解 AI 应用性能优化的实战方法和经验教训。
一、案例一:RAG 系统延迟优化
1.1 问题背景
优化前指标:
┌─────────────────────────────────────┐
│ P50 延迟:3.5 秒 │
│ P95 延迟:8.2 秒 │
│ P99 延迟:15.3 秒 │
│ QPS: 50 │
│ 用户投诉率高 │
└─────────────────────────────────────┘
优化目标:
┌─────────────────────────────────────┐
│ P50 延迟:< 1 秒 │
│ P95 延迟:< 3 秒 │
│ QPS: > 200 │
└─────────────────────────────────────┘
1.2 性能分析
# performance_analysis.py
import cProfile
import pstats
from typing import Dict, List
class RAGPerformanceAnalyzer:
"""RAG 性能分析器"""
def __init__(self):
self.profiler = cProfile.Profile()
def profile_query(self, rag_system, query: str) -> Dict:
"""分析查询性能"""
# 启动性能分析
self.profiler.enable()
result = rag_system.query(query)
self.profiler.disable()
# 分析结果
stats = pstats.Stats(self.profiler)
stats.sort_stats('cumulative')
# 提取关键信息
bottlenecks = self._identify_bottlenecks(stats)
return {
'latency_ms': result.get('latency_ms', 0),
'bottlenecks': bottlenecks,
'recommendations': self._generate_recommendations(bottlenecks)
}
def _identify_bottlenecks(self, stats) -> List[Dict]:
"""识别瓶颈"""
bottlenecks = []
# 分析调用统计
for func, (cc, nc, tt, ct, callers) in stats.stats.items():
if ct > 0.5: # 累计时间超过 0.5 秒
bottlenecks.append({
'function': func[2],
'cumulative_time': ct,
'call_count': nc,
'avg_time': ct / nc if nc > 0 else 0
})
return sorted(
bottlenecks,
key=lambda x: x['cumulative_time'],
reverse=True
)[:10]
def _generate_recommendations(
self,
bottlenecks: List[Dict]
) -> List[str]:
"""生成优化建议"""
recommendations = []
for bn in bottlenecks:
if 'embedding' in bn['function']:
recommendations.append(
'考虑批量计算 Embedding 或使用缓存'
)
elif 'retrieve' in bn['function']:
recommendations.append(
'优化向量检索,使用近似检索或增加索引'
)
elif 'generate' in bn['function']:
recommendations.append(
'优化 Prompt 长度或使用流式输出'
)
return recommendations
# 分析结果
"""
性能瓶颈分析:
1. Embedding 计算(35%)
- 每次查询计算 10 个文档的 Embedding
- 串行计算,总耗时 1.2 秒
2. 向量检索(25%)
- 暴力搜索,未使用索引
- 检索 10000 个向量,耗时 0.8 秒
3. LLM 生成(30%)
- Prompt 过长(3000 Token)
- 等待完整响应,耗时 1.0 秒
4. 其他(10%)
- 网络 IO、序列化等
"""
1.3 优化方案
# optimization_implementation.py
from typing import Dict, List
import asyncio
class OptimizedRAGSystem:
"""优化后的 RAG 系统"""
def __init__(self, config):
# 1. 启用 Embedding 缓存
self.embedding_cache = SemanticCache(
embedding_model=config['embedding_model'],
similarity_threshold=0.99
)
# 2. 使用 HNSW 索引
self.vector_index = HNSWIndex(
dimension=config['embedding_dimension'],
M=32,
ef_construction=200
)
# 3. 启用多级缓存
self.response_cache = MultiLevelCache(
l1_max_size=1000,
l1_ttl_minutes=5,
l2_ttl_hours=24
)
# 4. 流式输出
self.enable_streaming = True
async def query(self, query: str) -> Dict:
"""优化后的查询"""
start_time = datetime.now()
# 1. 检查响应缓存
cached = self.response_cache.get(query)
if cached:
return {
**cached,
'from_cache': True,
'latency_ms': (datetime.now() - start_time).total_seconds() * 1000
}
# 2. 并行计算 Embedding
query_embedding = await self._get_embedding_cached(query)
# 3. 使用 HNSW 索引检索
retrieved = await self._retrieve_fast(query_embedding, top_k=10)
# 4. 优化上下文
context = self._optimize_context(retrieved, query)
# 5. 流式生成
if self.enable_streaming:
result = await self._generate_streaming(query, context)
else:
result = await self._generate(query, context)
# 6. 缓存结果
self.response_cache.set(query, result)
result['latency_ms'] = (
datetime.now() - start_time
).total_seconds() * 1000
return result
async def _get_embedding_cached(self, text: str) -> List[float]:
"""获取缓存的 Embedding"""
cached = self.embedding_cache.get(text)
if cached:
return cached
embedding = await self.embedding_model.encode([text])[0]
self.embedding_cache.set(text, embedding)
return embedding
async def _retrieve_fast(
self,
query_embedding: List[float],
top_k: int
) -> List[Dict]:
"""快速检索"""
# 使用 HNSW 索引,大幅降低检索时间
indices, distances = self.vector_index.search(
[query_embedding],
k=top_k
)
return [
{'id': idx, 'distance': dist}
for idx, dist in zip(indices[0], distances[0])
]
def _optimize_context(
self,
retrieved: List[Dict],
query: str
) -> str:
"""优化上下文"""
# 只保留最相关的 5 个文档
# 压缩每个文档到 200 Token
# ...
pass
async def _generate_streaming(
self,
query: str,
context: str
) -> Dict:
"""流式生成"""
# 实现流式输出,降低感知延迟
# ...
pass
# 优化效果
"""
优化后指标:
┌─────────────────────────────────────┐
│ P50 延迟:0.8 秒(-77%) │
│ P95 延迟:2.1 秒(-74%) │
│ P99 延迟:4.5 秒(-71%) │
│ QPS: 220(+340%) │
│ 用户投诉率:降低 85% │
└─────────────────────────────────────┘
优化措施贡献度:
┌─────────────────────────────────────┐
│ Embedding 缓存:30% │
│ HNSW 索引:25% │
│ 响应缓存:20% │
│ 上下文优化:15% │
│ 流式输出:10% │
└─────────────────────────────────────┘
"""
二、案例二:Agent 系统吞吐量提升
2.1 问题背景
优化前指标:
┌─────────────────────────────────────┐
│ 并发用户:100 │
│ 请求成功率:95% │
│ 平均响应时间:2.5 秒 │
│ 系统资源利用率:CPU 80%, 内存 70% │
│ 高峰期请求失败率:15% │
└─────────────────────────────────────┘
优化目标:
┌─────────────────────────────────────┐
│ 并发用户:> 500 │
│ 请求成功率:> 99% │
│ 平均响应时间:< 1.5 秒 │
│ 高峰期请求失败率:< 2% │
└─────────────────────────────────────┘
2.2 性能瓶颈
# bottleneck_analysis.py
from typing import Dict, List
class AgentBottleneckAnalyzer:
"""Agent 系统瓶颈分析器"""
def analyze(self, metrics: Dict) -> Dict:
"""分析系统瓶颈"""
bottlenecks = []
# 检查 LLM 调用瓶颈
if metrics.get('llm_queue_length', 0) > 50:
bottlenecks.append({
'component': 'LLM Pool',
'issue': 'LLM 调用队列过长',
'severity': 'high',
'solution': '增加 LLM 并发或启用批处理'
})
# 检查内存瓶颈
if metrics.get('memory_usage', 0) > 0.8:
bottlenecks.append({
'component': 'Memory',
'issue': '内存使用率过高',
'severity': 'high',
'solution': '优化上下文长度或增加实例'
})
# 检查 Agent 实例瓶颈
if metrics.get('agent_utilization', 0) > 0.9:
bottlenecks.append({
'component': 'Agent Workers',
'issue': 'Agent 实例饱和',
'severity': 'critical',
'solution': '水平扩展 Agent 实例'
})
return {
'bottlenecks': bottlenecks,
'priority_order': self._prioritize(bottlenecks)
}
def _prioritize(self, bottlenecks: List[Dict]) -> List[str]:
"""优先级排序"""
severity_order = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}
sorted_bottlenecks = sorted(
bottlenecks,
key=lambda x: severity_order.get(x['severity'], 99)
)
return [b['component'] for b in sorted_bottlenecks]
# 分析结果
"""
瓶颈分析:
1. Agent 实例饱和(Critical)
- 10 个 Agent 实例处理 100 并发
- 每个实例处理能力:10 QPS
- 利用率:95%
2. LLM 调用队列长(High)
- LLM 并发限制:5
- 平均等待时间:1.2 秒
- 队列长度:50+
3. 内存压力大(High)
- 平均每个会话占用:50MB
- 总内存:8GB
- 使用率:85%
"""
2.3 优化实施
# optimization_implementation.py
from typing import Dict, List
import asyncio
class OptimizedAgentSystem:
"""优化后的 Agent 系统"""
def __init__(self, config):
# 1. Agent 实例池化
self.agent_pool = AgentPool(
min_instances=10,
max_instances=100,
scale_up_threshold=0.8,
scale_down_threshold=0.3
)
# 2. LLM 批处理
self.llm_batcher = LLMBatchOptimizer(
max_batch_size=20,
max_wait_ms=100
)
# 3. 会话状态外部化
self.session_store = RedisSessionStore(
host=config['redis_host'],
port=config['redis_port']
)
# 4. 请求限流
self.rate_limiter = TokenBucketRateLimiter(
rate=1000, # 每秒 1000 请求
capacity=2000
)
async def process(self, request: Dict) -> Dict:
"""处理请求"""
# 1. 限流检查
if not self.rate_limiter.allow():
return {
'error': 'Rate limit exceeded',
'retry_after': 1
}
# 2. 获取 Agent 实例
agent = await self.agent_pool.acquire()
try:
# 3. 加载会话状态
session_state = await self.session_store.get(
request['session_id']
)
# 4. 处理请求
result = await agent.process(
request['query'],
session_state
)
# 5. 保存会话状态
await self.session_store.set(
request['session_id'],
agent.state
)
return result
finally:
# 6. 释放 Agent 实例
await self.agent_pool.release(agent)
async def batch_llm_calls(
self,
prompts: List[str]
) -> List[str]:
"""批量 LLM 调用"""
return await self.llm_batcher.submit_batch(prompts)
class AgentPool:
"""Agent 实例池"""
def __init__(
self,
min_instances: int,
max_instances: int,
scale_up_threshold: float,
scale_down_threshold: float
):
self.min_instances = min_instances
self.max_instances = max_instances
self.scale_up_threshold = scale_up_threshold
self.scale_down_threshold = scale_down_threshold
self.instances: List = []
self.in_use: set = set()
# 初始化最小实例数
for i in range(min_instances):
self.instances.append(self._create_agent())
async def acquire(self):
"""获取实例"""
# 查找空闲实例
for i, instance in enumerate(self.instances):
if i not in self.in_use:
self.in_use.add(i)
return instance
# 没有空闲实例,尝试扩展
if len(self.instances) < self.max_instances:
new_instance = self._create_agent()
self.instances.append(new_instance)
self.in_use.add(len(self.instances) - 1)
return new_instance
# 等待实例释放
while True:
await asyncio.sleep(0.1)
for i, instance in enumerate(self.instances):
if i not in self.in_use:
self.in_use.add(i)
return instance
async def release(self, instance):
"""释放实例"""
idx = self.instances.index(instance)
self.in_use.discard(idx)
# 检查是否可以缩容
if (
len(self.instances) > self.min_instances and
len(self.in_use) / len(self.instances) < self.scale_down_threshold
):
# 缩容逻辑
pass
def _create_agent(self):
"""创建 Agent 实例"""
return Agent()
# 优化效果
"""
优化后指标:
┌─────────────────────────────────────┐
│ 并发用户:550(+450%) │
│ 请求成功率:99.5%(+4.5%) │
│ 平均响应时间:1.2 秒(-52%) │
│ 高峰期请求失败率:1.2%(-92%) │
│ 资源利用率:CPU 65%, 内存 60% │
└─────────────────────────────────────┘
优化措施贡献度:
┌─────────────────────────────────────┐
│ Agent 池化:40% │
│ LLM 批处理:25% │
│ 状态外部化:20% │
│ 请求限流:15% │
└─────────────────────────────────────┘
"""
三、案例三:多租户系统资源优化
3.1 问题背景
优化前指标:
┌─────────────────────────────────────┐
│ 租户数:50 │
│ 资源争用严重 │
│ 大租户占用 80% 资源 │
│ 小租户响应慢 │
│ 资源利用率:40% │
└─────────────────────────────────────┘
优化目标:
┌─────────────────────────────────────┐
│ 租户隔离 │
│ 公平资源分配 │
│ 资源利用率:> 70% │
└─────────────────────────────────────┘
3.2 优化方案
# resource_isolation.py
from typing import Dict, List
class ResourceIsolator:
"""资源隔离器"""
def __init__(self):
self.tenant_quotas: Dict[str, Dict] = {}
self.tenant_usage: Dict[str, Dict] = {}
def set_tenant_quota(
self,
tenant_id: str,
quota: Dict
):
"""设置租户配额"""
self.tenant_quotas[tenant_id] = quota
self.tenant_usage[tenant_id] = {
'tokens': 0,
'requests': 0,
'memory_mb': 0
}
def check_quota(
self,
tenant_id: str,
resource_type: str,
required: int
) -> bool:
"""检查配额"""
quota = self.tenant_quotas.get(tenant_id, {})
usage = self.tenant_usage.get(tenant_id, {})
quota_limit = quota.get(resource_type, float('inf'))
current_usage = usage.get(resource_type, 0)
return current_usage + required <= quota_limit
def record_usage(
self,
tenant_id: str,
resource_type: str,
amount: int
):
"""记录使用量"""
if tenant_id not in self.tenant_usage:
self.tenant_usage[tenant_id] = {}
current = self.tenant_usage[tenant_id].get(resource_type, 0)
self.tenant_usage[tenant_id][resource_type] = current + amount
def get_isolated_pool(
self,
tenant_id: str,
resource_type: str
):
"""获取隔离资源池"""
# 根据租户等级返回不同的资源池
tier = self.tenant_quotas.get(tenant_id, {}).get('tier', 'basic')
if tier == 'enterprise':
return DedicatedResourcePool(tenant_id)
elif tier == 'business':
return SharedResourcePool(tenant_id, max_share=0.3)
else:
return SharedResourcePool(tenant_id, max_share=0.1)
# 优化效果
"""
优化后指标:
┌─────────────────────────────────────┐
│ 租户数:50(支持 100+) │
│ 资源争用:基本消除 │
│ 大租户资源占比:30%(-50%) │
│ 小租户响应时间:降低 70% │
│ 资源利用率:75%(+35%) │
└─────────────────────────────────────┘
"""
四、总结
4.1 优化方法论
性能优化流程:
1. 建立基线
- 收集当前指标
- 识别瓶颈
- 设定目标
2. 分析瓶颈
- 性能剖析
- 资源监控
- 用户反馈
3. 实施优化
- 优先处理关键瓶颈
- 小步快跑
- 持续验证
4. 验证效果
- 对比优化前后
- 监控副作用
- 用户验收
4.2 关键经验
-
数据驱动
- 基于指标做决策
- 避免过早优化
- 持续监控
-
系统化思维
- 考虑整体影响
- 避免局部优化
- 平衡各方面
-
渐进式改进
- 小步验证
- 快速回滚
- 持续迭代
参考资料