Skip to content
清晨的一缕阳光
返回

RAG 文档处理工程化实战

RAG 文档处理工程化实战

文档处理是 RAG(Retrieval-Augmented Generation)系统的基础环节。如何高效处理多格式文档?如何保证文档质量?本文将详解 RAG 文档处理的完整工程化方案。

一、文档处理挑战

1.1 现实痛点

文档处理痛点:
┌─────────────────────────────────────┐
│ 1. 格式多样                           │
│    - PDF、Word、Excel、PPT           │
│    - HTML、Markdown、纯文本          │
│    - 图片、扫描件                    │
├─────────────────────────────────────┤
│ 2. 质量参差不齐                       │
│    - 格式混乱、排版错误              │
│    - 噪声数据、无关内容              │
│    - 缺失信息、损坏文件              │
├─────────────────────────────────────┤
│ 3. 处理复杂度高                       │
│    - 表格解析困难                    │
│    - 公式识别复杂                    │
│    - 多语言混合                      │
├─────────────────────────────────────┤
│ 4. 规模化困难                         │
│    - 大批量处理效率低                │
│    - 资源消耗大                      │
│    - 错误处理复杂                    │
└─────────────────────────────────────┘

1.2 工程化目标

二、文档处理架构

2.1 整体架构设计

graph LR
    A[原始文档] --> B[文档接入层]
    B --> C[格式检测]
    C --> D[格式解析器]
    D --> E[文本提取]
    E --> F[清洗流水线]
    F --> G[质量检查]
    G --> H{质量合格?}
    H -->|是| I[文档存储]
    H -->|否| J[异常处理]
    I --> K[元数据索引]
    J --> L[人工审核]

2.2 核心组件

文档处理管道

# document_pipeline.py
from typing import List, Dict, Optional
from dataclasses import dataclass
from enum import Enum
import hashlib
from pathlib import Path

class DocumentStatus(Enum):
    """Lifecycle states of a document as it moves through the pipeline."""
    PENDING = "pending"        # queued, not yet picked up
    PROCESSING = "processing"  # currently being parsed/cleaned/checked
    COMPLETED = "completed"    # passed every stage successfully
    FAILED = "failed"          # parsing raised or a quality check failed
    SKIPPED = "skipped"        # deliberately not processed

@dataclass
class Document:
    """A single document flowing through the processing pipeline.

    Carries the raw source location, the extracted text and bookkeeping
    fields (status, error details, timestamps).
    """
    id: str                    # content-derived identifier ("doc_" + md5 prefix)
    source_path: str           # path of the original file on disk
    file_type: str             # file extension, e.g. ".pdf"
    file_size: int             # size in bytes (0 until populated)
    content: str               # extracted plain-text content
    metadata: Dict             # filename, sizes, counts — filled by the pipeline
    status: DocumentStatus = DocumentStatus.PENDING   # current lifecycle state
    error_message: Optional[str] = None               # reason when status is FAILED
    created_at: str = ""       # ISO-8601 creation timestamp (set by the caller)
    updated_at: str = ""       # ISO-8601 last-update timestamp (set by the caller)

