RAG 文档处理工程化实战
文档处理是 RAG(Retrieval-Augmented Generation)系统的基础环节。如何高效处理多格式文档?如何保证文档质量?本文将详解 RAG 文档处理的完整工程化方案。
一、文档处理挑战
1.1 现实痛点
文档处理痛点:
┌─────────────────────────────────────┐
│ 1. 格式多样 │
│ - PDF、Word、Excel、PPT │
│ - HTML、Markdown、纯文本 │
│ - 图片、扫描件 │
├─────────────────────────────────────┤
│ 2. 质量参差不齐 │
│ - 格式混乱、排版错误 │
│ - 噪声数据、无关内容 │
│ - 缺失信息、损坏文件 │
├─────────────────────────────────────┤
│ 3. 处理复杂度高 │
│ - 表格解析困难 │
│ - 公式识别复杂 │
│ - 多语言混合 │
├─────────────────────────────────────┤
│ 4. 规模化困难 │
│ - 大批量处理效率低 │
│ - 资源消耗大 │
│ - 错误处理复杂 │
└─────────────────────────────────────┘
1.2 工程化目标
- 标准化:统一处理流程,降低复杂度
- 自动化:减少人工干预,提升效率
- 可扩展:支持新格式,易于扩展
- 可监控:实时追踪处理状态和质量
二、文档处理架构
2.1 整体架构设计
graph LR
A[原始文档] --> B[文档接入层]
B --> C[格式检测]
C --> D[格式解析器]
D --> E[文本提取]
E --> F[清洗流水线]
F --> G[质量检查]
G --> H{质量合格?}
H -->|是 | I[文档存储]
H -->|否 | J[异常处理]
I --> K[元数据索引]
J --> L[人工审核]
2.2 核心组件
文档处理管道:
# document_pipeline.py
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum
import hashlib
from pathlib import Path
class DocumentStatus(Enum):
    """Lifecycle states of a document moving through the processing pipeline."""
    PENDING = "pending"        # queued, not yet picked up
    PROCESSING = "processing"  # currently being parsed/cleaned/checked
    COMPLETED = "completed"    # passed every stage successfully
    FAILED = "failed"          # a stage raised or a quality check rejected it
    SKIPPED = "skipped"        # deliberately not processed
@dataclass
class Document:
    """A single document flowing through the processing pipeline.

    Mutable record: the pipeline fills in ``id``, ``file_type``, ``content``,
    ``metadata``, ``status`` and ``error_message`` as stages complete.
    """
    id: str                    # content-hash based id, assigned by the pipeline
    source_path: str           # original file location on disk
    file_type: str             # lower-cased extension, e.g. ".pdf"
    file_size: int             # size in bytes
    content: str               # extracted plain text
    metadata: Dict             # filename, sizes, counts, timestamps, ...
    status: DocumentStatus = DocumentStatus.PENDING
    error_message: Optional[str] = None  # populated when status is FAILED
    created_at: str = ""       # ISO-8601 timestamp string
    updated_at: str = ""       # ISO-8601 timestamp string
