Functional Specification: ERP Integration System

Executive Summary

ChainAlign's ERP Integration System positions the platform as an intelligent iPaaS (Integration Platform as a Service) that normalizes disparate customer ERP systems to ChainAlign's canonical S&OP data model. Unlike traditional integration approaches that require customers to build data lakes or implement complex ETL pipelines, ChainAlign assumes the integration burden by providing:

  1. Multi-protocol ingestion (REST, GraphQL, SOAP, VPN/SQL) through a single stable gateway
  2. LLM-powered semantic mapping that learns customer-specific field mappings
  3. Self-improving ontology that evolves with each validated integration
  4. Agentic automation that minimizes human intervention over time

This architecture enables customers to integrate in days (via CSV) while evolving to real-time sync (via API) without rebuilding integrations.


1. Business Context

1.1 Problem Statement

Enterprise customers using ChainAlign for S&OP need to connect data from their existing ERP systems (SAP, Oracle, Microsoft Dynamics, Adler, etc.). Traditional approaches impose significant friction:

  • Customer-side data lakes require 18-24 months and dedicated data engineering teams
  • Fixed schema mappings break when ERPs are upgraded or customized
  • One-size-fits-all APIs don't accommodate industry-specific fields
  • Manual CSV exports don't scale for real-time decision-making

1.2 Strategic Differentiation

ChainAlign's approach:

"Upload CSV today, sync in real-time tomorrow"

  • Start with CSV onboarding (2-day value delivery)
  • Evolve to API integration (weeks, not months)
  • Single semantic engine for all ingestion methods
  • ERP-agnostic architecture (support whatever protocol the customer can offer)
  • Self-service governance (tenants can map their own fields over time)

This positions ChainAlign as a SaaS-led iPaaS that removes integration as a barrier to S&OP transformation.


2. High-Level Architecture

2.1 System Components

┌─────────────────────────────────────────────────────────────────┐
│ Customer ERP Systems │
│ (SAP, Oracle, Adler, Dynamics, Custom) │
└─────────────────┬───────────────────────────────────────────────┘
│ REST / GraphQL / SOAP / VPN / CSV

┌─────────────────────────────────────────────────────────────────┐
│ gateway.chainalign.com │
│ (GCP API Gateway - Single Stable Endpoint) │
│ - Environment routing (dev/staging/prod) │
│ - Version routing (Stripe-style headers) │
│ - Auth, rate limiting, DDoS protection │
└─────────────────┬───────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│ Ingestion Orchestrator (Node.js/Cloud Run) │
│ - Protocol detection (Content-Type parsing) │
│ - Payload normalization (REST/GraphQL/SOAP → JSON) │
│ - Schema extraction (field names, types, samples) │
│ - Queue writing (pgmq: ingestion_queue) │
└─────────────────┬───────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│ pgmq: ingestion_queue │
│ Format: {source, tenant_id, protocol, raw_data, schema_hints} │
└─────────────────┬───────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│ Semantic Mapper Agent (Python/LangGraph/Cloud Run) │
│ │
│ Tiered Intelligence Workflow: │
│ 1. Exact match → ontology_concepts (PostgreSQL) │
│ 2. Semantic search → pgvector embeddings │
│ 3. LLM reasoning → Gemini confidence scoring │
│ 4. Human validation → pending_field_mappings queue │
│ 5. Execute mapping → update DB / create migration │
│ │
│ LangGraph enables cyclic refinement (retry with new strategy) │
└─────────────────┬───────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│ Knowledge & Governance Layer │
│ │
│ • Core Ontology (PostgreSQL + pgvector) │
│ - Canonical S&OP concepts (sales, inventory, forecast) │
│ - Synonyms learned from validations │
│ - Embeddings for semantic similarity │
│ │
│ • Admin Validation UI (React - Extended CSV UI) │
│ - Review pending mappings across all sources │
│ - Approve/reject/correct/mark-as-new-concept │
│ - Approvals train the agent, update ontology │
│ │
│ • ERP Version Registry │
│ - Track known ERP versions and schema changes │
│ - Proactive monitoring of vendor releases │
│ - Anomaly detection for schema drift │
└─────────────────────────────────────────────────────────────────┘

2.2 Data Flow

End-to-End Request Flow:

Customer ERP
→ gateway.chainalign.com
→ GCP API Gateway (auth, routing)
→ Ingestion Orchestrator (protocol parsing)
→ pgmq ingestion_queue
→ Semantic Mapper Agent (LangGraph)
├─ Exact match? → Auto-map to PostgreSQL
├─ Semantic match? → Auto-map with confidence
├─ LLM scoring? → Auto-map if >95% confidence
└─ Low confidence? → pending_field_mappings → Human validation
→ PostgreSQL (core_entities, sales, locations, etc.)

3. Protocol Support

3.1 REST APIs (Pull or Push)

Customer exposes endpoints:

  • ChainAlign polls on schedule (hourly/daily) OR
  • Customer POSTs to gateway.chainalign.com/ingest/rest

Orchestrator handling:

  • Parse JSON payload
  • Extract field names, types, sample values
  • Write to ingestion_queue with protocol: 'REST'

Example:

POST /ingest/rest HTTP/1.1
Host: gateway.chainalign.com
Content-Type: application/json
ChainAlign-Version: 2025-11-17
Authorization: Bearer <tenant_token>

{
  "sales_orders": [
    {
      "order_id": "SO-12345",
      "product_sku": "WIDGET-001",
      "quantity": 100,
      "order_date": "2025-11-15"
    }
  ]
}
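The orchestrator steps listed above can be sketched in Python (a sketch only — the function name, queue-message shape, and field names are assumptions, not the production interface):

```python
import json
import uuid

def normalize_rest_payload(tenant_id, body, version_header=None):
    """Parse a REST ingestion body into an ingestion_queue message.

    Collects field names per top-level collection as schema hints so the
    Semantic Mapper Agent need not re-parse the raw payload.
    """
    data = json.loads(body) if isinstance(body, str) else body
    # Use the first record of each collection to derive schema hints
    schema_hints = {}
    for collection, records in data.items():
        if isinstance(records, list) and records:
            schema_hints[collection] = sorted(records[0].keys())
    return {
        "message_id": str(uuid.uuid4()),
        "source": "rest",
        "tenant_id": tenant_id,
        "protocol": "REST",
        "api_version": version_header,
        "raw_data": data,
        "schema_hints": schema_hints,
    }

msg = normalize_rest_payload(
    "tenant-42",
    {"sales_orders": [{"order_id": "SO-12345", "product_sku": "WIDGET-001",
                       "quantity": 100, "order_date": "2025-11-15"}]},
    version_header="2025-11-17",
)
```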

3.2 GraphQL (Write via Mutations)

Customer writes mutations to ChainAlign:

Example mutation:

mutation SyncInventory($items: [InventoryItemInput!]!) {
  syncInventory(items: $items) {
    status
    itemsProcessed
    errors {
      item
      message
    }
  }
}

Orchestrator handling:

  • Validate against published schema
  • Extract structured data
  • Write to ingestion_queue with protocol: 'GraphQL'

3.3 GraphQL (Real-time via Subscriptions)

ChainAlign publishes events, customer pushes data in response:

Customer subscribes:

subscription OnDataRequested {
  dataRequested {
    requestId
    fields
    dateRange
  }
}

When ChainAlign needs fresh data:

  • Publishes dataRequested event
  • Customer responds with mutation containing requested data

3.4 SOAP/Legacy

Customer sends SOAP XML:

Example:

POST /ingest/soap HTTP/1.1
Host: gateway.chainalign.com
Content-Type: application/soap+xml

<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">
  <soap:Body>
    <SyncSalesOrders>
      <Order>
        <OrderID>12345</OrderID>
        <ProductSKU>WIDGET-001</ProductSKU>
      </Order>
    </SyncSalesOrders>
  </soap:Body>
</soap:Envelope>

Orchestrator handling:

  • Parse SOAP XML using WSDL definitions
  • Convert to canonical JSON structure
  • Write to ingestion_queue with protocol: 'SOAP'

3.5 VPN/Direct Database Connection

Secure tunnel to customer's SQL Server:

  • ChainAlign runs scheduled jobs
  • Queries tables directly via read-only credentials
  • Treats query results like REST responses
  • Write to ingestion_queue with protocol: 'SQL'
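The scheduled pull can be sketched as follows (a sketch under assumptions: any DB-API 2.0 connection over the VPN tunnel; the `?` paramstyle and all names are illustrative and vary by driver):

```python
def pull_table_snapshot(conn, tenant_id, table, columns, since):
    """Run a read-only incremental query and package rows like a REST payload.

    The result uses the same queue-message shape as other protocols,
    so everything downstream of ingestion_queue is identical.
    """
    col_list = ", ".join(columns)
    cur = conn.cursor()
    # table/columns come from integration config, never from user input
    cur.execute(
        f"SELECT {col_list} FROM {table} WHERE updated_at >= ?",
        (since,),
    )
    rows = [dict(zip(columns, row)) for row in cur.fetchall()]
    return {
        "source": "sql",
        "tenant_id": tenant_id,
        "protocol": "SQL",
        "raw_data": {table: rows},
        "schema_hints": {table: list(columns)},
    }
```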

3.6 CSV Upload (Existing Flow)

Unified with ERP integration:

  • CSV upload UI writes to same ingestion_queue
  • Same Semantic Mapper Agent processes all sources
  • CSV templates stored in ontology as learned mappings

4. Semantic Mapper Agent (LangGraph)

4.1 Agent Workflow

The Python agent processes each queued ingestion job through a cyclic decision graph using LangGraph:

# Conceptual LangGraph structure
from typing import TypedDict
from langgraph.graph import StateGraph, END

# StateGraph requires a state schema; a minimal one for this workflow:
class MappingState(TypedDict, total=False):
    raw_data: dict       # queued ingestion payload
    fields: list         # extracted field schemas
    candidates: list     # current ontology match candidates
    confidence: float    # score from the active matching tier
    retries: int         # cyclic-refinement counter

workflow = StateGraph(MappingState)

# Nodes
workflow.add_node("extract_schema", extract_schema_node)
workflow.add_node("exact_match", exact_match_lookup)
workflow.add_node("semantic_search", semantic_search_node)
workflow.add_node("llm_reasoning", llm_reasoning_node)
workflow.add_node("human_validation", human_validation_queue)
workflow.add_node("execute_mapping", execute_mapping_node)

# Edges (with conditional routing)
workflow.set_entry_point("extract_schema")
workflow.add_edge("extract_schema", "exact_match")
workflow.add_conditional_edges(
    "exact_match",
    should_use_semantic_search,
    {
        "auto_map": "execute_mapping",
        "semantic_search": "semantic_search"
    }
)
workflow.add_conditional_edges(
    "semantic_search",
    should_use_llm,
    {
        "high_confidence": "execute_mapping",
        "llm_reasoning": "llm_reasoning"
    }
)
workflow.add_conditional_edges(
    "llm_reasoning",
    check_llm_confidence,
    {
        "auto_map": "execute_mapping",
        "human_review": "human_validation",
        "retry": "semantic_search"  # Cycle back with refined query
    }
)
workflow.add_edge("human_validation", END)  # Waits for human approval
workflow.add_edge("execute_mapping", END)

4.2 Node Details

Node 1: Schema Extraction

Purpose: Parse raw ingestion data to understand structure

Actions:

  • Extract field names from JSON/XML/CSV
  • Infer data types (string, numeric, date, boolean)
  • Collect sample values (first 5-10 records)
  • Generate field fingerprints (e.g., "10-15 digits, prefix 'INV-'" → invoice_number)

Output:

{
  "fields": [
    {
      "name": "proj_stage_bill",
      "type": "numeric",
      "samples": [1250.00, 3400.50, 890.25],
      "fingerprint": "decimal_currency"
    }
  ]
}
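A minimal sketch of this inference step (the type classification and fingerprint heuristics here are illustrative, not the production rules):

```python
import re

def infer_field(name, samples):
    """Infer a coarse type and fingerprint for one field (Node 1 sketch)."""
    def classify(v):
        if isinstance(v, bool):          # bool before numeric: bool is an int subclass
            return "boolean"
        if isinstance(v, (int, float)):
            return "numeric"
        if isinstance(v, str) and re.fullmatch(r"\d{4}-\d{2}-\d{2}", v):
            return "date"
        return "string"

    types = {classify(v) for v in samples}
    field_type = types.pop() if len(types) == 1 else "string"

    fingerprint = None
    if field_type == "numeric" and all(
        isinstance(v, float) and round(v, 2) == v for v in samples
    ):
        fingerprint = "decimal_currency"
    elif field_type == "string":
        prefixes = {v.split("-")[0] + "-" for v in samples if "-" in v}
        if len(prefixes) == 1:
            fingerprint = f"prefix '{prefixes.pop()}'"

    return {"name": name, "type": field_type,
            "samples": samples[:10], "fingerprint": fingerprint}
```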

Node 2: Exact Match Lookup

Purpose: Fast lookup for known field names

Actions:

  • Query ontology_concepts table by exact name
  • Check synonyms array for known aliases
  • If match found → confidence = 100% → Route to execute_mapping

SQL:

SELECT concept_id, canonical_field
FROM ontology_concepts
WHERE tenant_id = $1
AND (concept_name = $2 OR $2 = ANY(synonyms))

Node 3: Semantic Search (Fallback)

Purpose: Find similar concepts when exact match fails

Actions:

  • Generate embedding for unknown field (name + sample context)
  • Query pgvector for nearest neighbors in ontology
  • Calculate cosine similarity
  • If similarity > 0.85 → High-confidence synonym → execute_mapping
  • If 0.60-0.85 → Pass to LLM reasoning
  • If < 0.60 → Low confidence → LLM reasoning

SQL:

SELECT concept_id, canonical_field,
embedding <=> $1 AS distance
FROM ontology_concepts
WHERE tenant_id = $2
ORDER BY embedding <=> $1
LIMIT 3
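Note that pgvector's `<=>` operator returns cosine *distance*, so similarity is 1 − distance. The threshold routing above then reduces to a small function (a sketch; both the 0.60–0.85 band and the <0.60 band route to LLM reasoning, just with different candidate context):

```python
def route_semantic_match(cosine_distance):
    """Map a pgvector cosine distance to the next workflow node."""
    similarity = 1.0 - cosine_distance
    if similarity > 0.85:
        return "execute_mapping", similarity   # high-confidence synonym
    return "llm_reasoning", similarity         # mid and low bands both go to the LLM
```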

Node 4: LLM Reasoning

Purpose: Use Gemini to determine semantic equivalence

Prompt:

You are a data integration specialist for S&OP systems.

Context:
- Customer field: "{field_name}"
- Data type: {type}
- Sample values: {samples}
- Customer industry: {tenant_industry}

Top 3 ontology matches:
1. {match1_name} - {match1_description} (similarity: 0.72)
2. {match2_name} - {match2_description} (similarity: 0.68)
3. {match3_name} - {match3_description} (similarity: 0.65)

Question: Is this customer field semantically equivalent to one of these concepts?

Respond in JSON:
{
  "is_match": true/false,
  "matched_concept_id": "uuid or null",
  "confidence": 0-100,
  "reasoning": "Explain your decision"
}

If no match, suggest if this is:
- A synonym that should be added
- A truly new concept
- Junk data to ignore

Decision Logic:

  • Confidence > 95% → Auto-map, log reasoning → execute_mapping
  • Confidence 60-95% → Human validation queue
  • Confidence < 60% → Retry semantic search with refined query (cycle up to 2x)

Refinement strategies for retry:

  • Decompose compound field names (e.g., "proj_stage_bill" → "project", "stage", "billing")
  • Use broader domain terms (e.g., "financial" instead of "billing")
  • Query industry-specific aliases
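The decomposition strategy can be sketched as a tokenizer plus an abbreviation table (the table here is illustrative; a real one would be seeded per industry):

```python
import re

# Illustrative abbreviation expansions for ERP field names
ABBREVIATIONS = {"proj": "project", "qty": "quantity",
                 "amt": "amount", "bill": "billing"}

def decompose_field_name(name):
    """Split a compound field name into broader candidate search terms.

    Splits on underscores, hyphens, spaces, and camelCase boundaries,
    then expands known abbreviations for the retry query.
    """
    tokens = re.split(r"[_\-\s]+|(?<=[a-z])(?=[A-Z])", name)
    return [ABBREVIATIONS.get(t.lower(), t.lower()) for t in tokens if t]
```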

Node 5: Human Validation Queue

Purpose: Route ambiguous mappings to admin review

Actions:

  • Write to pending_field_mappings table:

    INSERT INTO pending_field_mappings (
      tenant_id, source_field, suggested_concept_id,
      llm_confidence, llm_reasoning, sample_data, status
    ) VALUES ($1, $2, $3, $4, $5, $6, 'pending')
  • Trigger notification (optional - email/Slack)
  • Wait for human decision (workflow exits, resumes on approval)

Human options in UI:

  1. Approve → Update ontology, execute mapping
  2. Correct → Select different concept, execute mapping
  3. Mark as new concept → Create migration, add to ontology
  4. Reject as junk → Ignore field, don't map

Node 6: Execute Mapping

Purpose: Apply validated mapping to data

Actions:

Case 1: Synonym (most common)

-- Add to ontology
UPDATE ontology_concepts
SET synonyms = array_append(synonyms, $1)
WHERE concept_id = $2;

-- Move data from JSONB staging to proper column
UPDATE core_entities
SET billing_milestone = (custom_attributes->>'proj_stage_bill')::numeric
WHERE tenant_id = $1 AND custom_attributes->>'proj_stage_bill' IS NOT NULL;

-- Clean up staging
UPDATE core_entities
SET custom_attributes = custom_attributes - 'proj_stage_bill'
WHERE tenant_id = $1;

Case 2: New core concept (rare)

// Generate Knex migration
const migration = `
exports.up = function(knex) {
  return knex.schema.table('core_entities', (table) => {
    table.decimal('carbon_footprint_per_unit', 15, 4);
  });
};
`;

// Add to ontology
await ontologyRepository.create({
  concept_name: 'carbon_footprint_per_unit',
  canonical_field: 'carbon_footprint_per_unit',
  domain: 'sustainability',
  description: 'CO2 emissions per unit produced'
});

Case 3: Tenant-specific field

-- Stays in JSONB; create a tenant-scoped partial index
-- (DDL cannot take bind parameters, so the tenant_id is a literal)
CREATE INDEX idx_custom_attrs_tenant_field
ON core_entities USING GIN ((custom_attributes->'tenant_specific_field'))
WHERE tenant_id = '<tenant-uuid>';

4.3 Cyclic Refinement (LangGraph Key Feature)

LangGraph allows the agent to loop back and retry with refined strategies:

Example cycle:

  1. Field: "proj_stage_bill" fails semantic search (similarity 0.55)
  2. LLM reasoning: "Possibly related to billing or project stages" (confidence 58%)
  3. Cycle back to semantic search with decomposed terms: "project", "billing"
  4. New semantic search finds "billing_milestone" (similarity 0.88)
  5. Auto-map with high confidence

Max cycles: 2 (prevents infinite loops)
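The `check_llm_confidence` router from the 4.1 graph, with this cycle cap applied, can be sketched as (state keys are assumptions):

```python
def check_llm_confidence(state):
    """Conditional edge after llm_reasoning (sketch of §4.2 decision logic).

    Returns one of the edge labels: 'auto_map', 'human_review', 'retry'.
    Caps cyclic refinement at 2 retries to prevent infinite loops.
    """
    confidence = state["llm_confidence"]
    if confidence > 95:
        return "auto_map"
    if confidence >= 60:
        return "human_review"
    if state.get("retries", 0) < 2:
        state["retries"] = state.get("retries", 0) + 1
        return "retry"            # cycle back to semantic_search
    return "human_review"         # give up after 2 cycles
```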


5. Core Ontology Structure

5.1 Database Schema

-- Core ontology storage
CREATE TABLE ontology_concepts (
  concept_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  tenant_id UUID, -- NULL for global concepts

  -- Concept definition
  concept_name VARCHAR(100) NOT NULL,
  canonical_field VARCHAR(100) NOT NULL, -- DB column name
  domain VARCHAR(50), -- 'sales', 'inventory', 'finance', 'production'
  description TEXT,

  -- Mappings
  synonyms TEXT[], -- Known aliases
  embedding VECTOR(768), -- Gemini embedding for semantic search

  -- Metadata
  data_type VARCHAR(20), -- 'string', 'numeric', 'date', 'boolean'
  is_required BOOLEAN DEFAULT false,
  validation_rules JSONB, -- {min, max, regex, enum_values}

  -- Evolution tracking
  usage_count INTEGER DEFAULT 0,
  last_mapped_at TIMESTAMP,
  created_at TIMESTAMP DEFAULT NOW(),
  updated_at TIMESTAMP DEFAULT NOW(),

  UNIQUE(tenant_id, concept_name)
);

CREATE INDEX idx_ontology_tenant ON ontology_concepts(tenant_id);
CREATE INDEX idx_ontology_domain ON ontology_concepts(domain);
CREATE INDEX idx_ontology_embedding ON ontology_concepts USING ivfflat (embedding vector_cosine_ops);

-- Pending mappings awaiting human review
CREATE TABLE pending_field_mappings (
  mapping_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  tenant_id UUID NOT NULL,

  -- Source field info
  source_field VARCHAR(200) NOT NULL,
  source_type VARCHAR(20),
  sample_data JSONB,

  -- Suggested mapping
  suggested_concept_id UUID,
  llm_confidence INTEGER, -- 0-100
  llm_reasoning TEXT,

  -- Review tracking
  status VARCHAR(20) DEFAULT 'pending', -- 'pending', 'approved', 'rejected'
  reviewed_by UUID,
  reviewed_at TIMESTAMP,
  final_concept_id UUID, -- May differ from suggested

  created_at TIMESTAMP DEFAULT NOW(),

  FOREIGN KEY(tenant_id) REFERENCES tenants(tenant_id),
  FOREIGN KEY(suggested_concept_id) REFERENCES ontology_concepts(concept_id),
  FOREIGN KEY(final_concept_id) REFERENCES ontology_concepts(concept_id)
);

-- ERP version tracking for proactive monitoring
CREATE TABLE erp_version_registry (
  registry_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  erp_system VARCHAR(50) NOT NULL, -- 'SAP_S4HANA', 'ORACLE_EBS', 'ADLER'
  version VARCHAR(50) NOT NULL,
  release_date DATE,

  -- Schema changes in this version
  schema_changes JSONB, -- [{type: 'added', field: 'new_field', description}]

  -- Affected tenants
  affected_tenant_ids UUID[],

  -- Monitoring
  detected_at TIMESTAMP DEFAULT NOW(),
  review_status VARCHAR(20) DEFAULT 'pending',

  UNIQUE(erp_system, version)
);

5.2 Initial Ontology Seeding

ChainAlign Standard S&OP Concepts:

const CORE_ONTOLOGY = [
  // Product domain
  {
    concept_name: 'product_sku',
    canonical_field: 'sku',
    domain: 'product',
    synonyms: ['SKU', 'Item', 'ItemCode', 'ProductCode', 'Material'],
    data_type: 'string',
    is_required: true
  },
  {
    concept_name: 'unit_cost',
    canonical_field: 'unit_cost',
    domain: 'product',
    synonyms: ['COGS', 'CostPerUnit', 'StandardCost', 'MaterialCost'],
    data_type: 'numeric'
  },

  // Sales domain
  {
    concept_name: 'quantity_sold',
    canonical_field: 'quantity_sold',
    domain: 'sales',
    synonyms: ['SalesQty', 'UnitsSold', 'Volume', 'Quantity'],
    data_type: 'numeric',
    is_required: true
  },
  {
    concept_name: 'revenue',
    canonical_field: 'revenue',
    domain: 'sales',
    synonyms: ['Sales', 'SalesAmount', 'Income', 'TotalSales'],
    data_type: 'numeric'
  },

  // Inventory domain
  {
    concept_name: 'inventory_level',
    canonical_field: 'quantity_on_hand',
    domain: 'inventory',
    synonyms: ['OnHand', 'Stock', 'Available', 'QtyAvailable'],
    data_type: 'numeric'
  },

  // Location domain
  {
    concept_name: 'location_code',
    canonical_field: 'location_code',
    domain: 'location',
    synonyms: ['SiteCode', 'PlantCode', 'WarehouseID', 'DC'],
    data_type: 'string'
  },

  // Financial domain
  {
    concept_name: 'billing_milestone',
    canonical_field: 'billing_milestone',
    domain: 'finance',
    synonyms: ['ProjectBilling', 'StageBilling', 'MilestonePayment'],
    data_type: 'numeric'
  }
];
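Seeding turns each concept into an insert-ready row with an embedding. A sketch of that step (the `embed` callable is an assumption — e.g. a wrapper around the Gemini embedding API returning a 768-dim vector; it is not a prescribed client):

```python
def prepare_ontology_rows(concepts, embed):
    """Turn seed concept dicts into insert-ready rows with embeddings.

    The embedded text combines name, domain, and synonyms so semantic
    search later sees the same context the mapper queries with.
    """
    rows = []
    for c in concepts:
        text = " ".join([c["concept_name"], c.get("domain", "")] + c.get("synonyms", []))
        rows.append({
            "concept_name": c["concept_name"],
            "canonical_field": c["canonical_field"],
            "domain": c.get("domain"),
            "synonyms": c.get("synonyms", []),
            "data_type": c.get("data_type", "string"),
            "embedding": embed(text),   # e.g. Gemini embedding wrapper (assumption)
        })
    return rows
```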

6. Gateway Configuration

6.1 GCP API Gateway Setup

Stable endpoint: gateway.chainalign.com

Routing logic:

# openapi.yaml for GCP API Gateway
swagger: '2.0'
info:
  title: ChainAlign Integration Gateway
  version: 1.0.0

host: gateway.chainalign.com
schemes:
  - https

x-google-backend:
  address: https://ingestion-orchestrator-<hash>-uc.a.run.app
  protocol: h2

paths:
  /ingest/rest:
    post:
      operationId: ingestREST
      summary: Ingest REST data
      security:
        - api_key: []
      parameters:
        - in: header
          name: ChainAlign-Version
          type: string
          required: false
          description: API version (Stripe-style dated versioning)
      x-google-backend:
        address: https://ingestion-orchestrator-<hash>-uc.a.run.app/ingest/rest

  /graphql:
    post:
      operationId: graphql
      summary: GraphQL endpoint
      security:
        - api_key: []
      x-google-backend:
        address: https://ingestion-orchestrator-<hash>-uc.a.run.app/graphql

  /ingest/soap:
    post:
      operationId: ingestSOAP
      summary: Ingest SOAP data
      security:
        - api_key: []
      x-google-backend:
        address: https://ingestion-orchestrator-<hash>-uc.a.run.app/ingest/soap

securityDefinitions:
  api_key:
    type: apiKey
    name: Authorization
    in: header

Environment routing:

  • gateway.chainalign.com → Production
  • gateway-staging.chainalign.com → Staging
  • gateway-dev.chainalign.com → Development

Version routing (Stripe-style):

  • Client sends header: ChainAlign-Version: 2025-11-17
  • Orchestrator reads header, applies appropriate mapping logic
  • Internal refactoring invisible to customers
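One way the orchestrator can resolve a dated version header (a sketch; the registry contents and fallback rules are illustrative, not the production policy):

```python
from datetime import date

# Dated versions and the mapping behavior pinned to each (illustrative)
VERSION_TRANSFORMS = {
    date(2025, 6, 1): "legacy mapping logic",
    date(2025, 11, 17): "current mapping logic",
}
LATEST = max(VERSION_TRANSFORMS)

def resolve_version(header_value):
    """Pick the effective API version from a ChainAlign-Version header.

    Missing or unparsable headers fall back to the latest version;
    a pinned date selects the newest registered version not after it.
    """
    if not header_value:
        return LATEST
    try:
        pinned = date.fromisoformat(header_value)
    except ValueError:
        return LATEST
    eligible = [v for v in VERSION_TRANSFORMS if v <= pinned]
    # Pins older than every registered version get the oldest behavior
    return max(eligible) if eligible else min(VERSION_TRANSFORMS)
```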

6.2 Security & Auth

Authentication:

  • API Gateway enforces OAuth 2.0 / API key validation
  • Each tenant gets unique API credentials
  • Rate limiting per tenant (configurable)

Authorization:

  • All ingested data scoped by tenant_id
  • Mapped data only accessible within tenant boundary
  • Admin UI respects tenant isolation

Encryption:

  • TLS 1.3 for data in transit
  • API Gateway terminates SSL
  • Backend connections over private VPC

7. Customer-Facing APIs

7.1 GraphQL Mutation API (Ingest)

Published schema for customers:

type Mutation {
  syncSalesOrders(orders: [SalesOrderInput!]!): SyncResponse!
  syncInventory(items: [InventoryItemInput!]!): SyncResponse!
  syncProducts(products: [ProductInput!]!): SyncResponse!
  syncForecasts(forecasts: [ForecastInput!]!): SyncResponse!
}

input SalesOrderInput {
  orderNumber: String!
  productSKU: String!
  quantity: Float!
  revenue: Float
  orderDate: DateTime!
  locationCode: String
  # Additional fields go to custom_attributes
  customFields: JSON
}

input InventoryItemInput {
  productSKU: String!
  locationCode: String!
  quantityOnHand: Float!
  quantityReserved: Float
  lastUpdated: DateTime!
  customFields: JSON
}

type SyncResponse {
  status: SyncStatus!
  itemsProcessed: Int!
  itemsFailed: Int!
  errors: [SyncError!]
  queueId: ID! # Reference for tracking async processing
}

enum SyncStatus {
  QUEUED
  PROCESSING
  COMPLETED
  FAILED
}

type SyncError {
  item: String!
  field: String
  message: String!
}

Example customer call:

mutation {
  syncSalesOrders(orders: [
    {
      orderNumber: "SO-12345"
      productSKU: "WIDGET-001"
      quantity: 100
      revenue: 5000.00
      orderDate: "2025-11-15T10:30:00Z"
      locationCode: "DC-EAST"
      customFields: {
        proj_stage_bill: 1250.00 # Unknown field → semantic mapper
      }
    }
  ]) {
    status
    itemsProcessed
    queueId
  }
}
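Server-side, handling such a call means splitting typed fields from `customFields` before queueing. A sketch (the helper name is hypothetical; `KNOWN_FIELDS` mirrors SalesOrderInput's typed fields):

```python
# Typed fields from SalesOrderInput; anything else arrives via customFields
KNOWN_FIELDS = {"orderNumber", "productSKU", "quantity", "revenue",
                "orderDate", "locationCode"}

def stage_order(order):
    """Split a SalesOrderInput into core columns and JSONB staging.

    Typed fields map directly to columns; customFields land in
    custom_attributes, where the Semantic Mapper Agent picks them up.
    """
    core = {k: v for k, v in order.items() if k in KNOWN_FIELDS}
    staged = dict(order.get("customFields") or {})
    return {"core": core, "custom_attributes": staged}
```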

7.2 GraphQL Subscription API (Real-time Events)

Published schema:

type Subscription {
  dataRequested: DataRequest!
  syncStatusUpdate(queueId: ID!): SyncStatusUpdate!
}

type DataRequest {
  requestId: ID!
  dataType: DataType!
  fields: [String!]!
  filters: JSON
  dateRange: DateRange
}

enum DataType {
  SALES_ORDERS
  INVENTORY
  FORECASTS
  PRODUCTS
}

type DateRange {
  start: DateTime!
  end: DateTime!
}

type SyncStatusUpdate {
  queueId: ID!
  status: SyncStatus!
  progress: Float! # 0-100
  itemsProcessed: Int!
  errors: [SyncError!]
}

Example customer subscription:

subscription {
  dataRequested {
    requestId
    dataType
    fields
    dateRange {
      start
      end
    }
  }
}

# When ChainAlign needs fresh inventory data, the customer receives:
{
  "requestId": "req-789",
  "dataType": "INVENTORY",
  "fields": ["quantityOnHand", "locationCode"],
  "dateRange": {
    "start": "2025-11-01T00:00:00Z",
    "end": "2025-11-17T23:59:59Z"
  }
}

# Customer responds with mutation:
mutation {
  syncInventory(items: [...]) {
    status
    queueId
  }
}

8. Admin Validation UI

8.1 Extended CSV Admin UI

Location: frontend/src/pages/Admin/FieldMappingQueue.jsx

Features:

  1. Unified View Across All Sources

    • CSV uploads
    • REST API ingestions
    • GraphQL mutations
    • SOAP integrations
    • SQL syncs
  2. Pending Mappings Table

    <DataTable>
      <Column>Source Field</Column>
      <Column>Data Type</Column>
      <Column>Sample Values</Column>
      <Column>Suggested Concept</Column>
      <Column>Confidence</Column>
      <Column>LLM Reasoning</Column>
      <Column>Actions</Column>
    </DataTable>
  3. Review Actions

    • Approve → Add synonym to ontology
    • Correct → Select different concept from dropdown
    • Mark as New → Create migration, add to ontology
    • Reject → Ignore field, don't map
  4. Batch Operations

    • Select multiple similar fields
    • Apply same mapping to all
    • "Ignore all fields matching pattern"
  5. Learning Feedback

    • Each approval trains the agent
    • Next occurrence auto-maps with 100% confidence

8.2 Ontology Management

Location: frontend/src/pages/Admin/OntologyManager.jsx

Features:

  • View all canonical concepts
  • See synonym lists
  • Edit descriptions and validation rules
  • View usage statistics (how often mapped)
  • Add custom tenant-specific concepts

9. Schema Evolution & Governance

9.1 Three-Tier Schema Strategy

Tier 1: Core ChainAlign Schema (Rare changes)

  • Standard S&OP concepts: sales, core_entities, locations, forecasts
  • Requires migration + approval from product team
  • Benefits all customers

Tier 2: Ontology Synonyms (Frequent, automatic)

  • Customer-specific field names mapped to core concepts
  • No schema changes, just ontology updates
  • Most mappings fall into this tier

Tier 3: Tenant Custom Fields (Per-customer, JSONB)

  • Fields unique to one customer, not generalizable
  • Stored in custom_attributes JSONB
  • Queryable, but not part of core schema

9.2 Schema Change Approval Workflow

When new core concept suggested:

  1. Human reviews:

    • Is this valuable across multiple customers?
    • Does it fit existing tables or need new table?
    • What's the business justification?
  2. If approved:

    // Generate migration
    npx knex migrate:make add_carbon_footprint_field

    // Migration file:
    exports.up = function(knex) {
      return knex.schema.table('core_entities', (table) => {
        table.decimal('carbon_footprint_per_unit', 15, 4)
          .nullable()
          .comment('CO2 emissions per unit - sustainability tracking');
      });
    };
  3. Add to ontology:

    INSERT INTO ontology_concepts (
      concept_name, canonical_field, domain, description
    ) VALUES (
      'carbon_footprint_per_unit',
      'carbon_footprint_per_unit',
      'sustainability',
      'CO2 emissions per unit produced'
    );
  4. Backfill existing data:

    UPDATE core_entities
    SET carbon_footprint_per_unit = (custom_attributes->>'carbon_footprint')::decimal
    WHERE custom_attributes->>'carbon_footprint' IS NOT NULL;

9.3 Proactive Version Monitoring

ERP Version Registry:

// Weekly scheduled job
async function monitorERPVersions() {
  // Check SAP release notes
  const sapReleases = await fetchSAPReleaseNotes();

  // Check Adler API
  const adlerReleases = await fetchAdlerVersionInfo();

  // Detect new versions
  for (const release of [...sapReleases, ...adlerReleases]) {
    const exists = await erpVersionRegistry.findByVersion(
      release.system,
      release.version
    );

    if (!exists) {
      // New version detected
      await erpVersionRegistry.create({
        erp_system: release.system,
        version: release.version,
        release_date: release.date,
        schema_changes: release.changes,
        affected_tenant_ids: await findAffectedTenants(release.system)
      });

      // Notify admins
      await notificationService.send({
        type: 'erp_version_update',
        message: `New ${release.system} version ${release.version} detected`,
        action_required: 'Review schema changes and update ontology'
      });
    }
  }
}

Anomaly Detection:

// Detect schema drift in customer data
async function detectSchemaDrift(tenantId, incomingData) {
const previousSchema = await getLastKnownSchema(tenantId);
const currentSchema = extractSchema(incomingData);

const drift = compareSchemas(previousSchema, currentSchema);

if (drift.newFields.length > 0 || drift.removedFields.length > 0) {
await notificationService.send({
type: 'schema_drift',
tenant_id: tenantId,
message: `Schema drift detected: ${drift.newFields.length} new fields, ${drift.removedFields.length} removed`,
action_required: 'Review mappings for affected fields'
});
}
}
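The `compareSchemas` helper can be as simple as a set difference over field names; a Python sketch of equivalent logic (bucket names match the JS usage above):

```python
def compare_schemas(previous, current):
    """Diff two field-name lists into drift buckets (illustrative sketch)."""
    prev, curr = set(previous), set(current)
    return {
        "newFields": sorted(curr - prev),        # fields that just appeared
        "removedFields": sorted(prev - curr),    # fields that vanished
        "unchanged": sorted(prev & curr),
    }
```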

10. Implementation Phases

Phase 1: Foundation (Weeks 1-4)

Deliverable: Universal ingestion infrastructure

Tasks:

  1. Set up gateway.chainalign.com with GCP API Gateway

    • Configure SSL certificates
    • Set up environment routing
    • Configure auth/rate limiting
  2. Build Ingestion Orchestrator (Node.js/Cloud Run)

    • REST/JSON parser
    • Protocol detection (Content-Type → parser)
    • Schema extraction logic
    • pgmq queue writer
  3. Database migrations:

    • ontology_concepts table
    • pending_field_mappings table
    • erp_version_registry table
    • ingestion_queue (pgmq)
  4. Monitoring:

    • Cloud Logging integration
    • Error alerting
    • Queue depth metrics

Success Criteria:

  • Any customer can POST JSON to gateway
  • Data lands in ingestion_queue with metadata
  • Gateway handles 1000 req/sec

Phase 2: Semantic Intelligence (Weeks 5-8)

Deliverable: LangGraph semantic mapper agent

Tasks:

  1. Python semantic mapper service (Cloud Run)

    • LangGraph workflow implementation
    • 6 nodes: extract, exact match, semantic search, LLM, validation, execute
    • Cyclic refinement logic
  2. Core ontology seeding:

    • Load standard S&OP concepts
    • Generate embeddings (Gemini)
    • Insert into ontology_concepts
  3. LLM integration:

    • Gemini API calls for reasoning
    • Prompt engineering for field mapping
    • Confidence scoring logic
  4. pgvector setup:

    • Enable extension
    • Create embedding index
    • Implement semantic search queries
  5. Background worker:

    • Reads from ingestion_queue
    • Processes via LangGraph
    • Writes results to PostgreSQL or validation queue

Success Criteria:

  • Known fields auto-map with 100% accuracy
  • Unknown fields route to validation queue
  • LLM provides clear reasoning for suggestions
  • Agent processes 100 fields/minute

Phase 3: Validation UI & CSV Unification (Weeks 9-10)

Deliverable: Unified governance workbench

Tasks:

  1. Extend CSV admin UI (React):

    • Show pending mappings from all sources
    • Add approve/reject/correct actions
    • Display LLM reasoning
    • Batch operation support
  2. Refactor CSV upload:

    • Use same ingestion_queue
    • Route through semantic mapper
    • Store CSV templates in ontology
  3. API endpoints:

    • GET /api/admin/pending-mappings
    • POST /api/admin/approve-mapping
    • POST /api/admin/reject-mapping
    • GET /api/admin/ontology
  4. Learning feedback loop:

    • Approvals update ontology immediately
    • Trigger re-processing of related pending fields

Success Criteria:

  • Admin can review 50 fields in 10 minutes
  • Approved mappings apply retroactively
  • CSV and API mappings managed in same UI

Phase 4: GraphQL & Advanced Protocols (Weeks 11-13)

Deliverable: Multi-protocol ingestion

Tasks:

  1. GraphQL schema definition:

    • Mutation API for data sync
    • Subscription API for real-time events
    • Input types with customFields support
  2. GraphQL server (extend Orchestrator):

    • Apollo Server integration
    • Schema validation
    • Subscription pub/sub (Redis/Pub/Sub)
  3. SOAP adapter:

    • WSDL parser
    • XML → JSON conversion
    • Add to Orchestrator
  4. VPN/SQL adapter:

    • SQL query executor
    • Read-only connection pool
    • Scheduled job runner
  5. Documentation:

    • GraphQL Playground
    • Code examples (JavaScript, Python, Java)
    • Authentication guide

Success Criteria:

  • Customers can write GraphQL mutations
  • Customers can subscribe to real-time events
  • SOAP integration tested with legacy ERP
  • SQL direct connection proven secure

Phase 5: Adler Pilot & Iteration (Weeks 14-16)

Deliverable: First production ERP integration

Tasks:

  1. Onboard Adler ERP:

    • Determine their preferred protocol
    • Set up credentials/VPN if needed
    • Configure gateway routing
  2. Initial data sync:

    • Run semantic mapper on sample data
    • Review all mappings with admin
    • Validate data quality
  3. Iterative tuning:

    • Refine LLM prompts based on Adler fields
    • Add Adler-specific synonyms to ontology
    • Optimize confidence thresholds
  4. Document learnings:

    • What protocol worked best?
    • Where did LLM struggle?
    • What UI improvements needed?
  5. Prepare for next customer:

    • Capture reusable patterns
    • Update onboarding checklist
    • Create runbook for similar ERPs

Success Criteria:

  • Adler data flows into ChainAlign
  • 95%+ fields auto-mapped
  • Customer satisfied with data quality
  • Architecture proven for next customer

11. Success Metrics

11.1 Technical KPIs

Ingestion Layer:

  • Uptime: 99.9%
  • Latency: p99 < 500ms (gateway → queue)
  • Throughput: 1000 requests/sec
  • Queue depth: < 1000 messages (normal load)

Semantic Mapper:

  • Auto-mapping rate: >80% (fields mapped without human review)
  • LLM accuracy: >90% (when human reviews, LLM suggestion was correct)
  • Processing time: <5 sec/field (including LLM calls)
  • Retry rate: <10% (fields requiring cyclic refinement)

Ontology Growth:

  • New synonyms/week: 20-50 (early), 5-10 (mature)
  • New core concepts/quarter: <5 (deliberate growth)
  • Tenant custom fields: <20% of total fields

11.2 Business KPIs

Customer Onboarding:

  • CSV → value: <2 days
  • API integration → value: <2 weeks
  • First sync → validated data: <1 week

Customer Satisfaction:

  • Manual mapping effort: <10% of fields
  • Data quality issues: <1% of records
  • Integration reliability: >99.5% uptime

Platform Differentiation:

  • Supported ERP systems: SAP, Oracle, Adler, Dynamics, +custom
  • Protocols supported: REST, GraphQL, SOAP, SQL, CSV
  • Customer self-service: 80% of new fields mapped by tenants (future)

12. Security & Compliance

12.1 Data Protection

Encryption:

  • TLS 1.3 for all external connections
  • Data at rest encrypted (GCP default)
  • API keys rotated every 90 days
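
The 90-day rotation rule reduces to a date comparison per key. A minimal sketch, assuming each key stores its issue date; the function name and period constant are illustrative.

```python
from datetime import date, timedelta

ROTATION_PERIOD = timedelta(days=90)  # per the spec: rotate API keys every 90 days

def rotation_due(issued: date, today: date) -> bool:
    """True once a key has been live for 90 days or more."""
    return today - issued >= ROTATION_PERIOD
```

A scheduled job could scan the key store with this check and trigger rotation plus customer notification.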

Access Control:

  • Tenant isolation enforced at DB level
  • Admin UI requires special role
  • Audit log for all mapping approvals

PII Handling:

  • Customer data never logged in plaintext
  • Sample values truncated/masked in UI
  • LLM prompts exclude PII
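
The "truncated/masked" rule above can sit in one choke point that every sample value passes through before reaching logs, the admin UI, or an LLM prompt. A sketch follows; the specific patterns (emails, long digit runs) and the 12-character cut-off are illustrative, not the production masking rules.

```python
import re

EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+")
DIGITS = re.compile(r"\d{4,}")  # long digit runs: phone numbers, account IDs, cards

def mask_sample(value: str, max_len: int = 12) -> str:
    """Mask likely PII in a sample value, then truncate for display."""
    masked = EMAIL.sub("<email>", value)
    masked = DIGITS.sub(lambda m: "*" * len(m.group()), masked)
    if len(masked) > max_len:
        masked = masked[:max_len] + "…"
    return masked
```

Applying the mask before prompt construction is what guarantees the "LLM prompts exclude PII" property, rather than relying on each call site to remember.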

12.2 Compliance

GDPR:

  • Customer controls their data (can delete anytime)
  • Data processing agreements signed
  • Right to export all mappings

SOC 2:

  • Change management for ontology updates
  • Access reviews quarterly
  • Incident response plan

13. Open Questions & Future Work

13.1 Phase 1 Scope

Decisions needed:

  • What's the LLM confidence threshold for auto-mapping? (Proposed: 95%)
  • Should we support batching of similar fields for efficiency?
  • How often to run proactive ERP version monitoring? (Proposed: weekly)
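
The proposed 95% threshold amounts to a routing decision per field: high-confidence suggestions execute directly, everything else lands in the admin review queue. A minimal sketch of that decision, with the threshold value taken from the proposal above (still an open question, not a finalized design):

```python
AUTO_MAP_THRESHOLD = 0.95  # proposed value; subject to the open question above

def route_mapping(confidence: float) -> str:
    """Decide where an LLM mapping suggestion goes."""
    if confidence >= AUTO_MAP_THRESHOLD:
        return "execute"         # apply mapping, record synonym in the ontology
    return "pending_review"      # write to pending_field_mappings for admin validation
```

Keeping the threshold in one constant makes it cheap to tune per tenant or per domain once pilot data from Adler is in.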

13.2 Future Enhancements

Advanced Features:

  • Multi-hop mappings: Map customer field → intermediate concept → canonical field
  • Temporal ontology: Track schema changes over time, support versioning
  • Collaborative filtering: "Customers like you mapped this field to X"
  • Auto-documentation: LLM generates mapping guides per customer

Scalability:

  • Federated semantic mapper: Multiple agents processing in parallel
  • Embedding caching: Pre-compute embeddings for common fields
  • Incremental learning: Fine-tune LLM on validated mappings
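
The embedding-caching idea above can be sketched as a normalize-then-memoize wrapper around the model call: common field names pay the embedding cost once. `embed` here is a stand-in for the real model call, and the SHA-256 cache key over the normalized name is an illustrative choice.

```python
import hashlib

_cache: dict[str, list[float]] = {}

def cached_embedding(field_name: str, embed) -> list[float]:
    """Return the embedding for a field name, computing it at most once."""
    key = hashlib.sha256(field_name.strip().lower().encode()).hexdigest()
    if key not in _cache:
        _cache[key] = embed(field_name)  # only a cache miss pays the model call
    return _cache[key]
```

In production the cache would live alongside the ontology (e.g. a table keyed by normalized field name) rather than in process memory, so all mapper instances share it.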

14. Appendix: Reference Architecture Diagrams

14.1 Complete System Architecture

┌───────────────────────────────────────────────────────────────────────┐
│ Customer ERP Landscape │
│ SAP S/4HANA │ Oracle EBS │ Adler │ Dynamics 365 │ Custom │
└─────┬─────────┴──────┬───────┴────┬────┴───────┬────────┴────┬───────┘
│ │ │ │ │
│ REST │ GraphQL │ SOAP │ SQL/VPN │ CSV
▼ ▼ ▼ ▼ ▼
┌───────────────────────────────────────────────────────────────────────┐
│ gateway.chainalign.com │
│ (GCP API Gateway) │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ Security: OAuth 2.0, Rate Limiting, DDoS Protection │ │
│ │ Routing: Environment (dev/staging/prod), Version (headers) │ │
│ └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────┬─────────────────────────────────────────┘

┌───────────────────────────────────────────────────────────────────────┐
│ Ingestion Orchestrator (Node.js/Cloud Run) │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ Protocol Detection: │ │
│ │ • Content-Type: application/json → REST parser │ │
│ │ • Content-Type: application/graphql → GraphQL parser │ │
│ │ • Content-Type: application/soap+xml → SOAP parser │ │
│ │ │ │
│ │ Schema Extraction: │ │
│ │ • Field names, types, sample values │ │
│ │ • Data fingerprinting (patterns, formats) │ │
│ │ │ │
│ │ Queue Writing: │ │
│ │ • Format: {source, tenant_id, protocol, raw_data, schema} │ │
│ │ • Write to pgmq: ingestion_queue │ │
│ └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────┬─────────────────────────────────────────┘

┌───────────────────────────────────────────────────────────────────────┐
│ pgmq: ingestion_queue │
│ [Message 1] [Message 2] [Message 3] ... [Message N] │
└─────────────────────────────┬─────────────────────────────────────────┘

┌───────────────────────────────────────────────────────────────────────┐
│ Semantic Mapper Agent (Python/LangGraph/Cloud Run) │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ LangGraph Workflow │ │
│ │ │ │
│ │ [Node 1: Extract Schema] │ │
│ │ ↓ │ │
│ │ [Node 2: Exact Match Lookup] │ │
│ │ ↓ │ │
│ │ [Node 3: Semantic Search] ←─────┐ (Cycle: Retry with │ │
│ │ ↓ │ refined query) │ │
│ │ [Node 4: LLM Reasoning] ─────────┘ │ │
│ │ ↓ │ │
│ │ [Node 5: Human Validation] (if confidence < 95%) │ │
│ │ ↓ │ │
│ │ [Node 6: Execute Mapping] │ │
│ │ ↓ │ │
│ │ [END: Data mapped to PostgreSQL] │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└───────────┬───────────────────────────────────┬───────────────────────┘
│ │
│ Known fields → │ Unknown fields →
▼ ▼
┌─────────────────────────┐ ┌──────────────────────────────────────┐
│ PostgreSQL │ │ pending_field_mappings │
│ (ChainAlign Schema) │ │ (Admin Review Queue) │
│ │ │ │
│ • core_entities │ │ • source_field │
│ • sales │ │ • suggested_concept │
│ • locations │ │ • llm_confidence │
│ • forecasts │ │ • llm_reasoning │
│ • inventory │ │ • status: pending │
└─────────────────────────┘ └──────────────┬───────────────────────┘


┌───────────────────────────────────────────────────────────────────────┐
│ Admin Validation UI (React) │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Pending Mappings Table: │ │
│ │ │ │
│ │ Source Field │ Suggested │ Confidence │ Reasoning │ Actions │ │
│ │ ─────────────────────────────────────────────────────────────── │ │
│ │ proj_stage_ │ billing_ │ 87% │ "Appears │ [Approve] │ │
│ │ bill │ milestone │ │ to be │ [Correct] │ │
│ │ │ │ │ project │ [Reject] │ │
│ │ │ │ │ billing" │ │ │
│ │ │ │
│ │ Actions: │ │
│ │ • Approve → Add synonym to ontology, execute mapping │ │
│ │ • Correct → Select different concept, execute mapping │ │
│ │ • Mark as new → Create migration, add to ontology │ │
│ │ • Reject → Ignore field │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────┬─────────────────────────────────────────┘
│ Approval updates ↓
┌───────────────────────────────────────────────────────────────────────┐
│ Core Ontology (PostgreSQL + pgvector) │
│ │
│ ontology_concepts: │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ concept_name: "billing_milestone" │ │
│ │ canonical_field: "billing_milestone" │ │
│ │ domain: "finance" │ │
│ │ synonyms: ["ProjectBilling", "StageBilling", "proj_stage_bill"]│ │
│ │ embedding: [0.123, -0.456, ...] (pgvector) │ │
│ │ usage_count: 15 │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ + 50 other core S&OP concepts │
└───────────────────────────────────────────────────────────────────────┘
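
The orchestrator's protocol-detection step in the diagram above reduces to a dispatch on the `Content-Type` header. A minimal sketch, using the header values shown in the diagram; the dispatch table itself and the `text/csv` entry for CSV uploads are illustrative assumptions.

```python
PARSERS = {
    "application/json": "rest",
    "application/graphql": "graphql",
    "application/soap+xml": "soap",
    "text/csv": "csv",  # assumed media type for CSV uploads
}

def detect_protocol(content_type: str) -> str:
    """Map a Content-Type header to a parser, ignoring parameters like charset."""
    base = content_type.split(";", 1)[0].strip().lower()
    return PARSERS.get(base, "unknown")
```

Unknown content types would route to a dead-letter path rather than failing silently, so new protocols surface as explicit gaps.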

15. Conclusion

ChainAlign's ERP Integration System transforms customer integration from a 6-12 month data engineering project into a days-to-weeks self-service onboarding experience. By combining:

  1. Multi-protocol flexibility (accept whatever customers offer)
  2. LLM-powered semantic intelligence (understand intent, not just syntax)
  3. Self-improving ontology (every validation makes the system smarter)
  4. Agentic automation (minimize human review over time)

ChainAlign becomes a strategic iPaaS partner that removes integration friction while maintaining data quality and governance. This architecture is ready for the Adler pilot and scalable to hundreds of diverse ERP integrations.


Status: Design Complete
Next Step: Phase 1 Implementation (Weeks 1-4)
Author: Pramod Prasanth
Date: 2025-11-17