class DocumentProcessingPipeline:
    """End-to-end document processing pipeline.

    Stages: format detection -> parsing -> ID generation -> cleaning ->
    quality checks -> metadata extraction. The document's ``status`` and
    ``error_message`` fields are updated as it moves through.
    """

    def __init__(self, config: Dict):
        self.config = config
        self.parsers = self._initialize_parsers()
        self.cleaners = self._initialize_cleaners()
        self.quality_checkers = self._initialize_quality_checkers()

    def _initialize_parsers(self) -> Dict[str, 'BaseParser']:
        """Map lower-cased file extensions to parser instances."""
        # BUGFIX: MarkdownParser and TextParser were referenced below but
        # never imported, raising NameError on pipeline construction.
        from parsers import (
            PDFParser, WordParser, ExcelParser, HTMLParser,
            MarkdownParser, TextParser,
        )

        return {
            '.pdf': PDFParser(),
            '.doc': WordParser(),
            '.docx': WordParser(),
            '.xls': ExcelParser(),
            '.xlsx': ExcelParser(),
            '.html': HTMLParser(),
            '.htm': HTMLParser(),
            '.md': MarkdownParser(),
            '.txt': TextParser()
        }

    def _initialize_cleaners(self) -> List['BaseCleaner']:
        """Ordered list of text cleaners, applied in sequence."""
        # BUGFIX: the cleaner classes were never imported in this module.
        from cleaners import (
            WhitespaceCleaner, NoiseCleaner, FormatCleaner, EncodingCleaner,
        )

        return [
            WhitespaceCleaner(),
            NoiseCleaner(),
            FormatCleaner(),
            EncodingCleaner()
        ]

    def _initialize_quality_checkers(self) -> List['BaseQualityChecker']:
        """Quality gates run after cleaning; any failure rejects the document."""
        # BUGFIX: the checker classes were never imported in this module.
        from quality_checkers import (
            ContentLengthChecker, EncodingChecker, SensitiveInfoChecker,
        )

        return [
            ContentLengthChecker(),
            EncodingChecker(),
            SensitiveInfoChecker()
        ]

    def process(self, document: Document) -> Document:
        """Run *document* through the full pipeline; never raises.

        Failures (unsupported format, parse errors, failed quality checks)
        are reported via ``document.status`` / ``document.error_message``.
        """
        try:
            document.status = DocumentStatus.PROCESSING

            # 1. Detect format from the file extension (already lower-cased).
            file_type = self._detect_file_type(document.source_path)
            document.file_type = file_type

            # 2. Extract text with the matching parser.
            parser = self.parsers.get(file_type)
            if not parser:
                raise ValueError(f"Unsupported file type: {file_type}")

            document.content = parser.parse(document.source_path)

            # 3. Derive a stable, content-based document ID.
            document.id = self._generate_document_id(document)

            # 4. Apply the cleaning pipeline in order.
            for cleaner in self.cleaners:
                document.content = cleaner.clean(document.content)

            # 5. Quality gates: the first failure rejects the document.
            for checker in self.quality_checkers:
                if not checker.check(document):
                    document.status = DocumentStatus.FAILED
                    # BUGFIX: keep the checker's specific message when it set
                    # one instead of always overwriting with the generic text.
                    document.error_message = (
                        document.error_message
                        or f"Quality check failed: {checker.name}"
                    )
                    return document

            # 6. Collect file- and content-level metadata.
            document.metadata = self._extract_metadata(document)
            # Populate file_size, which was previously left at its default 0.
            document.file_size = document.metadata['file_size']

            # 7. Done.
            document.status = DocumentStatus.COMPLETED

        except Exception as e:
            # Deliberate catch-all: one bad document must not kill a batch.
            document.status = DocumentStatus.FAILED
            document.error_message = str(e)

        return document

    def _detect_file_type(self, file_path: str) -> str:
        """Return the lower-cased extension (e.g. '.pdf') of *file_path*."""
        return Path(file_path).suffix.lower()

    def _generate_document_id(self, document: Document) -> str:
        """Stable ID from an MD5 of the content (not security-sensitive)."""
        content_hash = hashlib.md5(
            document.content.encode('utf-8')
        ).hexdigest()
        return f"doc_{content_hash[:16]}"

    def _extract_metadata(self, document: Document) -> Dict:
        """File- and content-level metadata used for indexing."""
        file_path = Path(document.source_path)

        return {
            'filename': file_path.name,
            'file_size': file_path.stat().st_size,
            'file_type': document.file_type,
            'created_at': document.created_at,
            'processed_at': document.updated_at,
            'content_length': len(document.content),
            'word_count': len(document.content.split()),
            'source_path': document.source_path
        }

三、多格式解析器

3.1 解析器基类

# parsers/base_parser.py
from abc import ABC, abstractmethod
from typing import Dict, Any