class DocumentProcessingPipeline:
    """End-to-end document processing pipeline.

    Stages, in order: file-type detection -> text extraction -> id generation
    -> cleaning -> quality checks -> metadata extraction. Any exception marks
    the document FAILED (with the error recorded on it) instead of propagating.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.parsers = self._initialize_parsers()
        self.cleaners = self._initialize_cleaners()
        self.quality_checkers = self._initialize_quality_checkers()

    def _initialize_parsers(self) -> Dict[str, 'BaseParser']:
        """Map lower-cased file extensions to parser instances."""
        # Bug fix: MarkdownParser and TextParser were used below but never
        # imported, raising NameError as soon as the pipeline was built.
        from parsers import (
            PDFParser, WordParser, ExcelParser, HTMLParser,
            MarkdownParser, TextParser,
        )
        return {
            '.pdf': PDFParser(),
            '.doc': WordParser(),
            '.docx': WordParser(),
            '.xls': ExcelParser(),
            '.xlsx': ExcelParser(),
            '.html': HTMLParser(),
            '.htm': HTMLParser(),
            '.md': MarkdownParser(),
            '.txt': TextParser(),
        }

    def _initialize_cleaners(self) -> List['BaseCleaner']:
        """Build the cleaning chain; cleaners run in list order."""
        # Bug fix: these classes were never imported in this module.
        # NOTE(review): assumes they live in a `cleaners` package — confirm.
        from cleaners import (
            WhitespaceCleaner, NoiseCleaner, FormatCleaner, EncodingCleaner,
        )
        return [
            WhitespaceCleaner(),
            NoiseCleaner(),
            FormatCleaner(),
            EncodingCleaner(),
        ]

    def _initialize_quality_checkers(self) -> List['BaseQualityChecker']:
        """Build the quality gates; the first failing checker rejects the doc."""
        # Bug fix: these classes were never imported in this module.
        from quality_checkers import (
            ContentLengthChecker, EncodingChecker, SensitiveInfoChecker,
        )
        return [
            ContentLengthChecker(),
            EncodingChecker(),
            SensitiveInfoChecker(),
        ]

    def process(self, document: Document) -> Document:
        """Run *document* through every stage; always returns the document."""
        try:
            document.status = DocumentStatus.PROCESSING
            # 1. Detect the file type from the extension.
            file_type = self._detect_file_type(document.source_path)
            document.file_type = file_type
            # 2. Extract text with the matching parser.
            parser = self.parsers.get(file_type.lower())
            if not parser:
                raise ValueError(f"Unsupported file type: {file_type}")
            document.content = parser.parse(document.source_path)
            # 3. Derive a stable, content-addressed id.
            document.id = self._generate_document_id(document)
            # 4. Run the cleaning chain.
            for cleaner in self.cleaners:
                document.content = cleaner.clean(document.content)
            # 5. Quality gates: the first failure rejects the document.
            for checker in self.quality_checkers:
                if not checker.check(document):
                    document.status = DocumentStatus.FAILED
                    # Bug fix: keep the checker's own detailed message when it
                    # set one instead of always overwriting it.
                    if not document.error_message:
                        document.error_message = f"Quality check failed: {checker.name}"
                    return document
            # 6. Metadata extraction (also fills file_size, previously never set).
            document.metadata = self._extract_metadata(document)
            document.file_size = document.metadata['file_size']
            # 7. Done.
            document.status = DocumentStatus.COMPLETED
        except Exception as e:
            # Never propagate: record the failure on the document itself.
            document.status = DocumentStatus.FAILED
            document.error_message = str(e)
        return document

    def _detect_file_type(self, file_path: str) -> str:
        """Return the lower-cased extension, including the leading dot."""
        # Path is already imported at module level; no local import needed.
        return Path(file_path).suffix.lower()

    def _generate_document_id(self, document: Document) -> str:
        """Content-addressed id. md5 is used as a fingerprint, not for security."""
        content_hash = hashlib.md5(
            document.content.encode('utf-8')
        ).hexdigest()
        return f"doc_{content_hash[:16]}"

    def _extract_metadata(self, document: Document) -> Dict:
        """Collect basic file and content statistics."""
        file_path = Path(document.source_path)
        return {
            'filename': file_path.name,
            'file_size': file_path.stat().st_size,
            'file_type': document.file_type,
            'created_at': document.created_at,
            'processed_at': document.updated_at,
            'content_length': len(document.content),
            'word_count': len(document.content.split()),
            'source_path': document.source_path,
        }
三、多格式解析器
3.1 解析器基类
# parsers/base_parser.py
from abc import ABC, abstractmethod
from typing import Dict, Any
class BaseParser(ABC):
    """Abstract base class for format-specific document parsers."""

    def __init__(self, config: Dict = None):
        # Normalise a missing config to an empty dict so subclasses can
        # call self.config.get(...) without guarding.
        self.config = config or {}

    @abstractmethod
    def parse(self, file_path: str) -> str:
        """Parse the file at *file_path* and return its plain-text content."""

    @abstractmethod
    def get_supported_extensions(self) -> list:
        """Return the extensions (with leading dot) this parser accepts.

        Bug fix: the original annotation referenced typing.List, which this
        module never imported, so defining the class raised NameError.
        """

    def validate_file(self, file_path: str) -> bool:
        """Ensure the file exists and is non-empty.

        Raises:
            FileNotFoundError: if *file_path* does not exist.
            ValueError: if the file is empty.
        """
        import os
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")
        if os.path.getsize(file_path) == 0:
            raise ValueError("Empty file")
        return True
3.2 PDF 解析器
# parsers/pdf_parser.py
from typing import List, Dict
from .base_parser import BaseParser
class PDFParser(BaseParser):
    """Extracts text and tables from PDFs, with OCR for scanned documents."""

    def __init__(self, config: Dict = None):
        super().__init__(config)
        # Bug fix: read from self.config (normalised by the base class) so
        # PDFParser() with no config no longer crashes on None.get().
        self.use_ocr = self.config.get('use_ocr', False)

    def parse(self, file_path: str) -> str:
        """Parse a PDF file; scanned documents are routed to OCR."""
        self.validate_file(file_path)
        # NOTE(review): routing ignores self.use_ocr — scanned PDFs always go
        # through OCR regardless of the flag; confirm whether it should gate.
        if self._is_scanned_pdf(file_path):
            return self._parse_with_ocr(file_path)
        return self._parse_text_pdf(file_path)

    def _parse_text_pdf(self, file_path: str) -> str:
        """Extract page text and tables from a text-based PDF via pdfplumber."""
        try:
            import pdfplumber
            texts = []
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    text = page.extract_text()
                    if text:
                        texts.append(text)
                    for table in page.extract_tables():
                        texts.append(self._format_table(table))
            return '\n\n'.join(texts)
        except Exception as e:
            raise ValueError(f"Failed to parse PDF: {str(e)}") from e

    def _parse_with_ocr(self, file_path: str) -> str:
        """Rasterise each page and OCR it (Simplified Chinese + English)."""
        try:
            import pytesseract
            from pdf2image import convert_from_path
            texts = []
            for image in convert_from_path(file_path):
                texts.append(pytesseract.image_to_string(image, lang='chi_sim+eng'))
            return '\n\n'.join(texts)
        except Exception as e:
            raise ValueError(f"OCR failed: {str(e)}") from e

    def _is_scanned_pdf(self, file_path: str) -> bool:
        """Heuristic: no page yields more than 10 chars of text -> a scan."""
        import pdfplumber
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                text = page.extract_text()
                if text and len(text.strip()) > 10:
                    return False
        return True

    def _format_table(self, table: List[List[str]]) -> str:
        """Render a table as pipe-separated lines, skipping empty rows."""
        lines = []
        for row in table:
            if any(cell for cell in row):
                # Bug fix: `str(cell) or ''` rendered None cells as the
                # literal string 'None'; emit an empty string instead.
                line = ' | '.join('' if cell is None else str(cell) for cell in row)
                lines.append(line)
        return '\n'.join(lines)

    def get_supported_extensions(self) -> List[str]:
        return ['.pdf']
3.3 Word 解析器
# parsers/word_parser.py
from typing import Dict
from .base_parser import BaseParser
class WordParser(BaseParser):
    """Extracts text and tables from Word documents.

    .docx is parsed natively; legacy .doc is first converted to .docx with
    headless LibreOffice, then parsed the same way.
    """

    def parse(self, file_path: str) -> str:
        """Dispatch on extension; raises ValueError for non-Word files."""
        self.validate_file(file_path)
        lowered = file_path.lower()  # accept .DOCX/.DOC as well
        if lowered.endswith('.docx'):
            return self._parse_docx(file_path)
        if lowered.endswith('.doc'):
            return self._parse_legacy_doc(file_path)
        raise ValueError("Unsupported Word format")

    def _parse_docx(self, file_path: str) -> str:
        """Collect non-empty paragraphs, then tables, from a .docx file."""
        from docx import Document
        doc = Document(file_path)
        parts = []
        for para in doc.paragraphs:
            if para.text.strip():
                parts.append(para.text)
        for table in doc.tables:
            parts.append(self._format_table(table))
        return '\n\n'.join(parts)

    def _parse_legacy_doc(self, file_path: str) -> str:
        """Convert a legacy .doc with LibreOffice, then parse the result."""
        import subprocess
        from pathlib import Path  # bug fix: Path was used but never imported
        src = Path(file_path)
        # Bug fix: str.replace('.doc', '.docx') corrupted any path containing
        # '.doc' elsewhere (e.g. '/docs/'); with_suffix only touches the suffix.
        output_path = src.with_suffix('.docx')
        subprocess.run([
            'libreoffice', '--headless', '--convert-to', 'docx',
            file_path, '--outdir', str(src.parent)
        ], check=True)
        return self._parse_docx(str(output_path))

    def _format_table(self, table) -> str:
        """Render a python-docx table as pipe-separated lines."""
        lines = []
        for row in table.rows:
            cells = [cell.text.strip() for cell in row.cells]
            if any(cells):
                lines.append(' | '.join(cells))
        return '\n'.join(lines)

    def get_supported_extensions(self) -> list:
        # Bug fix: the original annotation used typing.List without importing it.
        return ['.doc', '.docx']
3.4 HTML 解析器
# parsers/html_parser.py
from typing import Dict
from .base_parser import BaseParser
class HTMLParser(BaseParser):
    """Extracts the title and main textual content from HTML files."""

    def parse(self, file_path: str) -> str:
        """Parse an HTML file, dropping scripts/styles, returning text blocks."""
        self.validate_file(file_path)
        from bs4 import BeautifulSoup
        with open(file_path, 'r', encoding='utf-8') as f:
            soup = BeautifulSoup(f, 'html.parser')
        # Remove non-content elements entirely.
        for script in soup(['script', 'style']):
            script.decompose()
        # Bug fix: an empty <title></title> has .string is None, which made the
        # final join fail with TypeError; fall back to '' in that case.
        title = soup.title.string if soup.title and soup.title.string else ''
        content_parts = []
        # NOTE: iterating tag-by-tag means output is grouped by tag type,
        # not in document order (preserved from the original behaviour).
        for tag in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'li', 'td']:
            for elem in soup.find_all(tag):
                text = elem.get_text(strip=True)
                if text:
                    content_parts.append(text)
        return '\n\n'.join([title] + content_parts) if title else '\n\n'.join(content_parts)

    def get_supported_extensions(self) -> list:
        # Bug fix: the original annotation used typing.List without importing it.
        return ['.html', '.htm']
四、清洗流水线
4.1 清洗器基类
# cleaners/base_cleaner.py
from abc import ABC, abstractmethod
class BaseCleaner(ABC):
    """Common interface for text-cleaning pipeline stages."""

    def __init__(self, config: dict = None):
        # Fall back to an empty config so subclasses can .get() freely.
        self.config = config if config is not None else {}

    @abstractmethod
    def clean(self, text: str) -> str:
        """Return a cleaned copy of *text*."""
        ...

    @property
    def name(self) -> str:
        # Report the concrete subclass name for logging/diagnostics.
        return type(self).__name__
4.2 空白清洗器
# cleaners/whitespace_cleaner.py
import re
from .base_cleaner import BaseCleaner
class WhitespaceCleaner(BaseCleaner):
    """Collapses runs of spaces and drops blank lines."""

    def clean(self, text: str) -> str:
        """Normalise whitespace: single spaces, stripped, non-empty lines only."""
        collapsed = re.sub(r' +', ' ', text)
        kept = []
        for raw_line in collapsed.split('\n'):
            stripped = raw_line.strip()
            if stripped:
                kept.append(stripped)
        return '\n'.join(kept)
4.3 噪声清洗器
# cleaners/noise_cleaner.py
import re
from .base_cleaner import BaseCleaner
class NoiseCleaner(BaseCleaner):
    """Strips common noise: stray symbols, page headers/footers, blank runs."""

    def clean(self, text: str) -> str:
        """Remove noise patterns; optionally drop URLs via config['remove_urls']."""
        # (pattern, replacement, flags) applied in order — order matters, as
        # the first pass removes characters the later patterns rely on or not.
        passes = [
            # keep word chars, whitespace, CJK and basic punctuation
            (r'[^\w\s\u4e00-\u9fff\.\,\!\?\:\;\(\)\[\]\"\'\-\=]', '', 0),
            # "page N" footer lines
            (r'^\s*第\d+页\s*$', '', re.MULTILINE),
            # bare date lines (YYYY-MM-DD)
            (r'^\s*\d{4}-\d{2}-\d{2}\s*$', '', re.MULTILINE),
            # squeeze 3+ newlines down to a paragraph break
            (r'\n{3,}', '\n\n', 0),
        ]
        for pattern, replacement, flags in passes:
            text = re.sub(pattern, replacement, text, flags=flags)
        # NOTE(review): the first pass already strips '/', so this URL pattern
        # may rarely match by the time it runs — confirm the intended order.
        if self.config.get('remove_urls', False):
            text = re.sub(r'http[s]?://\S+', '', text)
        return text
4.4 编码清洗器
# cleaners/encoding_cleaner.py
import unicodedata
from .base_cleaner import BaseCleaner
class EncodingCleaner(BaseCleaner):
    """Fixes encoding artifacts: Unicode normalisation, junk code points,
    and full-width to half-width conversion."""

    # Invisible / artifact code points that should simply disappear:
    # replacement char, zero-width space/non-joiner/joiner, BOM.
    _JUNK = ('\ufffd', '\u200b', '\u200c', '\u200d', '\ufeff')

    def clean(self, text: str) -> str:
        """Normalise Unicode (NFKC), drop artifact chars, narrow wide chars."""
        normalised = unicodedata.normalize('NFKC', text)
        for junk in self._JUNK:
            normalised = normalised.replace(junk, '')
        # NOTE(review): NFKC already folds most full-width forms; this pass
        # is likely a no-op safeguard — confirm before removing.
        return self._fullwidth_to_halfwidth(normalised)

    def _fullwidth_to_halfwidth(self, text: str) -> str:
        """Map full-width ASCII variants (U+FF01..U+FF5E) and the ideographic
        space (U+3000) to their half-width counterparts."""
        def narrow(char: str) -> str:
            code = ord(char)
            if code == 0x3000:          # ideographic (full-width) space
                return ' '
            if 0xFF01 <= code <= 0xFF5E:  # full-width ASCII block
                return chr(code - 0xFEE0)
            return char
        return ''.join(narrow(c) for c in text)
五、质量控制
5.1 质量检查器
# quality_checkers/base_checker.py
from abc import ABC, abstractmethod
from typing import Dict
class BaseQualityChecker(ABC):
    """Common interface for document quality gates."""

    def __init__(self, config: Dict = None):
        self.config = config or {}
        # Bug fix: the original ignored `config` entirely when building the
        # thresholds, so per-checker settings (e.g. {'min_length': 100}) never
        # applied. Config values now override the subclass defaults.
        self.thresholds = {**self._get_default_thresholds(), **self.config}

    @abstractmethod
    def check(self, document: 'Document') -> bool:
        """Return True when *document* passes this gate."""

    @property
    def name(self) -> str:
        # Report the concrete subclass name for error messages / logging.
        return self.__class__.__name__

    def _get_default_thresholds(self) -> Dict:
        """Default thresholds; empty unless a subclass overrides this."""
        return {}
5.2 内容长度检查器
# quality_checkers/length_checker.py
from .base_checker import BaseQualityChecker
class ContentLengthChecker(BaseQualityChecker):
    """Rejects documents whose content is too short, too long, or too sparse."""

    def _get_default_thresholds(self) -> dict:
        # Bug fix: the annotation used typing.Dict without importing it, which
        # raised NameError when the class was defined; the builtin works.
        return {
            'min_length': 50,      # minimum characters
            'max_length': 100000,  # maximum characters
            'min_words': 10,       # minimum whitespace-separated words
        }

    def check(self, document: 'Document') -> bool:
        """Validate content length; sets document.error_message on failure."""
        content = document.content
        # Character-count bounds.
        char_count = len(content)
        if char_count < self.thresholds['min_length']:
            document.error_message = f"Content too short: {char_count} chars"
            return False
        if char_count > self.thresholds['max_length']:
            document.error_message = f"Content too long: {char_count} chars"
            return False
        # Word-count floor (whitespace split — a rough proxy for CJK text).
        word_count = len(content.split())
        if word_count < self.thresholds['min_words']:
            document.error_message = f"Too few words: {word_count} words"
            return False
        return True
5.3 敏感信息检查器
# quality_checkers/sensitive_info_checker.py
import re
from .base_checker import BaseQualityChecker
class SensitiveInfoChecker(BaseQualityChecker):
    """Rejects documents containing PII (phone numbers, emails, id numbers)."""

    def _get_default_thresholds(self) -> dict:
        # Bug fix: the annotation used typing.Dict without importing it, which
        # raised NameError when the class was defined; the builtin works.
        return {
            'check_phone': True,
            'check_email': True,
            'check_id_card': True,
            # NOTE(review): declared but no bank-card pattern is implemented.
            'check_bank_card': True,
        }

    def check(self, document: 'Document') -> bool:
        """Scan content for PII; on a hit, record details and fail the gate."""
        content = document.content
        sensitive_found = []
        # Mainland-China mobile numbers: 11 digits starting 13-19.
        if self.thresholds.get('check_phone', True):
            phones = re.findall(r'1[3-9]\d{9}', content)
            if phones:
                sensitive_found.append(f"手机号:{len(phones)}个")
        # Email addresses (loose pattern).
        if self.thresholds.get('check_email', True):
            emails = re.findall(r'[\w\.-]+@[\w\.-]+\.\w+', content)
            if emails:
                sensitive_found.append(f"邮箱:{len(emails)}个")
        # 18-character resident id numbers (last char may be X/x).
        if self.thresholds.get('check_id_card', True):
            id_cards = re.findall(r'\d{17}[\dXx]', content)
            if id_cards:
                sensitive_found.append(f"身份证:{len(id_cards)}个")
        if sensitive_found:
            document.error_message = f"包含敏感信息:{', '.join(sensitive_found)}"
            document.metadata['sensitive_info'] = sensitive_found
            return False
        return True
六、批量处理与监控
6.1 批量处理器
# batch_processor.py
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
class BatchDocumentProcessor:
    """Runs a DocumentProcessingPipeline over many files concurrently."""

    def __init__(self, pipeline: 'DocumentProcessingPipeline', max_workers: int = 4):
        self.pipeline = pipeline
        self.max_workers = max_workers  # thread-pool size (parsing is IO-heavy)
        self.stats = {
            'total': 0,
            'success': 0,
            'failed': 0,
            'skipped': 0,
        }

    def process_batch(self, file_paths: List[str]) -> List['Document']:
        """Process every path and return the resulting Document objects."""
        # Bug fix: counters are now reset per batch; previously success/failed
        # accumulated across batches while 'total' was overwritten.
        self.stats = {'total': len(file_paths), 'success': 0, 'failed': 0, 'skipped': 0}
        results = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(self.pipeline.process, self._create_document(path)): path
                for path in file_paths
            }
            for future in tqdm(as_completed(futures), total=len(futures)):
                document = future.result()
                results.append(document)
                # Bug fix: document.status is a DocumentStatus enum, so the
                # original string comparison was always False and every doc
                # was counted as 'skipped'. Compare against the enum's value.
                status = getattr(document.status, 'value', document.status)
                if status == 'completed':
                    self.stats['success'] += 1
                elif status == 'failed':
                    self.stats['failed'] += 1
                else:
                    self.stats['skipped'] += 1
        return results

    def _create_document(self, file_path: str) -> 'Document':
        """Build an empty Document shell for *file_path*."""
        from datetime import datetime
        now = datetime.now().isoformat()
        return Document(
            id='',
            source_path=file_path,
            file_type='',
            file_size=0,
            content='',
            metadata={},
            created_at=now,
            updated_at=now,
        )

    def get_stats(self) -> Dict:
        """Return the counters from the most recent batch."""
        return self.stats

    def generate_report(self) -> str:
        """Render a human-readable summary; safe for an empty batch."""
        total = self.stats['total']

        def pct(count: int) -> float:
            # Bug fix: guard against ZeroDivisionError when total == 0.
            return count / total * 100 if total else 0.0

        report = f"""
