RAG Chunking Strategies and Optimization in Practice
Text chunking is a core stage of a RAG system: it directly shapes retrieval quality and, through it, the quality of generated answers. How do you choose an appropriate chunking strategy, and how do you tune it once chosen? This article works through the main strategies and their optimization in practice.
1. Overview of Chunking Strategies
1.1 Why Chunking Is Needed
Chunking is necessary for four reasons:
1. Model limits
   - LLM context windows are finite
   - Embedding models cap input length
2. Retrieval precision
   - Smaller chunks are retrieved more precisely
   - Less irrelevant text rides along with each hit
3. Cost control
   - Fewer tokens consumed
   - Lower API-call costs
4. Response speed
   - Small chunks are faster to process
   - They parallelize well
1.2 Chunking Challenges
- Granularity: chunks that are too large carry noise; too small, they lose context
- Boundaries: keeping each chunk semantically complete
- Overlap: balancing redundancy against continuity
- Performance: chunking quality vs. processing speed
1.3 Mainstream Chunking Strategies
The main strategies compared:

| Strategy | Pros | Cons | Best for |
|---|---|---|---|
| Fixed-size | Simple and fast | Cuts across meaning | General use |
| Semantic | Preserves meaning | Complex and slow | High-quality retrieval |
| Recursive | Flexible balance | Many parameters | Documents |
| Paragraph | Natural boundaries | Uneven granularity | Articles |
| Sentence | Finest granularity | Loses context | Precise retrieval |
2. Fixed-Size Chunking
2.1 Basic Implementation
```python
# chunkers/fixed_chunker.py
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Chunk:
    """A chunk of text with its position in the source document."""
    id: str
    content: str
    start_index: int
    end_index: int
    metadata: Dict


class FixedSizeChunker:
    """Fixed-size chunker."""

    def __init__(self, chunk_size: int = 500, overlap: int = 50):
        """
        Args:
            chunk_size: chunk size, in characters
            overlap: overlap between consecutive chunks, in characters
        """
        self.chunk_size = chunk_size
        self.overlap = overlap

    def chunk(self, text: str, metadata: Dict = None) -> List[Chunk]:
        """Split text into fixed-size chunks."""
        chunks = []
        start = 0
        chunk_id = 0
        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            chunks.append(Chunk(
                id=f"chunk_{chunk_id}",
                content=text[start:end],
                start_index=start,
                end_index=end,
                metadata=metadata or {},
            ))
            # Step forward by chunk_size minus overlap, so adjacent
            # chunks share `overlap` characters.
            start += self.chunk_size - self.overlap
            chunk_id += 1
        return chunks
```
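A minimal usage sketch (the file name and sample text are illustrative, assuming the class above is saved as chunkers/fixed_chunker.py):

```python
# demo_fixed_chunker.py -- hypothetical usage of FixedSizeChunker
from chunkers.fixed_chunker import FixedSizeChunker

chunker = FixedSizeChunker(chunk_size=20, overlap=5)
text = "RAG systems split documents into chunks before indexing them."

for chunk in chunker.chunk(text, metadata={"source": "demo"}):
    # Each step advances 15 characters, so consecutive chunks
    # share a 5-character tail.
    print(chunk.id, repr(chunk.content))
```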
2.2 Token-Based Chunking
```python
# chunkers/token_chunker.py
import re
from typing import Dict, List

import tiktoken


class TokenSizeChunker:
    """Chunker that sizes chunks by token count rather than characters."""

    def __init__(
        self,
        chunk_size: int = 500,
        overlap: int = 50,
        encoding_name: str = "cl100k_base"
    ):
        """
        Args:
            chunk_size: tokens per chunk
            overlap: overlap between consecutive chunks, in tokens
            encoding_name: tiktoken encoding name
        """
        self.chunk_size = chunk_size
        self.overlap = overlap
        self.encoding = tiktoken.get_encoding(encoding_name)

    def count_tokens(self, text: str) -> int:
        """Count the tokens in a string."""
        return len(self.encoding.encode(text))

    def chunk(self, text: str, metadata: Dict = None) -> List[Dict]:
        """Chunk by token count, keeping sentences intact."""
        # Split into sentences first so chunk boundaries fall on
        # sentence boundaries.
        sentences = self._split_into_sentences(text)
        chunks = []
        current_chunk = []
        current_tokens = 0
        chunk_id = 0
        for sentence in sentences:
            sentence_tokens = self.count_tokens(sentence)
            # Adding this sentence would exceed the chunk budget.
            if current_tokens + sentence_tokens > self.chunk_size:
                if current_chunk:
                    chunks.append({
                        'id': f"chunk_{chunk_id}",
                        'content': ' '.join(current_chunk),
                        'token_count': current_tokens,
                        'metadata': metadata or {}
                    })
                    chunk_id += 1
                    # Carry trailing sentences over as the overlap.
                    current_chunk = self._get_overlap_tokens(
                        current_chunk, self.overlap
                    )
                    current_tokens = self.count_tokens(' '.join(current_chunk))
            current_chunk.append(sentence)
            current_tokens += sentence_tokens
        # Flush the final chunk.
        if current_chunk:
            chunks.append({
                'id': f"chunk_{chunk_id}",
                'content': ' '.join(current_chunk),
                'token_count': current_tokens,
                'metadata': metadata or {}
            })
        return chunks

    def _split_into_sentences(self, text: str) -> List[str]:
        """Naive sentence splitting on end-of-sentence punctuation."""
        sentences = re.split(r'[.!?。!?]+', text)
        return [s.strip() for s in sentences if s.strip()]

    def _get_overlap_tokens(self, chunk: List[str], overlap_tokens: int) -> List[str]:
        """Take trailing sentences until the overlap token budget is reached."""
        result = []
        current_tokens = 0
        for sentence in reversed(chunk):
            sentence_tokens = self.count_tokens(sentence)
            if current_tokens + sentence_tokens > overlap_tokens:
                break
            result.insert(0, sentence)
            current_tokens += sentence_tokens
        return result
```
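A quick sketch of driving the token chunker (assumes tiktoken is installed; the sample text is illustrative):

```python
# demo_token_chunker.py -- hypothetical usage of TokenSizeChunker
from chunkers.token_chunker import TokenSizeChunker

chunker = TokenSizeChunker(chunk_size=64, overlap=16)
text = (
    "Tokens, not characters, are what the embedding model actually sees. "
    "Sizing chunks in tokens keeps every chunk inside the model limit. "
    "It also makes cost estimates straightforward."
)

for chunk in chunker.chunk(text):
    print(chunk['id'], chunk['token_count'], chunk['content'][:40], '...')
```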
3. Semantic Chunking
3.1 Sentence-Based Chunking
```python
# chunkers/sentence_chunker.py
from typing import Dict, List

import spacy


class SentenceChunker:
    """Sentence-level chunker built on spaCy sentence segmentation."""

    def __init__(self, max_sentences_per_chunk: int = 5):
        """
        Args:
            max_sentences_per_chunk: maximum number of sentences per chunk
        """
        self.max_sentences = max_sentences_per_chunk
        # Use en_core_web_sm instead for English text.
        self.nlp = spacy.load('zh_core_web_sm')

    def chunk(self, text: str, metadata: Dict = None) -> List[Dict]:
        """Group consecutive sentences into chunks."""
        doc = self.nlp(text)
        sentences = [sent.text.strip() for sent in doc.sents]
        chunks = []
        chunk_id = 0
        for i in range(0, len(sentences), self.max_sentences):
            chunk_sentences = sentences[i:i + self.max_sentences]
            chunks.append({
                'id': f"chunk_{chunk_id}",
                'content': ' '.join(chunk_sentences),
                'sentence_count': len(chunk_sentences),
                'metadata': metadata or {}
            })
            chunk_id += 1
        return chunks
```
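A usage sketch, assuming the Chinese model has been installed first via `python -m spacy download zh_core_web_sm`:

```python
# demo_sentence_chunker.py -- hypothetical usage of SentenceChunker
from chunkers.sentence_chunker import SentenceChunker

chunker = SentenceChunker(max_sentences_per_chunk=2)
chunks = chunker.chunk("第一句。第二句。第三句。第四句。")

for chunk in chunks:
    # Each chunk holds at most two sentences.
    print(chunk['id'], chunk['sentence_count'], chunk['content'])
```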
3.2 Similarity-Based Semantic Chunking
```python
# chunkers/semantic_chunker.py
import re
from typing import Dict, List

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


class SemanticChunker:
    """Chunker that starts a new chunk when semantic similarity drops."""

    def __init__(
        self,
        embedding_model=None,
        threshold: float = 0.5,
        min_chunk_size: int = 100,
        max_chunk_size: int = 1000
    ):
        """
        Args:
            embedding_model: embedding model exposing an encode() method
            threshold: similarity threshold below which a new chunk starts
            min_chunk_size: minimum chunk size, in characters
            max_chunk_size: maximum chunk size, in characters
        """
        self.embedding_model = embedding_model
        self.threshold = threshold
        self.min_size = min_chunk_size
        self.max_size = max_chunk_size

    def chunk(self, text: str, metadata: Dict = None) -> List[Dict]:
        """Chunk by semantic similarity between adjacent sentences."""
        # Split into sentences first.
        sentences = self._split_sentences(text)
        if not sentences:
            return []
        # Embed each sentence.
        embeddings = self._get_embeddings(sentences)
        # Merge sentences by similarity.
        chunks = self._merge_by_similarity(sentences, embeddings)
        # Format the output.
        return [
            {
                'id': f"chunk_{i}",
                'content': ' '.join(chunk),
                'sentence_count': len(chunk),
                'metadata': metadata or {}
            }
            for i, chunk in enumerate(chunks)
        ]

    def _split_sentences(self, text: str) -> List[str]:
        """Naive sentence splitting on end-of-sentence punctuation."""
        sentences = re.split(r'[.!?。!?]+', text)
        return [s.strip() for s in sentences if s.strip()]

    def _get_embeddings(self, sentences: List[str]) -> np.ndarray:
        """Embed sentences; fall back to TF-IDF when no model is given."""
        if self.embedding_model:
            return self.embedding_model.encode(sentences)
        from sklearn.feature_extraction.text import TfidfVectorizer
        vectorizer = TfidfVectorizer()
        return vectorizer.fit_transform(sentences).toarray()

    def _merge_by_similarity(
        self,
        sentences: List[str],
        embeddings: np.ndarray
    ) -> List[List[str]]:
        """Merge adjacent sentences unless similarity drops below the threshold."""
        chunks = []
        current_chunk = [sentences[0]]
        for i in range(1, len(sentences)):
            # Similarity between this sentence and the previous one.
            similarity = cosine_similarity(
                [embeddings[i - 1]],
                [embeddings[i]]
            )[0][0]
            # A similarity drop marks a topic boundary: start a new chunk.
            if similarity < self.threshold:
                if len(' '.join(current_chunk)) >= self.min_size:
                    chunks.append(current_chunk)
                    current_chunk = []
                elif chunks:
                    # Current chunk is too small: fold it into the previous one.
                    chunks[-1].extend(current_chunk)
                    current_chunk = []
            current_chunk.append(sentences[i])
            # Enforce the maximum chunk size.
            if len(' '.join(current_chunk)) > self.max_size:
                chunks.append(current_chunk[:-1])
                current_chunk = [current_chunk[-1]]
        # Flush the final chunk.
        if current_chunk:
            chunks.append(current_chunk)
        return chunks
```
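A hypothetical run that exercises the TF-IDF fallback, so nothing beyond scikit-learn is required; with a real embedding model (e.g. a sentence-transformers instance) the boundaries would be more reliable:

```python
# demo_semantic_chunker.py -- hypothetical run using the TF-IDF fallback
from chunkers.semantic_chunker import SemanticChunker

text = (
    "Vector databases store embeddings. Embeddings capture meaning. "
    "Soccer is played with eleven players. The World Cup is every four years."
)

# No embedding model passed, so TF-IDF vectors stand in; the split
# is crude but shows where similarity drops trigger new chunks.
chunker = SemanticChunker(threshold=0.1, min_chunk_size=10, max_chunk_size=500)
for chunk in chunker.chunk(text):
    print(chunk['id'], '->', chunk['content'])
```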
4. Recursive Chunking
4.1 LangChain-Style Recursive Chunking
```python
# chunkers/recursive_chunker.py
from typing import Dict, List, Optional


class RecursiveChunker:
    """Recursive chunker: try coarse separators first, fall back to finer ones."""

    def __init__(
        self,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        separators: Optional[List[str]] = None
    ):
        """
        Args:
            chunk_size: target chunk size, in characters
            chunk_overlap: overlap size (applied by the variant in 4.2)
            separators: separators in priority order
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators or [
            "\n\n",  # paragraph
            "\n",    # line break
            "。",    # full stop
            "!",    # exclamation mark
            "?",    # question mark
            ";",    # semicolon
            ",",    # comma
            " ",     # space
            ""       # individual characters
        ]

    def chunk(self, text: str, metadata: Dict = None) -> List[Dict]:
        """Recursively split the text."""
        chunks = self._recursive_split(text, self.separators)
        return [
            {'id': f"chunk_{i}", 'content': chunk, 'metadata': metadata or {}}
            for i, chunk in enumerate(chunks)
        ]

    def _recursive_split(
        self,
        text: str,
        separators: List[str]
    ) -> List[str]:
        """Split with the highest-priority separator that actually helps."""
        # Small enough already: return as a single chunk.
        if len(text) <= self.chunk_size:
            return [text]
        # Try the current separator.
        separator = separators[0] if separators else ""
        if separator:
            splits = text.split(separator)
        else:
            splits = list(text)
        # The separator did not split anything: try the next, finer one.
        if len(splits) == 1 and len(separators) > 1:
            return self._recursive_split(text, separators[1:])
        # Greedily merge pieces back together up to chunk_size.
        chunks = []
        current_chunk = []
        current_length = 0
        for split in splits:
            split_length = len(split)
            if current_length + split_length > self.chunk_size:
                # Flush the current chunk.
                if current_chunk:
                    chunks.append(separator.join(current_chunk))
                    current_chunk = []
                    current_length = 0
                # A single piece over the limit: recurse with finer separators.
                if split_length > self.chunk_size:
                    chunks.extend(self._recursive_split(
                        split,
                        separators[1:] if len(separators) > 1 else [""]
                    ))
                else:
                    current_chunk = [split]
                    current_length = split_length
            else:
                current_chunk.append(split)
                current_length += split_length + len(separator)
        # Flush the final chunk.
        if current_chunk:
            chunks.append(separator.join(current_chunk))
        return chunks
```
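A small sketch showing the separator fallback in action (sample text and chunk size are illustrative):

```python
# demo_recursive_chunker.py -- hypothetical usage of RecursiveChunker
from chunkers.recursive_chunker import RecursiveChunker

text = (
    "Paragraph one explains the setup.\n\n"
    "Paragraph two is much longer and gets split on finer separators "
    "because it alone exceeds the chunk size.\n\n"
    "Paragraph three concludes."
)

# A small chunk_size forces the fallback from "\n\n" to finer separators.
chunker = RecursiveChunker(chunk_size=80, chunk_overlap=0)
for chunk in chunker.chunk(text):
    print(chunk['id'], repr(chunk['content']))
```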
4.2 Recursive Chunking with Overlap
```python
# chunkers/recursive_overlap_chunker.py
from typing import Dict, List


class RecursiveOverlapChunker:
    """Recursive chunker that prepends an overlap taken from the previous chunk."""

    def __init__(
        self,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        length_function=len
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self._length_function = length_function
        self._separators = ["\n\n", "\n", "。", "!", "?", ";", ",", " ", ""]

    def create_chunks(
        self,
        text: str,
        metadata: Dict = None
    ) -> List[Dict]:
        """Split the text, then stitch an overlap onto each chunk."""
        chunks = self._split_text(text)
        chunks_with_overlap = self._add_overlap(chunks)
        return [
            {'id': f"chunk_{i}", 'content': chunk, 'metadata': metadata or {}}
            for i, chunk in enumerate(chunks_with_overlap)
        ]

    def _split_text(self, text: str) -> List[str]:
        return self._split_text_recursive(text, self._separators)

    def _split_text_recursive(
        self,
        text: str,
        separators: List[str]
    ) -> List[str]:
        """Recursively split, then greedily merge pieces up to chunk_size."""
        final_chunks = []
        # Current separator and the finer ones to fall back on.
        separator = separators[0] if separators else ""
        new_separators = separators[1:] if len(separators) > 1 else [""]
        if separator:
            splits = text.split(separator)
        else:
            splits = list(text)
        # Any piece still over the limit is split again with finer separators.
        good_splits = []
        for split in splits:
            if self._length_function(split) < self.chunk_size:
                good_splits.append(split)
            else:
                good_splits.extend(
                    self._split_text_recursive(split, new_separators)
                )
        # Merge adjacent pieces while they fit within chunk_size.
        if good_splits:
            current_chunk = good_splits[0]
            for split in good_splits[1:]:
                if self._length_function(current_chunk + separator + split) <= self.chunk_size:
                    current_chunk += separator + split
                else:
                    final_chunks.append(current_chunk)
                    current_chunk = split
            final_chunks.append(current_chunk)
        return final_chunks

    def _add_overlap(self, chunks: List[str]) -> List[str]:
        """Prepend the tail of the previous chunk to each chunk."""
        if self.chunk_overlap <= 0:
            return chunks
        chunks_with_overlap = []
        for i, chunk in enumerate(chunks):
            if i > 0:
                overlap = self._get_overlap(chunks[i - 1], self.chunk_overlap)
                chunk = overlap + chunk
            chunks_with_overlap.append(chunk)
        return chunks_with_overlap

    def _get_overlap(self, text: str, overlap_size: int) -> str:
        """Take the last overlap_size characters, aligned to a sentence boundary."""
        if len(text) <= overlap_size:
            return text
        overlap_text = text[-overlap_size:]
        # Start the overlap just after the first separator found, so it
        # begins at a natural boundary and still ends exactly where the
        # previous chunk ends.
        for sep in ["。", "!", "?", ";", ",", "\n", " "]:
            if sep in overlap_text:
                first_sep_index = overlap_text.find(sep)
                return overlap_text[first_sep_index + 1:]
        return overlap_text
```
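A usage sketch; the tiny chunk_size is chosen only to make the overlap visible:

```python
# demo_overlap_chunker.py -- hypothetical usage of RecursiveOverlapChunker
from chunkers.recursive_overlap_chunker import RecursiveOverlapChunker

text = "句子一。句子二。句子三。句子四。句子五。句子六。"

chunker = RecursiveOverlapChunker(chunk_size=12, chunk_overlap=6)
for chunk in chunker.create_chunks(text):
    # Each chunk after the first starts with the tail of its predecessor.
    print(chunk['id'], chunk['content'])
```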
5. Evaluating Chunk Quality
5.1 Evaluation Metrics
```python
# chunk_evaluator.py
from typing import Dict, List

import numpy as np
from sklearn.metrics.pairwise import cosine_similarity


class ChunkEvaluator:
    """Chunk quality evaluator."""

    def __init__(self, embedding_model=None):
        self.embedding_model = embedding_model

    def evaluate(
        self,
        chunks: List[str],
        original_text: str
    ) -> Dict:
        """Score a chunking on four dimensions plus a weighted overall score."""
        metrics = {}
        # 1. Coverage: how much of the original text the chunks retain.
        metrics['coverage'] = self._calculate_coverage(chunks, original_text)
        # 2. Redundancy: lexical overlap between adjacent chunks.
        metrics['redundancy'] = self._calculate_redundancy(chunks)
        # 3. Coherence: semantic similarity between adjacent chunks.
        metrics['coherence'] = self._calculate_coherence(chunks)
        # 4. Granularity: how uniform the chunk sizes are.
        metrics['granularity'] = self._calculate_granularity(chunks)
        # 5. Weighted overall score.
        metrics['overall_score'] = (
            metrics['coverage'] * 0.3 +
            (1 - metrics['redundancy']) * 0.2 +
            metrics['coherence'] * 0.3 +
            metrics['granularity'] * 0.2
        )
        return metrics

    def _calculate_coverage(
        self,
        chunks: List[str],
        original_text: str
    ) -> float:
        """Character-set coverage of the original text (a cheap proxy)."""
        chunk_text = ''.join(chunks)
        original_chars = set(original_text)
        chunk_chars = set(chunk_text)
        return len(chunk_chars & original_chars) / len(original_chars)

    def _calculate_redundancy(self, chunks: List[str]) -> float:
        """Mean lexical overlap between adjacent chunks."""
        if len(chunks) < 2:
            return 0.0
        overlaps = [
            self._calculate_overlap(chunks[i], chunks[i + 1])
            for i in range(len(chunks) - 1)
        ]
        return float(np.mean(overlaps))

    def _calculate_overlap(self, text1: str, text2: str) -> float:
        """Jaccard similarity between two chunks' word sets."""
        words1 = set(text1.split())
        words2 = set(text2.split())
        if not words1 or not words2:
            return 0.0
        return len(words1 & words2) / len(words1 | words2)

    def _calculate_coherence(self, chunks: List[str]) -> float:
        """Mean cosine similarity between adjacent chunk embeddings."""
        if len(chunks) < 2:
            return 1.0
        if self.embedding_model:
            embeddings = self.embedding_model.encode(chunks)
            similarities = [
                cosine_similarity([embeddings[i]], [embeddings[i + 1]])[0][0]
                for i in range(len(chunks) - 1)
            ]
            return float(np.mean(similarities))
        # Fallback without a model: derive from lexical redundancy.
        return 1 - self._calculate_redundancy(chunks)

    def _calculate_granularity(self, chunks: List[str]) -> float:
        """Uniformity of chunk sizes: lower variation scores higher."""
        if not chunks:
            return 0.0
        sizes = [len(chunk) for chunk in chunks]
        mean_size = np.mean(sizes)
        if mean_size == 0:
            return 0.0
        # The smaller the coefficient of variation, the more uniform.
        coefficient_of_variation = np.std(sizes) / mean_size
        return 1 - min(coefficient_of_variation, 1.0)
```
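A quick, self-contained check of the evaluator against the fixed-size chunker from section 2.1; no embedding model is passed, so the lexical fallbacks are used:

```python
# demo_chunk_evaluator.py -- hypothetical quick check of ChunkEvaluator
from chunk_evaluator import ChunkEvaluator
from chunkers.fixed_chunker import FixedSizeChunker

text = "RAG pipelines retrieve chunks. Good chunks make good answers. " * 10
chunks = FixedSizeChunker(chunk_size=60, overlap=10).chunk(text)

evaluator = ChunkEvaluator()
# FixedSizeChunker returns Chunk dataclass objects, hence c.content.
metrics = evaluator.evaluate([c.content for c in chunks], text)
for name, value in metrics.items():
    print(f"{name}: {value:.3f}")
```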
6. Performance Optimization
6.1 Parallel Chunking
```python
# chunkers/parallel_chunker.py
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List


class ParallelChunker:
    """Wraps any chunker and runs it across documents in a thread pool."""

    def __init__(self, base_chunker, max_workers: int = 4):
        self.base_chunker = base_chunker
        self.max_workers = max_workers

    def chunk_batch(
        self,
        documents: List[Dict],
        metadata: Dict = None
    ) -> List[Dict]:
        """Chunk a batch of documents in parallel."""
        all_chunks = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(
                    self.base_chunker.chunk,
                    doc['content'],
                    # Per-document metadata overrides the shared metadata.
                    {**(metadata or {}), **doc.get('metadata', {})}
                ): i
                for i, doc in enumerate(documents)
            }
            # Iterating the dict keeps submission order, so chunks stay
            # grouped by document.
            for future in futures:
                all_chunks.extend(future.result())
        return all_chunks
```
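A hypothetical batch run wrapping the recursive chunker; the documents and metadata are illustrative:

```python
# demo_parallel_chunker.py -- hypothetical usage of ParallelChunker
from chunkers.parallel_chunker import ParallelChunker
from chunkers.recursive_chunker import RecursiveChunker

documents = [
    {'content': "Doc one. " * 50, 'metadata': {'doc_id': 1}},
    {'content': "Doc two. " * 50, 'metadata': {'doc_id': 2}},
]

parallel = ParallelChunker(RecursiveChunker(chunk_size=100), max_workers=2)
chunks = parallel.chunk_batch(documents, metadata={'corpus': 'demo'})
print(len(chunks), "chunks;", chunks[0]['metadata'])
```

Note that for CPU-bound chunkers, CPython's GIL limits the speedup threads can deliver; swapping in a ProcessPoolExecutor can parallelize better at the cost of pickling overhead.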
6.2 Smart Chunker Selection
```python
# chunkers/smart_chunk_selector.py
from typing import Dict, Type

from chunkers.fixed_chunker import FixedSizeChunker
from chunkers.token_chunker import TokenSizeChunker
from chunkers.sentence_chunker import SentenceChunker
from chunkers.semantic_chunker import SemanticChunker
from chunkers.recursive_chunker import RecursiveChunker


class SmartChunkSelector:
    """Picks a chunker class and parameters based on document traits."""

    def __init__(self):
        self.chunkers = {
            'fixed': FixedSizeChunker,
            'token': TokenSizeChunker,
            'sentence': SentenceChunker,
            'semantic': SemanticChunker,
            'recursive': RecursiveChunker
        }

    def select_chunker(
        self,
        document_type: str,
        text_length: int,
        use_case: str
    ) -> Type:
        """Choose a chunker class for the given scenario."""
        # Short texts: sentence chunking is enough.
        if text_length < 1000:
            return self.chunkers['sentence']
        # Long structured documents: recursive chunking.
        if document_type in ['pdf', 'doc']:
            return self.chunkers['recursive']
        # Quality-sensitive retrieval: semantic chunking.
        if use_case == 'high_quality_retrieval':
            return self.chunkers['semantic']
        # Default: recursive chunking.
        return self.chunkers['recursive']

    def get_optimal_params(
        self,
        chunker_type: str,
        text_stats: Dict
    ) -> Dict:
        """Suggest constructor parameters for a chunker type."""
        avg_sentence_length = text_stats.get('avg_sentence_length', 50)
        if chunker_type == 'fixed':
            return {
                'chunk_size': max(500, avg_sentence_length * 10),
                'overlap': 50
            }
        elif chunker_type == 'token':
            return {'chunk_size': 512, 'overlap': 50}
        elif chunker_type == 'sentence':
            return {'max_sentences_per_chunk': 5}
        elif chunker_type == 'semantic':
            return {
                'threshold': 0.5,
                'min_chunk_size': 200,
                'max_chunk_size': 800
            }
        else:
            return {'chunk_size': 500, 'chunk_overlap': 50}
```
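A usage sketch for the selector; the argument values are illustrative:

```python
# demo_selector.py -- hypothetical usage of SmartChunkSelector
from chunkers.smart_chunk_selector import SmartChunkSelector

selector = SmartChunkSelector()
chunker_cls = selector.select_chunker(
    document_type='pdf',
    text_length=50_000,
    use_case='general'
)
params = selector.get_optimal_params('recursive', {'avg_sentence_length': 40})
chunker = chunker_cls(**params)
print(type(chunker).__name__, params)
```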
7. Case Studies
7.1 Chunking Technical Documentation
```python
# case_technical_docs.py
from chunk_evaluator import ChunkEvaluator
from chunkers.recursive_chunker import RecursiveChunker

# Configuration: separators cover both Chinese and English punctuation.
chunker = RecursiveChunker(
    chunk_size=500,
    chunk_overlap=50,
    separators=["\n\n", "\n", "。", "!", "?", ".", "!", "?"]
)
evaluator = ChunkEvaluator()

# Load the technical document.
with open('technical_doc.md', 'r', encoding='utf-8') as f:
    text = f.read()

# Chunk it.
chunks = chunker.chunk(text, {'source': 'technical_doc'})

# Evaluate the result.
metrics = evaluator.evaluate([c['content'] for c in chunks], text)

print(f"Chunks: {len(chunks)}")
print(f"Coverage: {metrics['coverage']:.2%}")
print(f"Redundancy: {metrics['redundancy']:.2%}")
print(f"Coherence: {metrics['coherence']:.2%}")
print(f"Overall score: {metrics['overall_score']:.2%}")
```
7.2 A Strategy Comparison Experiment
```python
# case_chunk_comparison.py
import time

from chunk_evaluator import ChunkEvaluator
from chunkers.fixed_chunker import FixedSizeChunker
from chunkers.token_chunker import TokenSizeChunker
from chunkers.recursive_chunker import RecursiveChunker
from chunkers.semantic_chunker import SemanticChunker

# Load the test document (load_test_document is a stand-in for your loader).
test_text = load_test_document()

# The strategies under comparison.
chunkers = {
    'fixed': FixedSizeChunker(chunk_size=500, overlap=50),
    'token': TokenSizeChunker(chunk_size=512, overlap=50),
    'recursive': RecursiveChunker(chunk_size=500, chunk_overlap=50),
    'semantic': SemanticChunker(threshold=0.5)
}

evaluator = ChunkEvaluator()
results = {}
for name, chunker in chunkers.items():
    start = time.time()
    chunks = chunker.chunk(test_text)
    elapsed = time.time() - start
    # FixedSizeChunker returns Chunk objects; the others return dicts.
    contents = [c.content if hasattr(c, 'content') else c['content'] for c in chunks]
    metrics = evaluator.evaluate(contents, test_text)
    results[name] = {
        'chunk_count': len(chunks),
        'avg_chunk_size': sum(len(c) for c in contents) / len(contents),
        'processing_time': elapsed,
        'quality_score': metrics['overall_score']
    }

# Print the comparison.
print("\n=== Chunking Strategy Comparison ===\n")
print(f"{'Strategy':<10} {'Chunks':<8} {'Avg size':<10} {'Time (s)':<10} {'Quality':<10}")
print("-" * 60)
for name, result in results.items():
    print(f"{name:<10} {result['chunk_count']:<8} "
          f"{result['avg_chunk_size']:<10.0f} "
          f"{result['processing_time']:<10.3f} "
          f"{result['quality_score']:<10.2%}")
```
8. Summary
8.1 Strategy Selection Guide
| Scenario | Recommended strategy | Suggested parameters |
|---|---|---|
| General documents | Recursive | chunk_size=500, overlap=50 |
| Technical documentation | Recursive | chunk_size=800, overlap=100 |
| Conversation logs | Sentence | max_sentences=3 |
| High-quality retrieval | Semantic | threshold=0.5 |
| Real-time processing | Fixed-size | chunk_size=512 |
| Token-sensitive workloads | Token-based | chunk_size=512 |
8.2 Best Practices
1. Choose an appropriate chunk granularity
   - Too large brings in noise
   - Too small loses context
   - Recommendation: 300-800 characters
2. Set a sensible overlap
   - Preserves continuity across chunk boundaries
   - Avoid excessive redundancy
   - Recommendation: 10-20% overlap (see the sketch after this list)
3. Account for document type
   - Structured documents: exploit natural boundaries
   - Unstructured text: recursive chunking
   - Mixed content: combine strategies
4. Evaluate and tune continuously
   - Monitor retrieval quality
   - Adjust chunking parameters
   - Validate with A/B tests
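As a concrete reading of the 10-20% rule above, a small hypothetical helper that derives the overlap from the chunk size:

```python
# overlap_helper.py -- hypothetical helper applying the 10-20% overlap rule
def suggest_overlap(chunk_size: int, ratio: float = 0.15) -> int:
    """Suggest an overlap as a fraction of chunk size, clamped to 10-20%."""
    ratio = min(max(ratio, 0.10), 0.20)
    return int(chunk_size * ratio)

# chunk_size=500 -> overlap=75; chunk_size=800 -> overlap=120
print(suggest_overlap(500), suggest_overlap(800))
```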