class BaseParser(ABC):
    """Abstract base class for all format-specific parsers.

    Subclasses implement parse() to turn a file into plain text and
    declare which file extensions they handle.
    """

    def __init__(self, config: Dict = None):
        # Normalize a missing config to an empty dict.
        self.config = config or {}

    @abstractmethod
    def parse(self, file_path: str) -> str:
        """Parse the file and return its plain-text content."""

    @abstractmethod
    def get_supported_extensions(self) -> list[str]:
        # BUGFIX: annotated with typing.List, which this module never
        # imports (only Dict/Any) — the builtin generic avoids the NameError.
        """Return the file extensions this parser supports (e.g. ['.pdf'])."""

    def validate_file(self, file_path: str) -> bool:
        """Validate that the file exists and is non-empty.

        Raises:
            FileNotFoundError: if the path does not exist.
            ValueError: if the file has zero bytes.
        """
        import os
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        if os.path.getsize(file_path) == 0:
            raise ValueError("Empty file")

        return True

3.2 PDF 解析器

# parsers/pdf_parser.py
from typing import List, Dict
from .base_parser import BaseParser

class PDFParser(BaseParser):
    """Parser for PDF files, with an OCR fallback for scanned documents."""

    def __init__(self, config: Dict = None):
        super().__init__(config)
        # BUGFIX: reading the raw argument crashed with AttributeError when
        # config was None (the default); self.config is already normalized
        # to a dict by the base class.
        self.use_ocr = self.config.get('use_ocr', False)

    def parse(self, file_path: str) -> str:
        """Extract text from a PDF, using OCR when it looks scanned.

        NOTE(review): self.use_ocr is never consulted — scanned PDFs always
        go through OCR regardless of the flag; confirm intended behavior.
        """
        self.validate_file(file_path)

        if self._is_scanned_pdf(file_path):
            return self._parse_with_ocr(file_path)
        return self._parse_text_pdf(file_path)

    def _parse_text_pdf(self, file_path: str) -> str:
        """Extract body text and tables from a text-layer PDF via pdfplumber."""
        try:
            import pdfplumber

            texts = []
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    # Page body text.
                    text = page.extract_text()
                    if text:
                        texts.append(text)

                    # Tables rendered as pipe-separated rows.
                    for table in page.extract_tables():
                        texts.append(self._format_table(table))

            return '\n\n'.join(texts)

        except Exception as e:
            # Chain the original error so the root cause stays visible.
            raise ValueError(f"Failed to parse PDF: {str(e)}") from e

    def _parse_with_ocr(self, file_path: str) -> str:
        """OCR every page image (Simplified-Chinese + English models)."""
        try:
            import pytesseract
            from pdf2image import convert_from_path

            images = convert_from_path(file_path)
            texts = []

            for image in images:
                text = pytesseract.image_to_string(image, lang='chi_sim+eng')
                texts.append(text)

            return '\n\n'.join(texts)

        except Exception as e:
            raise ValueError(f"OCR failed: {str(e)}") from e

    def _is_scanned_pdf(self, file_path: str) -> bool:
        """Heuristic: no page with more than 10 chars of text => scanned."""
        import pdfplumber

        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                text = page.extract_text()
                if text and len(text.strip()) > 10:
                    return False
        return True

    def _format_table(self, table: List[List[str]]) -> str:
        """Render a table as pipe-separated rows, skipping empty rows."""
        lines = []
        for row in table:
            if any(cell for cell in row):
                # BUGFIX: `str(cell) or ''` rendered None cells as the
                # literal string "None"; map None to an empty string instead.
                line = ' | '.join('' if cell is None else str(cell) for cell in row)
                lines.append(line)
        return '\n'.join(lines)

    def get_supported_extensions(self) -> List[str]:
        return ['.pdf']

3.3 Word 解析器

# parsers/word_parser.py
from typing import Dict
from .base_parser import BaseParser