=== 文档处理报告 ===
总文档数:{total}
成功:{self.stats['success']} ({pct(self.stats['success']):.1f}%)
失败:{self.stats['failed']} ({pct(self.stats['failed']):.1f}%)
跳过:{self.stats['skipped']} ({pct(self.stats['skipped']):.1f}%)
"""
        return report
6.2 监控仪表板
# monitoring_dashboard.py
from typing import Dict, List
from datetime import datetime
class ProcessingMonitor:
    """Collects processing-metric snapshots and derives rates over time."""

    def __init__(self):
        # Chronological list of metric snapshots, each stamped on arrival.
        self.metrics_history: List[Dict] = []

    def record_metrics(self, metrics: Dict):
        """Store a timestamped copy of *metrics*.

        Bug fix: the original inserted the timestamp into the caller's dict
        in place; copying keeps the caller's data untouched.
        """
        entry = dict(metrics)
        entry['timestamp'] = datetime.now().isoformat()
        self.metrics_history.append(entry)

    def _recent(self, hours: int) -> List[Dict]:
        """Snapshots recorded within the last *hours* hours (shared helper)."""
        from datetime import timedelta
        cutoff = datetime.now() - timedelta(hours=hours)
        return [
            m for m in self.metrics_history
            if datetime.fromisoformat(m['timestamp']) >= cutoff
        ]

    def get_throughput(self, hours: int = 1) -> float:
        """Average documents processed per hour over the window."""
        recent = self._recent(hours)
        if not recent:
            return 0.0
        total_docs = sum(m.get('processed_count', 0) for m in recent)
        return total_docs / hours

    def get_error_rate(self, hours: int = 1) -> float:
        """Errors divided by documents processed over the window (0.0 if none)."""
        recent = self._recent(hours)
        if not recent:
            return 0.0
        total = sum(m.get('processed_count', 0) for m in recent)
        errors = sum(m.get('error_count', 0) for m in recent)
        return errors / total if total > 0 else 0.0

    def get_quality_report(self) -> Dict:
        """Summary combining rates with the latest snapshot's own fields."""
        if not self.metrics_history:
            return {}
        latest = self.metrics_history[-1]
        return {
            'throughput': self.get_throughput(),
            'error_rate': self.get_error_rate(),
            'total_processed': sum(m.get('processed_count', 0) for m in self.metrics_history),
            'avg_processing_time': latest.get('avg_processing_time', 0),
            'quality_score': latest.get('quality_score', 0),
        }
