Functional Specification: Intelligent Document Ingestion Pipeline
Version: 2.0 | Date: October 17, 2025 | Last Updated: November 1, 2025 | Status: M46 Implementation Ready
1.0 Executive Summary
This document outlines the design for ChainAlign's Intelligent Document Ingestion Pipeline. This comprehensive, quality-gated pipeline intelligently routes documents based on type, complexity, and content structure, with support for both tenant-specific and general knowledge base ingestion. The primary goal is to transform raw, diverse documents into structured, high-quality data ready for AI processing, knowledge graph construction, and vector storage.
Problem Statement:
- Handling multiple document types (PDF, TXT, MD, CSV, structured data) with varying parsing requirements.
- Addressing different PDF complexities (text-only, forms, tables, images).
- Managing complex financial documents with tables spanning multiple pages.
- Ensuring robust quality control and intelligent routing.
- Supporting both tenant-specific and general knowledge base ingestion.
- Establishing a unified pipeline for consistent quality assessment.
2.0 Architecture Overview
The pipeline begins with a Document Upload Endpoint (backend: ingestionRoutes.js). All incoming documents are first processed by a new Document Intelligence Service, which performs initial analysis, validation, and routing decisions. Documents are then directed to specialized parsing services based on their type and complexity. After parsing, a Content Quality Validator assesses the extracted content. Finally, the validated content proceeds to Embedding Generation (using Google Gemini), Cognee Knowledge Graph Construction, and Vector Storage (using pgvector).
3.0 Core Components
3.1 Document Intelligence Service (NEW)
- Location: backend/src/services/DocumentIntelligenceService.js
- Responsibilities:
- Pre-process all incoming documents.
- Detect document type and complexity.
- Make routing decisions for subsequent parsing.
- Track ingestion metadata.
- Key Methods:
class DocumentIntelligenceService {
/**
* Main entry point for document analysis
*/
async analyzeDocument(fileBuffer, filename, mimetype, tenantId, sourceType) {
// 1. Basic validation
// 2. File type detection
// 3. Document complexity assessment
// 4. Route decision
// 5. Return analysis result
}
/**
* Detect if PDF contains tables, forms, or just text
*/
async detectPDFComplexity(fileBuffer) {
// Use PyPDF2 for quick scan
// Count pages
// Detect forms (AcroForm fields)
// Detect tables (via heuristics or ML model)
// Detect images
return {
type: 'table', // one of: 'text' | 'form' | 'table' | 'image' | 'mixed'
confidence: 0.95,
pageCount: 10,
hasImages: false,
hasTables: true,
hasForms: false
};
}
/**
* Decide which parser to use
*/
async routeToParser(documentAnalysis) {
// Decision logic based on analysis
}
}
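For illustration, here is a minimal sketch of how these complexity heuristics might look on the Python side (the JS service is expected to delegate to the PDFComplexityAnalyzer described in Section 3.2). The whitespace-run table heuristic and the fixed 0.75 confidence are illustrative assumptions, not part of this specification.
# Minimal sketch of PDF complexity detection, assuming PyPDF2 >= 3.x.
# The table heuristic (long whitespace runs on many lines) is a placeholder assumption.
from io import BytesIO
from PyPDF2 import PdfReader

def detect_pdf_complexity(file_buffer: bytes) -> dict:
    reader = PdfReader(BytesIO(file_buffer))
    page_count = len(reader.pages)
    has_forms = bool(reader.get_fields())              # AcroForm fields present?
    has_images = any(page.images for page in reader.pages)
    # Crude table heuristic: lines containing long whitespace runs suggest columns.
    table_like_lines = 0
    total_lines = 0
    for page in reader.pages:
        for line in (page.extract_text() or "").splitlines():
            total_lines += 1
            if "   " in line:
                table_like_lines += 1
    has_tables = total_lines > 0 and table_like_lines / total_lines > 0.2
    if has_forms:
        doc_type = 'form'
    elif has_tables:
        doc_type = 'table'
    else:
        doc_type = 'text'
    return {
        'type': doc_type,
        'confidence': 0.75,  # heuristics only; a real implementation would calibrate this
        'pageCount': page_count,
        'hasImages': has_images,
        'hasTables': has_tables,
        'hasForms': has_forms,
    }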
3.2 PDF Parser Service (NEW)
- Location: python-services/pdf-parser-service/
- Purpose: Specialized service for PDF parsing with multiple strategies and robust fallback mechanisms. It extracts text, forms, and tables from PDFs.
- Components:
- PDF Complexity Analyzer:
from typing import Dict

class PDFComplexityAnalyzer:
"""
Analyzes PDF structure to determine parsing strategy.
This component informs the PDF Parser Service which specific parsers (e.g., text, form, table)
and strategies (e.g., Camelot vs. Document AI) to prioritize.
"""
def analyze(self, file_content: bytes) -> Dict:
"""
Returns:
{
'type': 'text' | 'form' | 'table' | 'image' | 'mixed',
'confidence': 0.95,
'page_count': 10,
'has_images': False,
'has_tables': True,
'has_forms': False,
'table_pages': [2, 3, 5],
'text_density': 0.85,
'ocr_confidence': 1.0,
'parsing_successful': True,
'boilerplate_percentage': 0.0,
'cleaned_text_length': 0,
}
"""
pass
- Text-Only Parser: For simple text-based PDFs using PyPDF2 or pdfplumber.
- Form Parser: For PDFs with forms using Google Document AI Form Parser.
- Table Parser: For PDFs with tables using specialized extraction tools such as Camelot, Tabula, Document AI, or PaddleOCR. Includes a merge_split_tables method for multi-page tables.
- Multi-Page Table Handler: Detects and merges tables split across pages using column header matching and structure analysis (see the sketch below).
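A minimal sketch of the header-matching merge logic, assuming extracted tables arrive as pandas DataFrames tagged with their page number (the concrete table representation is an implementation detail, not fixed by this spec):
# Minimal sketch of merging tables split across pages. Each extracted table is
# assumed to be a pandas DataFrame paired with its page number; names are illustrative.
from typing import List, Tuple
import pandas as pd

def merge_split_tables(tables: List[Tuple[int, pd.DataFrame]]) -> List[pd.DataFrame]:
    """Merge tables on consecutive pages whose column headers match."""
    merged: List[pd.DataFrame] = []
    prev_page = None
    for page, df in sorted(tables, key=lambda t: t[0]):
        if (merged
                and prev_page is not None
                and page == prev_page + 1
                and list(df.columns) == list(merged[-1].columns)):
            # Same headers on the next page: treat as a continuation of the last table.
            merged[-1] = pd.concat([merged[-1], df], ignore_index=True)
        else:
            merged.append(df)
        prev_page = page
    return merged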
3.2.1 Fallback Parsing Strategy
The PDF Parser Service implements a multi-stage fallback strategy to ensure maximum extraction success and quality:
- Initial Attempt (Optimized/Cost-Effective):
  - For text-only PDFs: PyPDF2 or pdfplumber.
  - For table PDFs: Camelot (for simple, rule-based tables), then Tabula (for more complex, structure-based tables).
  - For form PDFs: Direct to Google Document AI Form Parser.
- Fallback (Advanced/Costlier):
- If an initial attempt fails, returns low confidence, or yields incomplete results, the service will automatically fall back to Google Document AI (Table or Form Processor, as appropriate) for a more robust extraction.
- Error Handling: Each parsing attempt will log its success/failure, confidence score, and any errors. If all strategies fail, a comprehensive error report is generated, and the document is flagged for manual review or re-processing.
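For illustration, a minimal sketch of the table-extraction cascade, under the assumptions that Camelot and Tabula are installed and that extract_with_document_ai and log_attempt are hypothetical helpers (the Document AI wiring is not shown here):
# Minimal sketch of the table-extraction fallback cascade (Camelot -> Tabula ->
# Document AI). extract_with_document_ai and log_attempt are hypothetical helpers,
# and the 80.0 accuracy threshold is an illustrative assumption.
import camelot
import tabula

ACCURACY_THRESHOLD = 80.0  # Camelot reports accuracy on a 0-100 scale

def extract_tables_with_fallback(pdf_path: str) -> list:
    # 1. Camelot: rule-based, works well on simple bordered tables.
    try:
        tables = camelot.read_pdf(pdf_path, pages='all')
        if tables.n > 0 and all(t.parsing_report['accuracy'] >= ACCURACY_THRESHOLD
                                for t in tables):
            return [t.df for t in tables]
    except Exception as exc:
        log_attempt('camelot', success=False, error=exc)  # hypothetical logger

    # 2. Tabula: structure-based, handles more layouts.
    try:
        dfs = tabula.read_pdf(pdf_path, pages='all', multiple_tables=True)
        if dfs:
            return dfs
    except Exception as exc:
        log_attempt('tabula', success=False, error=exc)

    # 3. Document AI: most robust but costlier; last resort.
    return extract_with_document_ai(pdf_path)  # hypothetical wrapper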
3.3 Pipeline Orchestrator
- Location: python-services//pipeline_orchestrator.py
- Purpose: Orchestrates the entire ingestion pipeline, from document intelligence analysis to parsing, quality validation, chunking, embedding, and storage.
- Key Method:
from typing import Dict
from uuid import uuid4
from fastapi import UploadFile

class IngestionPipelineOrchestrator:
"""
Orchestrates the entire ingestion pipeline
"""
def __init__(self):
self.document_intelligence = DocumentIntelligenceService()
self.pdf_parser = PDFParserService()
self.text_parser = TextParserService()
self.data_processor = DataProcessorService() # M33 path
self.quality_validator = ContentQualityValidator()
self.chunking_service = ChunkingService() # NEW
self.embedding_service = EmbeddingService()
self.cognee_service = CogneeService()
self.vector_store = VectorStoreService()
async def process_document(
self,
file: UploadFile,
tenant_id: str,
source_type: str,
source_params: Dict
) -> Dict:
"""
Main pipeline execution
"""
# 0. Read the upload once; reading the stream a second time would return empty bytes
file_bytes = await file.read()
doc_id = str(uuid4())
# 1. Document Intelligence Analysis (determines parsing strategy)
analysis = await self.document_intelligence.analyze(
file_bytes,
file.filename,
file.content_type,
tenant_id,
source_type
)
# 2. Route to appropriate parser based on analysis
parsed_content = None
if analysis['category'] == 'structured_data':
# Use M33 Data Entry Intelligence Service
parsed_content = await self.data_processor.process(file, tenant_id)
elif analysis['document_type'] == 'pdf':
# PDF Parser Service handles internal fallback logic (Camelot -> Tabula -> Doc AI)
parsed_content = await self.pdf_parser.parse(
file_bytes, # reuse the bytes read above
analysis # pass full analysis for context
)
elif analysis['document_type'] in ['text', 'markdown']:
parsed_content = await self.text_parser.parse(file)
if not parsed_content:
raise Exception("Document parsing failed or returned empty content.")
# 3. Quality Validation
quality_score = await self.quality_validator.validate(
parsed_content,
analysis
)
if quality_score['overall_score'] < 0.7:
# Flag for manual review or retry with different parser
return {
'status': 'needs_review',
'quality_score': quality_score,
'reason': 'Low extraction quality'
}
# 4. Chunking
chunks = await self.chunking_service.split_document(
parsed_content.get('content_blocks', []), # Assuming parsed_content returns content_blocks
analysis.get('chunking_strategy', 'semantic')
)
# 5. Embedding & Storage
# This step will now operate on the generated chunks
# (e.g., loop through chunks, generate embeddings, store in vector DB)
# For now, placeholder for integration with external services
# await self.embedding_service.generate_and_store(chunks, tenant_id, document_id)
# await self.cognee_service.build_graph(chunks, tenant_id, document_id)
# await self.vector_store.index_chunks(chunks, tenant_id, document_id)
return {
'status': 'success',
'document_id': doc_id, # generated at the start of the pipeline
'quality_score': quality_score,
'chunks_created': len(chunks)
}
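For context, a minimal sketch of how the orchestrator might be exposed from the FastAPI service; the route shape follows the entry points listed in Section 11.1, while the wiring itself is an assumption:
# Hypothetical FastAPI wiring for the orchestrator; the route mirrors the
# production entry point from Section 11.1.
from fastapi import FastAPI, File, Form, UploadFile

app = FastAPI()
orchestrator = IngestionPipelineOrchestrator()

@app.post("/api/v1/ingest/document")
async def ingest_document(
    file: UploadFile = File(...),
    tenant_id: str = Form(...),
    source_type: str = Form("api"),
):
    return await orchestrator.process_document(
        file=file,
        tenant_id=tenant_id,
        source_type=source_type,
        source_params={},
    )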
3.4 Content Quality Validator
- Location: python-services//quality_validator.py
- Purpose: Validates the quality of extracted content (text, tables) and provides a confidence score based on defined quality dimensions and a weighted scoring model. This component replaces basic validation checks and integrates results from services such as languagetool-service into its comprehensive scoring.
- Quality Dimensions:
- Structural Integrity (Weighted 0.5):
- OCR Confidence: For scanned documents, confidence score from the OCR engine (e.g., Google Cloud Vision). Low confidence means garbled text.
- Formatting & Parsing: Ability to extract clean text blocks without mixing up headers, footers, and main content.
- File Health: Detects corruption or empty documents.
- Content Clarity & Coherence (Weighted 0.3):
- Signal-to-Noise Ratio: Measures valuable content versus boilerplate (e.g., navigation links, "Terms of Service," page numbers, legal disclaimers).
- Readability Score: A metric like the Flesch-Kincaid score to identify documents that are either overly simplistic or convoluted.
- Language Detection: Ensures the document is in the expected language.
- Integration with languagetool-service: Results from languagetool-service (e.g., grammar and style issues) will contribute to this score.
- Information Density (Weighted 0.2):
- Text Length: After cleaning, ensures a minimum amount of text. A 200-page document with only 50 words of actual content is likely useless.
- Uniqueness: Checks if this is a duplicate or near-duplicate of a document already in the system.
- Weighted Scoring Model:
QUALITY_SCORE = (0.5 * Structural_Score) + (0.3 * Clarity_Score) + (0.2 * Density_Score)
- Structural_Score Calculation:
- If OCR Confidence is > 95%, score is 1.0.
- If OCR Confidence is 80-95%, score is 0.7.
- If OCR Confidence is < 80%, score is 0.1.
- If the file fails to parse, the score is 0.0.
- Clarity_Score Calculation:
- Calculate the percentage of boilerplate text. If boilerplate is < 10%, the score is 1.0. If it's > 50%, the score is 0.2.
- Adjust score based on the languagetool-service report (e.g., penalize critical grammar issues).
- Density_Score Calculation:
- If the cleaned text length is < 50 words, the score is 0.1. If it's > 200 words, the score is 1.0.
- Triage Thresholds:
  - Auto-Accept Threshold: SCORE > 0.85 (High confidence. Ingest directly into the ChainAlign vector database without human intervention.)
  - Manual Review Threshold: 0.50 <= SCORE <= 0.85 (Potentially valuable but has issues. Route to a human review queue.)
  - Auto-Reject Threshold: SCORE < 0.50 (Low quality, likely "garbage." Automatically move to a "rejected" folder for archival or deletion.)
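A minimal sketch of the triage mapping implied by these thresholds (the function and enum names are illustrative):
# Illustrative triage mapping from overall quality score to pipeline action,
# using the thresholds defined above.
from enum import Enum

class TriageAction(Enum):
    AUTO_ACCEPT = "auto_accept"      # ingest directly
    MANUAL_REVIEW = "manual_review"  # route to human review queue
    AUTO_REJECT = "auto_reject"      # move to "rejected" folder

def triage(overall_score: float) -> TriageAction:
    if overall_score > 0.85:
        return TriageAction.AUTO_ACCEPT
    if overall_score >= 0.50:
        return TriageAction.MANUAL_REVIEW
    return TriageAction.AUTO_REJECT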
- Key Method:
from typing import Dict, List

class ContentQualityValidator:
"""
Validates extracted content quality based on defined dimensions and a weighted scoring model.
"""
def validate(self, parsed_content: Dict, analysis: Dict) -> Dict:
"""
Returns quality metrics and an overall score:
{
'overall_score': 0.85,
'structural_score': 0.9,
'clarity_score': 0.8,
'density_score': 0.85,
'issues': [],
'warnings': []
}
"""
# Placeholder for actual implementation of score calculation
structural_score = self._calculate_structural_score(parsed_content, analysis)
clarity_score = self._calculate_clarity_score(parsed_content, analysis)
density_score = self._calculate_density_score(parsed_content, analysis)
overall_score = (0.5 * structural_score) + (0.3 * clarity_score) + (0.2 * density_score)
return {
'overall_score': overall_score,
'structural_score': structural_score,
'clarity_score': clarity_score,
'density_score': density_score,
'issues': self._identify_issues(overall_score),
'warnings': self._identify_warnings(overall_score)
}
def _calculate_structural_score(self, parsed_content: Dict, analysis: Dict) -> float:
"""
Calculates structural integrity score based on OCR confidence, parsing success, and file health.
"""
# Example logic based on FSD
if not analysis.get('parsing_successful', True): return 0.0
ocr_confidence = analysis.get('ocr_confidence', 1.0) # Assume 1.0 if not OCR
if ocr_confidence > 0.95: return 1.0
if ocr_confidence >= 0.80: return 0.7
return 0.1
def _calculate_clarity_score(self, parsed_content: Dict, analysis: Dict) -> float:
"""
Calculates content clarity and coherence score based on boilerplate percentage.
"""
# Example logic based on FSD
boilerplate_percentage = analysis.get('boilerplate_percentage', 0.0)
if boilerplate_percentage < 0.10: return 1.0
if boilerplate_percentage > 0.50: return 0.2
return 0.7 # Default for moderate boilerplate
def _calculate_density_score(self, parsed_content: Dict, analysis: Dict) -> float:
"""
Calculates information density score based on cleaned text length.
"""
# FSD thresholds are expressed in words, so count words rather than characters
word_count = len(parsed_content.get('text', '').split())
if word_count < 50: return 0.1
if word_count > 200: return 1.0
return 0.7 # Default for moderate length
def _identify_issues(self, overall_score: float) -> List[str]:
"""
Identifies critical issues based on overall score (Auto-Reject).
"""
issues = []
if overall_score < 0.50: issues.append('Document quality too low for automated processing (Auto-Reject).')
return issues
def _identify_warnings(self, overall_score: float) -> List[str]:
"""
Identifies warnings for documents that might need manual review.
"""
warnings = []
if 0.50 <= overall_score <= 0.85: warnings.append('Document requires manual review due to moderate quality.')
return warnings
3.5 Chunking Service (NEW)
- Location: python-services/chunking-service/
- Purpose: Responsible for splitting validated content into smaller, semantically meaningful chunks suitable for embedding and retrieval. It operates after content quality validation.
- Key Methods:
from typing import Dict, List

class ChunkingService:
"""
Splits extracted document content into optimized chunks.
"""
def split_document(self, content_blocks: List[Dict], chunking_strategy: str = 'semantic') -> List[Dict]:
"""
Splits the list of content blocks (text, tables) into smaller chunks.
Each chunk will be annotated with metadata like original page, block type, etc.
Args:
content_blocks (List[Dict]): A list of extracted content blocks (e.g., {'type': 'text', 'content': '...', 'page': 1}, {'type': 'table', 'data': '...', 'page': 2}).
chunking_strategy (str): The strategy to use (e.g., 'semantic', 'recursive-char', 'page-based').
Returns:
List[Dict]: A list of chunks, each with 'text' and 'metadata' fields.
"""
pass
- Responsibilities:
- Apply various chunking strategies (e.g., fixed-size, recursive character, semantic splitting, table-aware).
- Preserve metadata and context within chunks (e.g., original page number, table ID, section headers).
- Handle different content types (text, tables) appropriately during splitting (a minimal sketch follows below).
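A minimal sketch of a chunker that keeps tables intact and carries block metadata into every chunk; the chunk size and overlap values are illustrative assumptions, not spec values:
# Illustrative chunking that propagates block metadata to each chunk.
# CHUNK_SIZE and OVERLAP are assumptions, not values fixed by this spec.
from typing import Dict, List

CHUNK_SIZE = 1000   # characters per chunk (assumed)
OVERLAP = 100       # character overlap between chunks (assumed)

def split_blocks(content_blocks: List[Dict]) -> List[Dict]:
    chunks: List[Dict] = []
    for block in content_blocks:
        if block.get('type') == 'table':
            # Keep tables intact as single chunks to preserve structure.
            chunks.append({'text': str(block.get('data', '')),
                           'metadata': {'type': 'table', 'page': block.get('page')}})
            continue
        text = block.get('content', '')
        start = 0
        index = 0
        while start < len(text):
            piece = text[start:start + CHUNK_SIZE]
            chunks.append({'text': piece,
                           'metadata': {'type': 'text',
                                        'page': block.get('page'),
                                        'chunk_index': index}})
            start += CHUNK_SIZE - OVERLAP
            index += 1
    return chunks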
4.0 Routing Logic
Documents are routed through the pipeline based on their type, complexity, and quality score. The decision logic is sketched below.
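A minimal sketch of that decision logic, consolidating the rules from Sections 3.1-3.3 (the returned route names are illustrative):
# Illustrative routing decision, consolidating the rules described above.
def route_document(analysis: dict) -> str:
    # Structured data (CSV, spreadsheets) goes to the M33 path.
    if analysis.get('category') == 'structured_data':
        return 'm33_data_entry_intelligence'
    if analysis.get('document_type') == 'pdf':
        # Complexity decides which PDF strategy to try first.
        if analysis.get('has_forms'):
            return 'document_ai_form_parser'
        if analysis.get('has_tables'):
            return 'table_parser'   # Camelot -> Tabula -> Document AI fallback
        return 'text_pdf_parser'    # PyPDF2 / pdfplumber
    if analysis.get('document_type') in ('text', 'markdown'):
        return 'text_parser'
    return 'needs_review'           # unknown types are flagged for manual review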
5.0 Implementation Plan
The implementation will follow a phased approach, focusing on building a robust foundation before layering on advanced parsing and quality validation.
Phase 1: Foundation
- Tasks:
  - Create DocumentIntelligenceService.js (Backend): Implement file type detection, basic PDF complexity analysis, and initial route decision logic.
  - Create PDF Parser Service (Python): Set up the directory structure (python-services/pdf-parser-service/) and FastAPI application, including the PDFComplexityAnalyzer.
  - Integrate with ingestionRoutes.js: Modify existing ingestion routes to call the DocumentIntelligenceService and add metadata about routing decisions.
- Deliverables: Document Intelligence Service with basic routing, PDF complexity analyzer, and updated ingestion routes.
Phase 2: Parser Implementation
- Tasks:
- Text-Only PDF Parser: Integrate PyPDF2 for fast extraction of simple PDFs.
- Table Parser: Set up Camelot for simple tables, integrate Tabula for complex tables, and Document AI as a fallback.
- Multi-Page Table Handler: Implement detection and merge logic for tables spanning multiple pages.
- Deliverables: Working parsers for various PDF types and multi-page table handling.
Phase 3: Quality Validation
- Tasks:
- Content Quality Validator: Implement text quality metrics, table integrity checks, and confidence scoring.
- Quality Gate Integration: Configure thresholds, implement a review queue for low-quality documents, and define retry logic.
- Deliverables: A functional quality validation system and a review queue for flagged documents.
Phase 4: Integration & Testing
- Tasks:
- Pipeline Orchestrator: Implement the full end-to-end pipeline flow, including error handling and retry logic.
- Testing: Conduct unit tests for each parser, integration tests for the entire pipeline, and end-to-end tests with real financial documents.
- Documentation: Generate API documentation, a parser selection guide, and a quality metrics guide.
- Deliverables: A complete, tested ingestion pipeline and comprehensive documentation.
6.0 Data Flow Examples
Example 1: Financial Report PDF with Tables
- Upload: quarterly_report.pdf
- Intelligence Analysis:
  - Type: PDF
  - Complexity: MIXED (text + tables)
  - Table pages: [3, 4, 5, 6]
  - Confidence: 0.92
- Routing Decision: Use Table Parser
- Parsing:
- Pages 1-2: Text-only (PyPDF2)
- Pages 3-4: Table 1 (Camelot)
- Pages 5-6: Table 1 continuation (MultiPageTableHandler)
- Quality Validation:
- Text quality: 0.95
- Table integrity: 0.88
- Overall: 0.91 (PASS)
- Chunking:
- Strategy: Financial document chunking
- Preserve table structure
- Tag as tenant-specific
- Storage:
- Embeddings generated
- Knowledge graph updated
- Vector store indexed
Example 2: Simple Text Document
- Upload: company_policy.md
- Intelligence Analysis:
- Type: Markdown
- Complexity: SIMPLE (text-only)
- Routing Decision: Text Parser (skip AI)
- Parsing: Direct text extraction
- Quality Validation: 0.98 (PASS)
- Chunking: Standard semantic chunking
- Storage: Embed + store
Example 3: Structured Data (M33 Path)
- Upload: sales_data.csv
- Intelligence Analysis:
- Type: Structured Data
- Complexity: N/A
- Routing Decision: M33 Data Entry Intelligence Service
- Processing: (M33 pipeline)
- Gap identification
- Quality scoring
- Staging if needed
- Storage: Core schema tables
7.0 Configuration
The following environment variables will be used to configure the pipeline components.
- PDF Parser Service:
  PDF_PARSER_SERVICE_URL=http://pdf-parser-service:8004
  PDF_COMPLEXITY_THRESHOLD=0.7
  USE_DOCUMENT_AI_FALLBACK=true
- Quality Thresholds:
  MIN_QUALITY_SCORE=0.7
  MIN_TABLE_INTEGRITY=0.75
  MIN_TEXT_QUALITY=0.8
- Parser Selection:
  PREFER_LOCAL_PARSERS=true  (use PyPDF2/Camelot before Document AI)
  MAX_DOCUMENT_AI_CALLS_PER_DAY=1000
- Tenant Routing:
  TENANT_SPECIFIC_SOURCES=['upload', 'data_entry', 'tenant_kb']
  GLOBAL_KB_SOURCES=['admin_upload', 'public_docs', 'industry_reports']
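A minimal sketch of how a service might load these values, with defaults mirroring the list above (the loading code itself is an assumption):
# Illustrative loading of pipeline configuration from environment variables.
import os

PDF_PARSER_SERVICE_URL = os.environ.get(
    'PDF_PARSER_SERVICE_URL', 'http://pdf-parser-service:8004')
PDF_COMPLEXITY_THRESHOLD = float(os.environ.get('PDF_COMPLEXITY_THRESHOLD', '0.7'))
USE_DOCUMENT_AI_FALLBACK = os.environ.get(
    'USE_DOCUMENT_AI_FALLBACK', 'true').lower() == 'true'
MIN_QUALITY_SCORE = float(os.environ.get('MIN_QUALITY_SCORE', '0.7'))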
8.0 Database Schema Updates
The following tables will be added or updated to support the ingestion pipeline.
document_processing_log (New Table)
CREATE TABLE document_processing_log (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID REFERENCES tenants(id),
document_id UUID,
original_filename VARCHAR(500),
file_type VARCHAR(50),
complexity_analysis JSONB,
parser_used VARCHAR(100),
quality_score JSONB,
processing_time_ms INTEGER,
status VARCHAR(50), -- 'success', 'needs_review', 'failed'
error_details TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
extracted_tables (New Table)
CREATE TABLE extracted_tables (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
document_id UUID NOT NULL, -- logical reference to document_processing_log.document_id (no FK constraint, since that column is not unique)
table_index INTEGER,
page_numbers INTEGER[],
is_multi_page BOOLEAN,
column_headers TEXT[],
row_count INTEGER,
extraction_confidence FLOAT,
table_data JSONB,
created_at TIMESTAMP DEFAULT NOW()
);
9.0 Monitoring & Metrics
Key metrics will be tracked to ensure the health, performance, and cost-effectiveness of the pipeline.
- Parser Performance:
- Time per document type.
- Success rate by parser.
- Quality scores by parser.
- Quality Metrics:
- Average quality score.
- Documents flagged for review.
- Retry success rate.
- Cost Metrics:
- Document AI API calls.
- Cost per document.
- Local vs. cloud parsing ratio.
- Business Metrics:
- Documents processed per day.
- Time to availability.
- User satisfaction (manual review rate).
10.0 Open Questions & Decisions Needed
The following questions need to be resolved to finalize the design and proceed with implementation.
- Document AI Budget: What is the monthly budget for Document AI API calls? This will influence the MAX_DOCUMENT_AI_CALLS_PER_DAY configuration and fallback strategies.
- Document Types Distribution: What percentage of incoming documents falls into each category (e.g., simple text PDFs, PDFs with tables, forms)? This will help prioritize parser development.
- Quality Requirements: What is the acceptable quality threshold for automated ingestion? This will define MIN_QUALITY_SCORE and other related thresholds.
- Latency Requirements: How fast does document ingestion need to be for different document types?
- Cognee Integration: Should Cognee be called for all documents or only certain types/complexities?
11.0 M46 Implementation Details (READY FOR PRODUCTION)
Status: Production-ready design finalized November 1, 2025
Approach: Pragmatic dual strategy (PyPDF2 + OCR fallback)
Scope: Not a document management system; ingestion focuses on data extraction for downstream AI processing.
11.1 Revised Architecture: Simplified & Cost-Effective
Section 3.2.1 above proposes a complex multi-stage fallback across multiple parsers, with Document AI as the heavyweight option. The M46 implementation simplifies this:
Strategy: 90% efficiency with 10% cost
- PyPDF2 first (80-90% of PDFs) → Fast, free, <100ms per page
- OCR fallback (10-20% of PDFs) → Google Cloud Vision for scanned/complex layouts
- Single unified service → No complex orchestration, easier testing
ENTRY POINTS (Both → Same Pipeline):
├── POST /api/v1/ingest/document (Production API)
└── POST /api/ingest/upload (Demo UI)
PROCESSING:
1. Complexity Detection → text-based? scanned? tables?
2. Text Extraction (Dual Strategy)
├─ PyPDF2: Simple extraction (if text_density > 0.3)
└─ OCR: Google Cloud Vision (if text_density < 0.3 or extraction fails)
3. Langextract Enrichment → language, clarity_score, quality signals
4. Table Extraction → Camelot for structured, OCR text for complex
5. Content-Aware Chunking → semantic chunks with metadata
6. Metadata Attachment → Every chunk gets: tenant_id, document_id, source_type,
page_number, language, quality_score, created_at
7. PostgreSQL Storage → Chunks stored with full metadata chain
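A minimal sketch of the dual extraction strategy, using the 0.3 text-density threshold from the diagram above; the density proxy (fraction of pages yielding meaningful text) is an illustrative assumption:
# Illustrative dual-strategy extraction: PyPDF2 first, Google Cloud Vision OCR
# as fallback when the PDF yields too little text (scanned or image-heavy pages).
from io import BytesIO
from PyPDF2 import PdfReader
from pdf2image import convert_from_bytes
from google.cloud import vision

TEXT_DENSITY_THRESHOLD = 0.3  # from the pipeline diagram above

def extract_text(pdf_bytes: bytes) -> tuple[str, str]:
    """Returns (text, strategy_used)."""
    reader = PdfReader(BytesIO(pdf_bytes))
    pages = [page.extract_text() or "" for page in reader.pages]
    # Rough density proxy (assumed): fraction of pages producing meaningful text.
    density = sum(1 for p in pages if len(p.strip()) > 50) / max(len(pages), 1)
    if density > TEXT_DENSITY_THRESHOLD:
        return "\n".join(pages), "pypdf2"
    # OCR fallback: render pages to images, then run Vision document OCR.
    client = vision.ImageAnnotatorClient()
    ocr_pages = []
    for image in convert_from_bytes(pdf_bytes):
        buf = BytesIO()
        image.save(buf, format="PNG")
        response = client.document_text_detection(
            image=vision.Image(content=buf.getvalue()))
        ocr_pages.append(response.full_text_annotation.text)
    return "\n".join(ocr_pages), "ocr"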
11.2 Key Implementation Files
Backend Entry Point:
- backend/src/routes/ingestionRoutes.js - Expose dual entry points (API + UI)
- backend/src/services/DocumentIntelligenceService.js - Complexity detection (already exists, needs implementation)
PDF Parser Service (NEW):
- python-services/pdf-parser-service/main.py - FastAPI service
- python-services/pdf-parser-service/parser.py - PDFParserService class
  - parse_document() - Main entry point
  - _detect_complexity() - PyPDF2-based analysis
  - _extract_text_pypdf2() - Fast path
  - _extract_text_ocr() - Google Cloud Vision fallback
  - _extract_tables() - Camelot + OCR table extraction
Integration Points:
- Existing: python-services/content-aware-chunking-service/ - Enhanced for metadata propagation
- Existing: python-services/languagetool-service/ - Called for quality enrichment
- Existing: PostgreSQL + pgvector - Final storage with tenant scoping
11.3 Python Dependencies
fastapi==0.104.0
PyPDF2==3.0.1 # Text extraction (PRIMARY)
google-cloud-vision==3.5.0 # OCR (FALLBACK)
camelot-py==0.10.1 # Table extraction
pdf2image==1.16.3 # PDF → images for OCR
requests==2.31.0 # HTTP to langextract
pydantic==2.4.0
python-multipart==0.0.6
11.4 Return Format (Standard Response)
{
"status": "success",
"document_id": "uuid",
"tenant_id": "uuid",
"chunks": {
"total": 47,
"preview": [
{
"text": "Financial statement Q3 2025...",
"metadata": {
"tenant_id": "uuid",
"document_id": "uuid",
"source_type": "api|upload",
"page_number": 1,
"chunk_index": 0,
"language": "en",
"quality_score": 0.94,
"extraction_confidence": 0.95,
"created_at": "2025-11-01T13:30:00Z"
}
}
]
},
"extraction_metadata": {
"pages": 8,
"tables_found": 2,
"language": "en",
"quality_score": 0.94,
"extraction_confidence": 0.92,
"processing_time_ms": 2850,
"strategy_used": "pypdf2" # or "ocr"
}
}
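A minimal Pydantic sketch of this response shape; field names mirror the example above, but the models themselves are an assumption:
# Illustrative Pydantic models mirroring the standard response above.
from typing import List, Literal
from pydantic import BaseModel

class ChunkMetadata(BaseModel):
    tenant_id: str
    document_id: str
    source_type: str
    page_number: int
    chunk_index: int
    language: str
    quality_score: float
    extraction_confidence: float
    created_at: str

class ChunkPreview(BaseModel):
    text: str
    metadata: ChunkMetadata

class Chunks(BaseModel):
    total: int
    preview: List[ChunkPreview]

class ExtractionMetadata(BaseModel):
    pages: int
    tables_found: int
    language: str
    quality_score: float
    extraction_confidence: float
    processing_time_ms: int
    strategy_used: Literal["pypdf2", "ocr"]

class IngestResponse(BaseModel):
    status: str
    document_id: str
    tenant_id: str
    chunks: Chunks
    extraction_metadata: ExtractionMetadata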
11.5 Multi-Tenant Data Isolation (CRITICAL)
Principle: Every chunk scoped to tenant_id
CREATE TABLE document_chunks (
id UUID PRIMARY KEY,
tenant_id UUID NOT NULL, -- ← CRITICAL for isolation
document_id UUID NOT NULL,
content TEXT NOT NULL,
metadata JSONB NOT NULL,
embedding vector(1536), -- dimension must match the chosen embedding model
created_at TIMESTAMP,
FOREIGN KEY (tenant_id) REFERENCES tenants(id)
);
-- ALL queries must include: WHERE tenant_id = ?
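A minimal sketch of enforcing the tenant filter at the query layer, using psycopg-style parameter binding (the helper is an assumption, not existing code):
# Illustrative tenant-scoped retrieval helper; every query binds tenant_id so
# one tenant can never read another tenant's chunks.
def fetch_chunks(conn, tenant_id: str, document_id: str) -> list:
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT id, content, metadata
            FROM document_chunks
            WHERE tenant_id = %s      -- mandatory isolation filter
              AND document_id = %s
            """,
            (tenant_id, document_id),
        )
        return cur.fetchall()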
11.6 Error Handling & Fallbacks
| Scenario | Behavior |
|---|---|
| PyPDF2 fails | Immediately trigger OCR |
| OCR fails | Return error + partial extraction |
| Table extraction fails | Return text only (skip structured table) |
| Langextract unavailable | Skip enrichment, continue |
| Storage fails | Return 500 + log (data queued for retry) |
11.7 Quality Metrics to Track
- extraction_strategy_used_distribution (pypdf2 vs ocr %)
- extraction_confidence_histogram
- quality_score_histogram
- processing_time_percentiles (p50, p95, p99)
- ocr_fallback_rate
- table_extraction_success_rate
- storage_success_rate
11.8 Demo vs Production Differences
| Aspect | Demo | Production |
|---|---|---|
| Entry Point | UI upload + API | API only |
| File Size | 10MB | 50MB |
| OCR Fallback | Yes | Yes |
| Output | UI summary + chunks | Chunks only |
| Retry | Simple (3x) | Exponential backoff + DLQ |
12.0 Alternative Approaches Considered
Option 1: Use Only Document AI
- Pros: Single API, handles everything.
- Cons: Expensive, slower, potentially overkill for simple documents.
Option 2: Use Only Open-Source
- Pros: Free, fast.
- Cons: May not handle complex tables or forms as effectively as specialized services.
Option 3: Hybrid (RECOMMENDED)
- Pros: Cost-effective, flexible, high quality by leveraging the strengths of both open-source and cloud-based solutions.
- Cons: More complex implementation due to integrating multiple tools and services.
13.0 Sign-Off & Approvals
| Role | Name | Approval Status | Date | Signature |
|---|---|---|---|---|
| Engineering Lead | | ☐ Approved ☐ Rejected ☐ Needs Revision | | |
| Product Manager | | ☐ Approved ☐ Rejected ☐ Needs Revision | | |
| Data Architect | | ☐ Approved ☐ Rejected ☐ Needs Revision | | |
| CTO | | ☐ Approved ☐ Rejected ☐ Needs Revision | | |
Comments / Concerns:
END OF FUNCTIONAL SPECIFICATION DOCUMENT