class WordParser(BaseParser):
    """Parser for Word documents: .docx natively, .doc via LibreOffice."""

    def parse(self, file_path: str) -> str:
        """Extract text (paragraphs + tables) from a Word document."""
        self.validate_file(file_path)

        # Case-insensitive suffix check so ".DOCX" etc. are handled too.
        lowered = file_path.lower()
        if lowered.endswith('.docx'):
            return self._parse_docx(file_path)
        elif lowered.endswith('.doc'):
            return self._parse_legacy_doc(file_path)
        else:
            raise ValueError("Unsupported Word format")

    def _parse_docx(self, file_path: str) -> str:
        """Extract paragraphs then tables from a .docx file."""
        from docx import Document

        doc = Document(file_path)
        paragraphs = []

        # Non-empty paragraphs in document order.
        for para in doc.paragraphs:
            if para.text.strip():
                paragraphs.append(para.text)

        # Tables are appended after the paragraphs.
        for table in doc.tables:
            paragraphs.append(self._format_table(table))

        return '\n\n'.join(paragraphs)

    def _parse_legacy_doc(self, file_path: str) -> str:
        """Convert a legacy .doc to .docx via headless LibreOffice, then parse."""
        import subprocess
        from pathlib import Path  # BUGFIX: Path was used below but never imported

        src = Path(file_path)
        # BUGFIX: str.replace('.doc', '.docx') could mangle paths containing
        # ".doc" elsewhere (e.g. "/old.docs/a.doc"); derive via the suffix.
        output_path = src.with_suffix('.docx')

        subprocess.run([
            'libreoffice', '--headless', '--convert-to', 'docx',
            str(src), '--outdir', str(src.parent)
        ], check=True)

        return self._parse_docx(str(output_path))

    def _format_table(self, table) -> str:
        """Render a docx table as pipe-separated rows, skipping empty rows."""
        lines = []
        for row in table.rows:
            cells = [cell.text.strip() for cell in row.cells]
            if any(cells):
                lines.append(' | '.join(cells))
        return '\n'.join(lines)

    def get_supported_extensions(self) -> list[str]:
        # BUGFIX: annotated with typing.List, which this module never imports.
        return ['.doc', '.docx']

3.4 HTML 解析器

# parsers/html_parser.py
from typing import Dict
from .base_parser import BaseParser

class HTMLParser(BaseParser):
    """Parser for HTML files based on BeautifulSoup."""

    def parse(self, file_path: str) -> str:
        """Extract the title and visible text content from an HTML file."""
        self.validate_file(file_path)

        from bs4 import BeautifulSoup

        # NOTE(review): encoding is assumed UTF-8; non-UTF-8 pages will raise.
        with open(file_path, 'r', encoding='utf-8') as f:
            soup = BeautifulSoup(f, 'html.parser')

        # Remove non-content elements.
        for script in soup(['script', 'style']):
            script.decompose()

        # Page title, if present.
        title = soup.title.string if soup.title else ''

        # BUGFIX: the original iterated tag name by tag name (all h1s, then
        # all h2s, ...), which scrambled the reading order; a single
        # find_all with a list of names walks the tree in document order.
        content_parts = []
        for elem in soup.find_all(['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p', 'li', 'td']):
            text = elem.get_text(strip=True)
            if text:
                content_parts.append(text)

        if title:
            return '\n\n'.join([title] + content_parts)
        return '\n\n'.join(content_parts)

    def get_supported_extensions(self) -> list[str]:
        # BUGFIX: annotated with typing.List, which this module never imports.
        return ['.html', '.htm']

四、清洗流水线

4.1 清洗器基类

# cleaners/base_cleaner.py
from abc import ABC, abstractmethod

class BaseCleaner(ABC):
    """Abstract base for text-cleaning stages of the pipeline.

    Subclasses implement clean(); ``name`` reports the concrete class
    name for use in logs and error messages.
    """

    def __init__(self, config: dict = None):
        self.config = config or {}

    @abstractmethod
    def clean(self, text: str) -> str:
        """Return a cleaned version of *text*."""

    @property
    def name(self) -> str:
        """The concrete cleaner's class name."""
        return type(self).__name__