七、实战案例
7.1 企业知识库文档处理
处理流程:
# enterprise_knowledge_base.py
"""Example: processing an enterprise knowledge base end to end."""
from document_pipeline import DocumentProcessingPipeline, Document, DocumentStatus
from batch_processor import BatchDocumentProcessor

# Pipeline configuration: per-component options, keyed by stage name.
config = {
    'parsers': {
        'pdf': {'use_ocr': True},  # enable OCR for scanned PDFs
        'word': {},
        'html': {}
    },
    'cleaners': {
        'whitespace': {},
        'noise': {'remove_urls': True},
        'encoding': {}
    },
    'quality_checkers': {
        'length': {'min_length': 100},
        'sensitive_info': {
            'check_phone': True,
            'check_email': True
        }
    }
}

# Build the processing pipeline.
pipeline = DocumentProcessingPipeline(config)

# Batch processor: 8 worker threads.
processor = BatchDocumentProcessor(pipeline, max_workers=8)

# All documents to process.
file_paths = [
    'docs/manual.pdf',
    'docs/guide.docx',
    'docs/faq.html',
    # ... more documents
]
results = processor.process_batch(file_paths)

# Print the summary report.
print(processor.generate_report())

# Persist results. Bug fix: doc.status is a DocumentStatus enum, so comparing
# it to the bare string 'completed' was always False; compare to the enum.
for doc in results:
    if doc.status == DocumentStatus.COMPLETED:
        save_to_vector_db(doc)  # placeholder: defined elsewhere
    else:
        log_error(doc)          # placeholder: defined elsewhere
