
Document Ingestion Service (Python)

This document describes the FastAPI application that serves as the Ingestion Service for ChainAlign. The service orchestrates the document ingestion pipeline: it uses Google Cloud Document AI to process supported document types, Google Cloud Storage for file handling, and external microservices for chunking, embedding, and natural language extraction.

Overview

The Ingestion Service manages the lifecycle of a document from upload, through processing, to structured storage and downstream extraction.

Key functionalities include:

  • Receiving document uploads via a REST API.
  • Processing compatible document types (PDF, images) with Document AI.
  • Processing text-based files (Markdown, TXT, CSV) directly, without Document AI.
  • Calling external services for content-aware chunking and embedding generation.
  • Storing processed document chunks and metadata in a PostgreSQL database.
  • Integrating with a LangExtract service for entity, attribute, and relationship extraction.
  • Providing health check and batch status endpoints.

Environment Variables

The service relies on several environment variables for configuration, including database connection details, Google Cloud project settings, and URLs for external services.
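
The exact variable names are not listed here, so the following is only a minimal sketch of how such configuration could be loaded and validated at startup. Every variable name in it (DATABASE_URL, GOOGLE_CLOUD_PROJECT, CHUNKING_SERVICE_URL, EMBEDDING_SERVICE_URL, LANGEXTRACT_SERVICE_URL) is a hypothetical placeholder, as are the Settings class and the load_settings function.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    """Configuration values the service needs at startup."""
    database_url: str
    gcp_project_id: str
    chunking_service_url: str
    embedding_service_url: str
    langextract_service_url: str


# Hypothetical environment variable names, chosen for illustration only.
_ENV_NAMES = {
    "database_url": "DATABASE_URL",
    "gcp_project_id": "GOOGLE_CLOUD_PROJECT",
    "chunking_service_url": "CHUNKING_SERVICE_URL",
    "embedding_service_url": "EMBEDDING_SERVICE_URL",
    "langextract_service_url": "LANGEXTRACT_SERVICE_URL",
}


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Read all settings from env, failing fast if any is missing or empty."""
    missing = sorted(name for name in _ENV_NAMES.values() if not env.get(name))
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return Settings(**{field: env[name] for field, name in _ENV_NAMES.items()})
```

Failing at startup when a variable is missing tends to be easier to diagnose than a connection error on the first request.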

Helper Functions

create_document_record(tenant_id, title, file_path, status, doc_type, metadata)

Creates a new document record in the database.

store_chunk(tenant_id, document_id, chunk_text, chunk_order, embedding, metadata)

Stores a document chunk and its embedding in the database.

generate_embedding(text)

Generates an embedding for a given text using Vertex AI.

strip_markdown(markdown_text)

Strips markdown formatting from a given text.
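
The service's actual implementation is not shown here. As a rough illustration of what such a helper might do, the sketch below removes a few common Markdown constructs with regular expressions. The set of constructs handled is an assumption. Underscore emphasis is deliberately left alone so that identifiers such as tenant_id are not mangled.

```python
import re


def strip_markdown(markdown_text: str) -> str:
    """Return markdown_text with common Markdown syntax removed (sketch)."""
    text = markdown_text
    # Images and links: keep the alt text or link text, drop the target.
    text = re.sub(r"!\[([^\]]*)\]\([^)]*\)", r"\1", text)
    text = re.sub(r"\[([^\]]+)\]\([^)]*\)", r"\1", text)
    # Heading markers and blockquote markers at the start of a line.
    text = re.sub(r"^[ \t]{0,3}#{1,6}[ \t]+", "", text, flags=re.MULTILINE)
    text = re.sub(r"^[ \t]{0,3}>[ \t]?", "", text, flags=re.MULTILINE)
    # Bullet markers at the start of a line.
    text = re.sub(r"^[ \t]*[-*+][ \t]+", "", text, flags=re.MULTILINE)
    # Bold, then italic (asterisk forms only), then inline code.
    text = re.sub(r"\*\*(.+?)\*\*", r"\1", text)
    text = re.sub(r"\*(\S[^*\n]*?)\*", r"\1", text)
    text = re.sub(r"`([^`]*)`", r"\1", text)
    return text.strip()
```

A regex approach like this is approximate. A full Markdown parser would handle nested and edge-case syntax more reliably.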

API Endpoints

POST /ingest/

Summary: Ingests a document for processing.

Description: Receives an uploaded file, determines its type, and initiates the appropriate processing pipeline (direct text processing or Document AI batch processing).

Parameters:

  • tenant_id (query parameter): The ID of the tenant associated with the document.
  • file (form data): The document file to upload.

Responses:

  • 200 OK: Document accepted for processing.
  • 400 Bad Request: No file uploaded or unsupported file type.
  • 500 Internal Server Error: Error during processing.
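
The type-based routing described above can be sketched as a small pure function. This is an illustration under stated assumptions, not the service's code: the extension sets, the pipeline labels, and the names choose_pipeline and UnsupportedFileType are all hypothetical. In particular, the image formats actually accepted are not specified in this document.

```python
from pathlib import PurePath

# Assumed extension sets; the document names only the broad categories.
TEXT_EXTENSIONS = {".md", ".txt", ".csv"}
DOCUMENT_AI_EXTENSIONS = {".pdf", ".png", ".jpg", ".jpeg", ".tiff"}


class UnsupportedFileType(ValueError):
    """Raised when no pipeline handles the upload (maps to 400 Bad Request)."""


def choose_pipeline(filename: str) -> str:
    """Pick a processing pipeline from the uploaded file's extension."""
    if not filename:
        raise UnsupportedFileType("No file uploaded")
    suffix = PurePath(filename).suffix.lower()
    if suffix in TEXT_EXTENSIONS:
        return "direct_text"
    if suffix in DOCUMENT_AI_EXTENSIONS:
        return "document_ai_batch"
    raise UnsupportedFileType(f"Unsupported file type: {suffix or '(none)'}")
```

In a FastAPI handler, the exception would typically be caught and converted to an HTTP 400 response, with any other failure surfacing as a 500.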

GET /health

Summary: Health check endpoint.

Description: Returns the status of the service, indicating whether Google Cloud clients are initialized.

Responses:

  • 200 OK: { "status": "ok", "google_cloud_initialized": true } when the Google Cloud clients are initialized; otherwise { "status": "error", "google_cloud_initialized": false, "detail": "Google Cloud clients not initialized." }
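
The two response bodies can be produced by a small helper like the sketch below. The function name build_health_payload and the boolean flag are assumptions. Only the payload shapes come from this document.

```python
from typing import Any, Dict


def build_health_payload(clients_initialized: bool) -> Dict[str, Any]:
    """Build the /health response body from the client initialization state."""
    if clients_initialized:
        return {"status": "ok", "google_cloud_initialized": True}
    return {
        "status": "error",
        "google_cloud_initialized": False,
        "detail": "Google Cloud clients not initialized.",
    }
```

Because both bodies are listed under 200 OK, a monitoring client should inspect the status field rather than rely on the HTTP status code alone.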

GET /batch-status/{operation_name}

Summary: Get the status of a Document AI batch processing operation.

Description: Retrieves the current status of a long-running Document AI operation.

Parameters:

  • operation_name (path parameter): The full name of the Document AI operation.

Responses:

  • 200 OK: { "status": "running" }, { "status": "completed" }, or { "status": "failed" }.
  • 500 Internal Server Error: Error checking batch status.
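
Google long-running operations expose a done flag and, on failure, an error field. One plausible way to map those onto the three status values is sketched below. How the service actually inspects the operation is not documented here, so the function name summarize_operation and its parameters are hypothetical.

```python
from typing import Dict, Optional


def summarize_operation(done: bool, error_message: Optional[str] = None) -> Dict[str, str]:
    """Map a long-running operation's state to the endpoint's status body."""
    if not done:
        # The operation has not finished yet.
        return {"status": "running"}
    if error_message:
        # The operation finished and reported an error.
        return {"status": "failed"}
    return {"status": "completed"}
```

A client would typically poll this endpoint at an interval until the status is no longer "running".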