4.2 空白清洗器

# cleaners/whitespace_cleaner.py
import re
from .base_cleaner import BaseCleaner

class WhitespaceCleaner(BaseCleaner):
    """Collapse runs of spaces and drop blank lines."""

    def clean(self, text: str) -> str:
        """Normalize whitespace: single spaces, stripped, non-empty lines only."""
        # Collapse multi-space runs first so per-line stripping is cheap.
        collapsed = re.sub(r' {2,}', ' ', text)

        kept = []
        for raw_line in collapsed.split('\n'):
            stripped = raw_line.strip()
            # Blank lines are discarded entirely.
            if stripped:
                kept.append(stripped)

        return '\n'.join(kept)

4.3 噪声清洗器

# cleaners/noise_cleaner.py
import re
from .base_cleaner import BaseCleaner

class NoiseCleaner(BaseCleaner):
    """Strip non-text noise: odd symbols, header/footer lines, extra newlines."""

    def clean(self, text: str) -> str:
        """Return *text* with symbol noise and boilerplate lines removed."""
        # Drop characters outside the allowed alphabet (word chars,
        # whitespace, the CJK range and common punctuation).
        text = re.sub(r'[^\w\s\u4e00-\u9fff\.\,\!\?\:\;\(\)\[\]\"\'\-\=]', '', text)

        # Lines that are just a page number or a bare date are typical
        # header/footer noise.
        for boilerplate in (r'^\s*\d+\s*$', r'^\s*\d{4}-\d{2}-\d{2}\s*$'):
            text = re.sub(boilerplate, '', text, flags=re.MULTILINE)

        # Squash runs of 3+ newlines down to a paragraph break.
        text = re.sub(r'\n{3,}', '\n\n', text)

        # Optionally strip URLs when configured to do so.
        if self.config.get('remove_urls', False):
            text = re.sub(r'http[s]?://\S+', '', text)

        return text

4.4 编码清洗器

# cleaners/encoding_cleaner.py
import unicodedata
from .base_cleaner import BaseCleaner

class EncodingCleaner(BaseCleaner):
    """Fix Unicode-level issues: normalization, invisible chars, fullwidth forms."""

    # Code points deleted outright: replacement char, zero-width family, BOM.
    _DELETIONS = dict.fromkeys(map(ord, '\ufffd\u200b\u200c\u200d\ufeff'))

    def clean(self, text: str) -> str:
        """Return *text* NFKC-normalized with junk code points removed."""
        normalized = unicodedata.normalize('NFKC', text)
        # A single translate() pass deletes every listed code point.
        scrubbed = normalized.translate(self._DELETIONS)
        # NFKC normalization typically folds fullwidth forms already; this
        # explicit pass keeps the guarantee regardless.
        return self._fullwidth_to_halfwidth(scrubbed)

    def _fullwidth_to_halfwidth(self, text: str) -> str:
        """Map fullwidth ASCII variants (U+FF01..U+FF5E) and the ideographic
        space (U+3000) to their halfwidth counterparts."""
        def fold(char: str) -> str:
            code = ord(char)
            if code == 0x3000:
                return ' '
            if 0xFF01 <= code <= 0xFF5E:
                return chr(code - 0xFEE0)
            return char

        return ''.join(fold(c) for c in text)

五、质量控制

5.1 质量检查器

# quality_checkers/base_checker.py
from abc import ABC, abstractmethod
from typing import Dict

class BaseQualityChecker(ABC):
    """Common interface for document quality gates.

    Concrete checkers override check() and may supply default thresholds
    through _get_default_thresholds().
    """

    def __init__(self, config: Dict = None):
        self.config = config or {}
        self.thresholds = self._get_default_thresholds()

    @abstractmethod
    def check(self, document: 'Document') -> bool:
        """Return True when *document* passes this quality gate."""

    @property
    def name(self) -> str:
        """The concrete checker's class name (used in failure messages)."""
        return type(self).__name__

    def _get_default_thresholds(self) -> Dict:
        """Default thresholds; the base implementation defines none."""
        return {}