7.2 性能优化
优化策略:
# optimization_tips.py
"""Performance-optimisation recipes for the document pipeline."""

# 1. Parallelism: ThreadPoolExecutor for IO-bound parsing,
#    ProcessPoolExecutor for CPU-bound work.

# 2. Streaming: for large files, yield page by page instead of loading all.
def stream_parse_large_pdf(file_path: str):
    """Lazily yield the extracted text of each PDF page."""
    import pdfplumber
    with pdfplumber.open(file_path) as pdf:
        for page in pdf.pages:
            yield page.extract_text()

# 3. Caching: avoid re-hashing files that were already seen.
from functools import lru_cache

@lru_cache(maxsize=1000)
def get_document_hash(file_path: str) -> str:
    """Return the MD5 hex digest of the file's bytes (a fingerprint, not crypto).

    Bug fix: the original body was a bare ``pass`` that returned None.
    NOTE: the cache is keyed on the path, so a file changed in place keeps
    returning the old digest until the cache entry is evicted.
    """
    import hashlib
    digest = hashlib.md5()
    with open(file_path, 'rb') as fh:
        # Read in chunks so large files are not loaded into memory at once.
        for chunk in iter(lambda: fh.read(65536), b''):
            digest.update(chunk)
    return digest.hexdigest()

# 4. Incremental processing: only handle new or changed documents.
def incremental_process(new_files: list):
    """Compare file hashes against a stored manifest; process only new files.

    Bug fix: the original annotation used typing.List without importing it,
    which raised NameError when the function was defined.
    """
    # placeholder: diff hashes against the last run's manifest, then process
    pass
八、总结
8.1 核心要点
1. 架构设计
   - 模块化设计,易于扩展
   - 管道模式,清晰的处理流程
   - 异常处理,保证系统稳定性
2. 格式支持
   - 支持主流文档格式
   - 统一的解析器接口
   - 易于添加新格式
3. 质量控制
   - 多层清洗流水线
   - 自动化质量检查
   - 敏感信息过滤
4. 性能优化
   - 批量并行处理
   - 流式处理大文件
   - 缓存和增量处理
8.2 工具推荐
| 工具 | 用途 | 安装 |
|---|---|---|
| pdfplumber | PDF 解析 | pip install pdfplumber |
| python-docx | Word 解析 | pip install python-docx |
| beautifulsoup4 | HTML 解析 | pip install beautifulsoup4 |
| pytesseract | OCR | pip install pytesseract |
参考资料