5.2 内容长度检查器

# quality_checkers/length_checker.py
from .base_checker import BaseQualityChecker

class ContentLengthChecker(BaseQualityChecker):
    """Reject documents whose extracted text is too short or too long."""

    def _get_default_thresholds(self) -> dict:
        # BUGFIX: the return annotation referenced typing.Dict, which this
        # module never imports; the builtin dict avoids the NameError.
        return {
            'min_length': 50,      # minimum number of characters
            'max_length': 100000,  # maximum number of characters
            'min_words': 10,       # minimum number of whitespace-separated words
        }

    def check(self, document: 'Document') -> bool:
        """Validate character and word counts against the thresholds.

        On failure, document.error_message is set and False is returned.
        """
        content = document.content

        # Character-count bounds.
        char_count = len(content)
        if char_count < self.thresholds['min_length']:
            document.error_message = f"Content too short: {char_count} chars"
            return False

        if char_count > self.thresholds['max_length']:
            document.error_message = f"Content too long: {char_count} chars"
            return False

        # Word count is whitespace-based; CJK text without spaces will look
        # like very few "words" — NOTE(review): may need a CJK-aware count.
        word_count = len(content.split())
        if word_count < self.thresholds['min_words']:
            document.error_message = f"Too few words: {word_count} words"
            return False

        return True

5.3 敏感信息检查器

# quality_checkers/sensitive_info_checker.py
import re
from .base_checker import BaseQualityChecker

class SensitiveInfoChecker(BaseQualityChecker):
    """Fail documents that contain PII: phone numbers, emails, ID numbers."""

    def _get_default_thresholds(self) -> dict:
        # BUGFIX: the return annotation referenced typing.Dict, which this
        # module never imports; the builtin dict avoids the NameError.
        return {
            'check_phone': True,
            'check_email': True,
            'check_id_card': True,
            # NOTE(review): declared but no bank-card pattern is implemented.
            'check_bank_card': True,
        }

    def check(self, document: 'Document') -> bool:
        """Scan document.content for PII; on a hit, record details and fail.

        Side effects: sets document.error_message and
        document.metadata['sensitive_info'] when sensitive data is found.
        """
        content = document.content
        sensitive_found = []

        # Mainland-China mobile numbers: '1', a digit 3-9, then nine digits.
        if self.thresholds.get('check_phone', True):
            phones = re.findall(r'1[3-9]\d{9}', content)
            if phones:
                sensitive_found.append(f"手机号:{len(phones)}个")

        # Email addresses.
        if self.thresholds.get('check_email', True):
            emails = re.findall(r'[\w\.-]+@[\w\.-]+\.\w+', content)
            if emails:
                sensitive_found.append(f"邮箱:{len(emails)}个")

        # 18-digit resident ID (last digit may be X). The pattern is loose —
        # any 18-digit run matches. NOTE(review): consider checksum validation.
        if self.thresholds.get('check_id_card', True):
            id_cards = re.findall(r'\d{17}[\dXx]', content)
            if id_cards:
                sensitive_found.append(f"身份证:{len(id_cards)}个")

        if sensitive_found:
            document.error_message = f"包含敏感信息:{', '.join(sensitive_found)}"
            document.metadata['sensitive_info'] = sensitive_found
            return False

        return True

六、批量处理与监控

6.1 批量处理器

# batch_processor.py
from typing import List, Dict
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

class BatchDocumentProcessor:
    """Run many documents through a shared pipeline in parallel threads."""

    def __init__(self, pipeline: 'DocumentProcessingPipeline', max_workers: int = 4):
        self.pipeline = pipeline
        self.max_workers = max_workers
        # Running counters for the most recent batch.
        self.stats = {
            'total': 0,
            'success': 0,
            'failed': 0,
            'skipped': 0
        }

    def process_batch(self, file_paths: List[str]) -> List['Document']:
        """Process *file_paths* concurrently; return all Document results."""
        from document_pipeline import DocumentStatus

        self.stats['total'] = len(file_paths)
        results = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = {
                executor.submit(self.pipeline.process, self._create_document(path)): path
                for path in file_paths
            }

            for future in tqdm(as_completed(futures), total=len(futures)):
                document = future.result()
                results.append(document)

                # BUGFIX: status is a DocumentStatus enum; comparing against
                # the plain strings 'completed'/'failed' was always False, so
                # every document was counted as skipped.
                if document.status == DocumentStatus.COMPLETED:
                    self.stats['success'] += 1
                elif document.status == DocumentStatus.FAILED:
                    self.stats['failed'] += 1
                else:
                    self.stats['skipped'] += 1

        return results

    def _create_document(self, file_path: str) -> 'Document':
        """Build a fresh PENDING Document for *file_path*."""
        from datetime import datetime

        return Document(
            id='',
            source_path=file_path,
            file_type='',
            file_size=0,
            content='',
            metadata={},
            created_at=datetime.now().isoformat(),
            updated_at=datetime.now().isoformat()
        )

    def get_stats(self) -> Dict:
        """Return the counters for the most recent batch."""
        return self.stats

    def generate_report(self) -> str:
        """Render a human-readable summary; safe when no documents were run."""
        total = self.stats['total']

        def pct(count: int) -> float:
            # BUGFIX: guard against ZeroDivisionError for an empty batch.
            return count / total * 100 if total else 0.0

        report = f"""
=== 文档处理报告 ===

总文档数:{total}
成功:{self.stats['success']} ({pct(self.stats['success']):.1f}%)
失败:{self.stats['failed']} ({pct(self.stats['failed']):.1f}%)
跳过:{self.stats['skipped']} ({pct(self.stats['skipped']):.1f}%)
        """
        return report

6.2 监控仪表板

# monitoring_dashboard.py
from typing import Dict, List
from datetime import datetime

class ProcessingMonitor:
    """Collects per-batch processing metrics and derives rates over time."""

    def __init__(self):
        # Chronological list of metric snapshots, timestamped on record.
        self.metrics_history: List[Dict] = []

    def record_metrics(self, metrics: Dict):
        """Timestamp *metrics* (mutated in place) and append it to history."""
        metrics['timestamp'] = datetime.now().isoformat()
        self.metrics_history.append(metrics)

    def _recent_metrics(self, hours: int) -> List[Dict]:
        """Snapshots recorded within the last *hours* hours.

        Extracted helper: get_throughput and get_error_rate previously
        duplicated this filtering logic.
        """
        from datetime import timedelta

        cutoff = datetime.now() - timedelta(hours=hours)
        return [
            m for m in self.metrics_history
            if datetime.fromisoformat(m['timestamp']) >= cutoff
        ]

    def get_throughput(self, hours: int = 1) -> float:
        """Documents processed per hour over the recent window."""
        recent = self._recent_metrics(hours)
        if not recent:
            return 0.0

        total_docs = sum(m.get('processed_count', 0) for m in recent)
        return total_docs / hours

    def get_error_rate(self, hours: int = 1) -> float:
        """Fraction of processed documents that errored in the window."""
        recent = self._recent_metrics(hours)
        if not recent:
            return 0.0

        total = sum(m.get('processed_count', 0) for m in recent)
        errors = sum(m.get('error_count', 0) for m in recent)

        return errors / total if total > 0 else 0.0

    def get_quality_report(self) -> Dict:
        """Summary combining derived rates with the latest snapshot's fields."""
        if not self.metrics_history:
            return {}

        latest = self.metrics_history[-1]

        return {
            'throughput': self.get_throughput(),
            'error_rate': self.get_error_rate(),
            'total_processed': sum(m.get('processed_count', 0) for m in self.metrics_history),
            'avg_processing_time': latest.get('avg_processing_time', 0),
            'quality_score': latest.get('quality_score', 0)
        }

七、实战案例

7.1 企业知识库文档处理

处理流程

# enterprise_knowledge_base.py
from document_pipeline import DocumentProcessingPipeline, Document, DocumentStatus
from batch_processor import BatchDocumentProcessor

# Pipeline configuration: per-parser, per-cleaner and per-checker options.
config = {
    'parsers': {
        'pdf': {'use_ocr': True},
        'word': {},
        'html': {}
    },
    'cleaners': {
        'whitespace': {},
        'noise': {'remove_urls': True},
        'encoding': {}
    },
    'quality_checkers': {
        'length': {'min_length': 100},
        'sensitive_info': {
            'check_phone': True,
            'check_email': True
        }
    }
}

# Build the processing pipeline and a parallel batch processor around it.
pipeline = DocumentProcessingPipeline(config)
processor = BatchDocumentProcessor(pipeline, max_workers=8)

# Documents to ingest into the knowledge base.
file_paths = [
    'docs/manual.pdf',
    'docs/guide.docx',
    'docs/faq.html',
    # ... more documents
]

results = processor.process_batch(file_paths)

# Print the batch summary report.
print(processor.generate_report())

# Persist successes; log failures for manual review.
# BUGFIX: doc.status holds a DocumentStatus enum, so comparing against the
# plain string 'completed' never matched and nothing was ever saved.
for doc in results:
    if doc.status == DocumentStatus.COMPLETED:
        save_to_vector_db(doc)  # NOTE(review): defined elsewhere — not shown here
    else:
        log_error(doc)  # NOTE(review): defined elsewhere — not shown here

7.2 性能优化

优化策略

# optimization_tips.py
import hashlib
from pathlib import Path

# 1. Parallel processing
# Use ThreadPoolExecutor (I/O-bound) or ProcessPoolExecutor (CPU-bound).

# 2. Streaming
# For large files, read page by page instead of loading everything at once.
def stream_parse_large_pdf(file_path: str):
    """Yield the text of each PDF page lazily."""
    import pdfplumber
    with pdfplumber.open(file_path) as pdf:
        for page in pdf.pages:
            yield page.extract_text()

# 3. Caching
# Cache per-path hashes to skip documents that were already processed.
from functools import lru_cache

@lru_cache(maxsize=1000)
def get_document_hash(file_path: str) -> str:
    """MD5 hex digest of the file's bytes (cached per path).

    NOTE(review): the cache keys on the path only — a file changed on disk
    keeps returning the stale hash until the entry is evicted.
    """
    # BUGFIX: the original body was an empty placeholder (`pass`).
    return hashlib.md5(Path(file_path).read_bytes()).hexdigest()

# 4. Incremental processing
# Only process new or changed documents.
def incremental_process(new_files: list[str]):
    """Compare file hashes and process only unseen files (placeholder)."""
    # BUGFIX: the original annotation used typing.List without importing it.
    pass

八、总结

8.1 核心要点

  1. 架构设计

    • 模块化设计,易于扩展
    • 管道模式,清晰的处理流程
    • 异常处理,保证系统稳定性
  2. 格式支持

    • 支持主流文档格式
    • 统一的解析器接口
    • 易于添加新格式
  3. 质量控制

    • 多层清洗流水线
    • 自动化质量检查
    • 敏感信息过滤
  4. 性能优化

    • 批量并行处理
    • 流式处理大文件
    • 缓存和增量处理

8.2 工具推荐

| 工具 | 用途 | 安装 |
| --- | --- | --- |
| pdfplumber | PDF 解析 | pip install pdfplumber |
| python-docx | Word 解析 | pip install python-docx |
| beautifulsoup4 | HTML 解析 | pip install beautifulsoup4 |
| pytesseract | OCR | pip install pytesseract |

参考资料


分享这篇文章到:

上一篇文章
软件开发管理:如何提升大家绩效?
下一篇文章
Redis Set 数据类型详解