Functional Specification: ERP Integration System
Executive Summary
ChainAlign's ERP Integration System positions the platform as an intelligent iPaaS (Integration Platform as a Service) that normalizes disparate customer ERP systems to ChainAlign's canonical S&OP data model. Unlike traditional integration approaches that require customers to build data lakes or implement complex ETL pipelines, ChainAlign assumes the integration burden by providing:
- Multi-protocol ingestion (REST, GraphQL, SOAP, VPN/SQL) through a single stable gateway
- LLM-powered semantic mapping that learns customer-specific field mappings
- Self-improving ontology that evolves with each validated integration
- Agentic automation that minimizes human intervention over time
This architecture enables customers to integrate in days (via CSV) while evolving to real-time sync (via API) without rebuilding integrations.
1. Business Context
1.1 Problem Statement
Enterprise customers using ChainAlign for S&OP need to connect data from their existing ERP systems (SAP, Oracle, Microsoft Dynamics, Adler, etc.). Traditional approaches impose significant friction:
- Customer-side data lakes require 18-24 months and dedicated data engineering teams
- Fixed schema mappings break when ERPs are upgraded or customized
- One-size-fits-all APIs don't accommodate industry-specific fields
- Manual CSV exports don't scale for real-time decision-making
1.2 Strategic Differentiation
ChainAlign's approach:
"Upload CSV today, sync in real-time tomorrow"
- Start with CSV onboarding (2-day value delivery)
- Evolve to API integration (weeks, not months)
- Single semantic engine for all ingestion methods
- Customer-agnostic architecture (support whatever they offer)
- Self-service governance (tenants can map their own fields over time)
This positions ChainAlign as a SaaS-led iPaaS that removes integration as a barrier to S&OP transformation.
2. High-Level Architecture
2.1 System Components
┌─────────────────────────────────────────────────────────────────┐
│ Customer ERP Systems │
│ (SAP, Oracle, Adler, Dynamics, Custom) │
└─────────────────┬───────────────────────────────────────────────┘
│ REST / GraphQL / SOAP / VPN / CSV
▼
┌─────────────────────────────────────────────────────────────────┐
│ gateway.chainalign.com │
│ (GCP API Gateway - Single Stable Endpoint) │
│ - Environment routing (dev/staging/prod) │
│ - Version routing (Stripe-style headers) │
│ - Auth, rate limiting, DDoS protection │
└─────────────────┬───────────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ Ingestion Orchestrator (Node.js/Cloud Run) │
│ - Protocol detection (Content-Type parsing) │
│ - Payload normalization (REST/GraphQL/SOAP → JSON) │
│ - Schema extraction (field names, types, samples) │
│ - Queue writing (pgmq: ingestion_queue) │
└─────────────────┬───────────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ pgmq: ingestion_queue │
│ Format: {source, tenant_id, protocol, raw_data, schema_hints} │
└─────────────────┬───────────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ Semantic Mapper Agent (Python/LangGraph/Cloud Run) │
│ │
│ Tiered Intelligence Workflow: │
│ 1. Exact match → ontology_concepts (PostgreSQL) │
│ 2. Semantic search → pgvector embeddings │
│ 3. LLM reasoning → Gemini confidence scoring │
│ 4. Human validation → pending_field_mappings queue │
│ 5. Execute mapping → update DB / create migration │
│ │
│ LangGraph enables cyclic refinement (retry with new strategy) │
└─────────────────┬───────────────────────────────────────────────┘
▼
┌─────────────────────────────────────────────────────────────────┐
│ Knowledge & Governance Layer │
│ │
│ • Core Ontology (PostgreSQL + pgvector) │
│ - Canonical S&OP concepts (sales, inventory, forecast) │
│ - Synonyms learned from validations │
│ - Embeddings for semantic similarity │
│ │
│ • Admin Validation UI (React - Extended CSV UI) │
│ - Review pending mappings across all sources │
│ - Approve/reject/correct/mark-as-new-concept │
│ - Approvals train the agent, update ontology │
│ │
│ • ERP Version Registry │
│ - Track known ERP versions and schema changes │
│ - Proactive monitoring of vendor releases │
│ - Anomaly detection for schema drift │
└─────────────────────────────────────────────────────────────────┘
2.2 Data Flow
End-to-End Request Flow:
Customer ERP
→ gateway.chainalign.com
→ GCP API Gateway (auth, routing)
→ Ingestion Orchestrator (protocol parsing)
→ pgmq ingestion_queue
→ Semantic Mapper Agent (LangGraph)
├─ Exact match? → Auto-map to PostgreSQL
├─ Semantic match? → Auto-map with confidence
├─ LLM scoring? → Auto-map if >95% confidence
└─ Low confidence? → pending_field_mappings → Human validation
→ PostgreSQL (core_entities, sales, locations, etc.)
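To make the queue handoff concrete, the sketch below shows how the Orchestrator side might enqueue a normalized payload into pgmq and how the mapper side might read a batch back. This is a minimal illustration assuming the psycopg (v3) driver and the pgmq extension's send/read functions; the DSN and helper names are placeholders, and the message shape follows the ingestion_queue format above.
# Minimal sketch: writing to / reading from pgmq's ingestion_queue (illustrative names).
import json
import psycopg

DSN = "postgresql://chainalign:***@localhost:5432/chainalign"  # placeholder

def enqueue_ingestion(source: str, tenant_id: str, protocol: str,
                      raw_data: dict, schema_hints: dict) -> int:
    """Write one normalized payload to pgmq: ingestion_queue."""
    message = {
        "source": source,
        "tenant_id": tenant_id,
        "protocol": protocol,
        "raw_data": raw_data,
        "schema_hints": schema_hints,
    }
    with psycopg.connect(DSN) as conn:
        # pgmq.send(queue_name, msg jsonb) returns the new message id
        msg_id = conn.execute(
            "SELECT pgmq.send('ingestion_queue', %s::jsonb)",
            (json.dumps(message, default=str),),
        ).fetchone()[0]
    return msg_id

def read_ingestion_batch(batch_size: int = 10, visibility_timeout: int = 60):
    """Read a batch of queued messages for the Semantic Mapper Agent."""
    with psycopg.connect(DSN) as conn:
        # pgmq.read(queue_name, vt, qty) hides read messages for vt seconds
        return conn.execute(
            "SELECT msg_id, message FROM pgmq.read('ingestion_queue', %s, %s)",
            (visibility_timeout, batch_size),
        ).fetchall()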
3. Protocol Support
3.1 REST APIs (Pull or Push)
Two options:
- Pull: Customer exposes endpoints; ChainAlign polls on a schedule (hourly/daily), OR
- Push: Customer POSTs to gateway.chainalign.com/ingest/rest
Orchestrator handling:
- Parse JSON payload
- Extract field names, types, sample values
- Write to ingestion_queue with protocol: 'REST'
Example:
POST /ingest/rest HTTP/1.1
Host: gateway.chainalign.com
Content-Type: application/json
ChainAlign-Version: 2025-11-17
Authorization: Bearer <tenant_token>
{
"sales_orders": [
{
"order_id": "SO-12345",
"product_sku": "WIDGET-001",
"quantity": 100,
"order_date": "2025-11-15"
}
]
}
3.2 GraphQL (Write via Mutations)
Customer writes mutations to ChainAlign:
Example mutation:
mutation SyncInventory($items: [InventoryItemInput!]!) {
syncInventory(items: $items) {
status
itemsProcessed
errors {
item
message
}
}
}
Orchestrator handling:
- Validate against published schema
- Extract structured data
- Write to ingestion_queue with protocol: 'GraphQL'
3.3 GraphQL (Real-time via Subscriptions)
ChainAlign publishes events, customer pushes data in response:
Customer subscribes:
subscription OnDataRequested {
dataRequested {
requestId
fields
dateRange
}
}
When ChainAlign needs fresh data:
- Publishes a dataRequested event
- Customer responds with a mutation containing the requested data
3.4 SOAP/Legacy
Customer sends SOAP XML:
Example:
POST /ingest/soap HTTP/1.1
Host: gateway.chainalign.com
Content-Type: application/soap+xml
<soap:Envelope>
<soap:Body>
<SyncSalesOrders>
<Order>
<OrderID>12345</OrderID>
<ProductSKU>WIDGET-001</ProductSKU>
</Order>
</SyncSalesOrders>
</soap:Body>
</soap:Envelope>
Orchestrator handling:
- Parse SOAP XML using WSDL definitions
- Convert to canonical JSON structure
- Write to ingestion_queue with protocol: 'SOAP'
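As an illustration of the XML-to-JSON step, the sketch below flattens the envelope above into a plain dict before queueing. It assumes the xmltodict package; the production Orchestrator is Node.js and would use an equivalent WSDL-aware parser.
# Minimal sketch: SOAP envelope → canonical JSON (assumes the xmltodict package).
import xmltodict

def soap_to_json(soap_xml: str) -> dict:
    """Unwrap Envelope/Body and return the operation payload as plain JSON."""
    doc = xmltodict.parse(soap_xml)
    body = doc["soap:Envelope"]["soap:Body"]
    # The remaining single key is the operation name (e.g. SyncSalesOrders)
    operation, payload = next(iter(body.items()))
    return {"operation": operation, "payload": payload}

sample = """<soap:Envelope xmlns:soap="http://www.w3.org/2003/05/soap-envelope">
  <soap:Body>
    <SyncSalesOrders>
      <Order><OrderID>12345</OrderID><ProductSKU>WIDGET-001</ProductSKU></Order>
    </SyncSalesOrders>
  </soap:Body>
</soap:Envelope>"""

print(soap_to_json(sample))
# {'operation': 'SyncSalesOrders',
#  'payload': {'Order': {'OrderID': '12345', 'ProductSKU': 'WIDGET-001'}}}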
3.5 VPN/Direct Database Connection
Secure tunnel to customer's SQL Server:
- ChainAlign runs scheduled jobs
- Queries tables directly via read-only credentials
- Treats query results like REST responses
- Write to ingestion_queue with protocol: 'SQL'
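A minimal sketch of such a scheduled pull, assuming the pyodbc driver, read-only credentials over the VPN, and illustrative table/column names; the result set is queued exactly like a REST payload, reusing the enqueue helper from the Section 2.2 sketch.
# Minimal sketch: scheduled read-only pull from a customer SQL Server over VPN.
# Assumes pyodbc; server, table, and column names are illustrative.
import pyodbc

CONN_STR = (
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=customer-erp.internal;DATABASE=ERP;"
    "UID=chainalign_ro;PWD=***"  # read-only credentials
)

def pull_sales_orders(tenant_id: str, since: str) -> None:
    """Query the customer's order table and enqueue rows for semantic mapping."""
    with pyodbc.connect(CONN_STR, readonly=True) as conn:
        cursor = conn.cursor()
        cursor.execute(
            "SELECT order_id, product_sku, quantity, order_date "
            "FROM dbo.SalesOrders WHERE order_date >= ?",
            since,
        )
        columns = [col[0] for col in cursor.description]
        rows = [dict(zip(columns, row)) for row in cursor.fetchall()]

    # Treat the result set like a REST response (see the Section 2.2 sketch)
    enqueue_ingestion(
        source="sql_server",
        tenant_id=tenant_id,
        protocol="SQL",
        raw_data={"sales_orders": rows},
        schema_hints={"columns": columns},
    )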
3.6 CSV Upload (Existing Flow)
Unified with ERP integration:
- CSV upload UI writes to the same ingestion_queue
- Same Semantic Mapper Agent processes all sources
- CSV templates stored in ontology as learned mappings
4. Semantic Mapper Agent (LangGraph)
4.1 Agent Workflow
The Python agent processes each queued ingestion job through a cyclic decision graph using LangGraph:
# Conceptual LangGraph structure
from typing import List, Optional, TypedDict

from langgraph.graph import StateGraph, END

# Shared state carried between nodes (field names are illustrative)
class MappingState(TypedDict):
    tenant_id: str
    fields: List[dict]              # extracted schema: name, type, samples
    candidate_concepts: List[dict]  # ontology matches found so far
    confidence: Optional[float]     # 0-100 from semantic search / LLM scoring
    retry_count: int                # cyclic-refinement counter (max 2)

workflow = StateGraph(MappingState)  # StateGraph requires a state schema
# Nodes
workflow.add_node("extract_schema", extract_schema_node)
workflow.add_node("exact_match", exact_match_lookup)
workflow.add_node("semantic_search", semantic_search_node)
workflow.add_node("llm_reasoning", llm_reasoning_node)
workflow.add_node("human_validation", human_validation_queue)
workflow.add_node("execute_mapping", execute_mapping_node)
# Edges (with conditional routing)
workflow.set_entry_point("extract_schema")
workflow.add_edge("extract_schema", "exact_match")
workflow.add_conditional_edges(
"exact_match",
should_use_semantic_search,
{
"auto_map": "execute_mapping",
"semantic_search": "semantic_search"
}
)
workflow.add_conditional_edges(
"semantic_search",
should_use_llm,
{
"high_confidence": "execute_mapping",
"llm_reasoning": "llm_reasoning"
}
)
workflow.add_conditional_edges(
"llm_reasoning",
check_llm_confidence,
{
"auto_map": "execute_mapping",
"human_review": "human_validation",
"retry": "semantic_search" # Cycle back with refined query
}
)
workflow.add_edge("human_validation", END)  # Waits for human approval
workflow.add_edge("execute_mapping", END)

app = workflow.compile()  # Compile the graph into a runnable agent
4.2 Node Details
Node 1: Schema Extraction
Purpose: Parse raw ingestion data to understand structure
Actions:
- Extract field names from JSON/XML/CSV
- Infer data types (string, numeric, date, boolean)
- Collect sample values (first 5-10 records)
- Generate field fingerprints (e.g., "10-15 digits, prefix 'INV-'" → invoice_number)
Output:
{
"fields": [
{
"name": "proj_stage_bill",
"type": "numeric",
"samples": [1250.00, 3400.50, 890.25],
"fingerprint": "decimal_currency"
}
]
}
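A minimal sketch of this node, assuming input already normalized to a list of JSON records by the Orchestrator; the type and fingerprint heuristics shown are illustrative.
# Minimal sketch of the schema-extraction node (heuristics are illustrative).
import re

def infer_type(value) -> str:
    """Infer a coarse data type for a single sample value."""
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, (int, float)):
        return "numeric"
    if isinstance(value, str) and re.fullmatch(r"\d{4}-\d{2}-\d{2}", value):
        return "date"
    return "string"

def fingerprint(samples: list) -> str:
    """Rough pattern label used as extra matching context."""
    if samples and all(isinstance(s, float) for s in samples):
        return "decimal_currency"
    if samples and all(isinstance(s, str) and "-" in s and s.split("-")[0].isalpha()
                       for s in samples):
        return "prefixed_code"  # e.g. "SO-12345", "INV-0042"
    return "unclassified"

def extract_schema(records: list[dict], sample_size: int = 10) -> dict:
    """Build the {fields: [...]} structure consumed by the matching nodes."""
    if not records:
        return {"fields": []}
    fields = []
    for name in records[0].keys():
        samples = [r[name] for r in records[:sample_size] if name in r]
        fields.append({
            "name": name,
            "type": infer_type(samples[0]) if samples else "string",
            "samples": samples,
            "fingerprint": fingerprint(samples),
        })
    return {"fields": fields}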
Node 2: Exact Match Lookup
Purpose: Fast lookup for known field names
Actions:
- Query the ontology_concepts table by exact name
- Check the synonyms array for known aliases
- If match found → confidence = 100% → Route to execute_mapping
SQL:
SELECT concept_id, canonical_field
FROM ontology_concepts
WHERE tenant_id = $1
AND (concept_name = $2 OR $2 = ANY(synonyms))
Node 3: Semantic Search (Fallback)
Purpose: Find similar concepts when exact match fails
Actions:
- Generate embedding for unknown field (name + sample context)
- Query pgvector for nearest neighbors in ontology
- Calculate cosine similarity
- If similarity > 0.85 → High-confidence synonym → execute_mapping
- If 0.60-0.85 → Pass to LLM reasoning
- If < 0.60 → Low confidence → LLM reasoning
SQL:
SELECT concept_id, canonical_field,
embedding <=> $1 AS distance
FROM ontology_concepts
WHERE tenant_id = $2
ORDER BY embedding <=> $1
LIMIT 3
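The same lookup from Python, assuming psycopg (v3) with the pgvector adapter and a pre-computed 768-dimension embedding for the unknown field (name + sample context); the routing thresholds mirror the rules above.
# Minimal sketch: pgvector nearest-neighbour lookup plus threshold routing.
# Assumes psycopg (v3) and the pgvector-python adapter; embedding is pre-computed.
import numpy as np
import psycopg
from pgvector.psycopg import register_vector

def semantic_search(conn: psycopg.Connection, tenant_id: str,
                    field_embedding: list[float]) -> dict:
    register_vector(conn)  # normally done once at connect time
    rows = conn.execute(
        """
        SELECT concept_id, canonical_field,
               embedding <=> %(emb)s AS distance
        FROM ontology_concepts
        WHERE tenant_id = %(tenant)s OR tenant_id IS NULL  -- tenant + global concepts
        ORDER BY embedding <=> %(emb)s
        LIMIT 3
        """,
        {"emb": np.array(field_embedding), "tenant": tenant_id},
    ).fetchall()

    # Cosine distance → similarity
    candidates = [
        {"concept_id": r[0], "canonical_field": r[1], "similarity": 1 - r[2]}
        for r in rows
    ]
    best = candidates[0]["similarity"] if candidates else 0.0
    route = "high_confidence" if best > 0.85 else "llm_reasoning"
    return {"candidates": candidates, "route": route}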
Node 4: LLM Reasoning
Purpose: Use Gemini to determine semantic equivalence
Prompt:
You are a data integration specialist for S&OP systems.
Context:
- Customer field: "{field_name}"
- Data type: {type}
- Sample values: {samples}
- Customer industry: {tenant_industry}
Top 3 ontology matches:
1. {match1_name} - {match1_description} (similarity: 0.72)
2. {match2_name} - {match2_description} (similarity: 0.68)
3. {match3_name} - {match3_description} (similarity: 0.65)
Question: Is this customer field semantically equivalent to one of these concepts?
Respond in JSON:
{
"is_match": true/false,
"matched_concept_id": "uuid or null",
"confidence": 0-100,
"reasoning": "Explain your decision"
}
If no match, suggest if this is:
- A synonym that should be added
- A truly new concept
- Junk data to ignore
Decision Logic:
- Confidence > 95% → Auto-map, log reasoning → execute_mapping
- Confidence 60-95% → Human validation queue
- Confidence < 60% → Retry semantic search with refined query (cycle up to 2x)
Refinement strategies for retry:
- Decompose compound field names (e.g., "proj_stage_bill" → "project", "stage", "billing")
- Use broader domain terms (e.g., "financial" instead of "billing")
- Query industry-specific aliases
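A minimal sketch of the check_llm_confidence router referenced in the Section 4.1 workflow, applying the thresholds and retry cap above; the state field names are the illustrative ones from that section.
# Minimal sketch of the confidence router used after the LLM-reasoning node.
MAX_RETRIES = 2  # matches the "cycle up to 2x" rule above

def check_llm_confidence(state: dict) -> str:
    """Route to execute_mapping, human validation, or another semantic-search cycle."""
    confidence = state.get("confidence") or 0
    if confidence > 95:
        return "auto_map"        # auto-map and log the LLM's reasoning
    if confidence >= 60:
        return "human_review"    # ambiguous → pending_field_mappings
    # Below 60: retry semantic search with a refined query, at most MAX_RETRIES times
    # (retry_count is incremented by the semantic-search node each time it re-runs).
    if state.get("retry_count", 0) < MAX_RETRIES:
        return "retry"
    return "human_review"        # retries exhausted → human review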
Node 5: Human Validation Queue
Purpose: Route ambiguous mappings to admin review
Actions:
- Write to the pending_field_mappings table:
INSERT INTO pending_field_mappings (
tenant_id, source_field, suggested_concept_id,
llm_confidence, llm_reasoning, sample_data, status
) VALUES ($1, $2, $3, $4, $5, $6, 'pending')
- Trigger notification (optional - email/Slack)
- Wait for human decision (workflow exits, resumes on approval)
Human options in UI:
- Approve → Update ontology, execute mapping
- Correct → Select different concept, execute mapping
- Mark as new concept → Create migration, add to ontology
- Reject as junk → Ignore field, don't map
Node 6: Execute Mapping
Purpose: Apply validated mapping to data
Actions:
Case 1: Synonym (most common)
-- Add to ontology
UPDATE ontology_concepts
SET synonyms = array_append(synonyms, $1)
WHERE concept_id = $2;
-- Move data from JSONB staging to proper column
UPDATE core_entities
SET billing_milestone = (custom_attributes->>'proj_stage_bill')::numeric
WHERE tenant_id = $1 AND custom_attributes->>'proj_stage_bill' IS NOT NULL;
-- Clean up staging
UPDATE core_entities
SET custom_attributes = custom_attributes - 'proj_stage_bill'
WHERE tenant_id = $1;
Case 2: New core concept (rare)
// Generate Knex migration
const migration = `
exports.up = function(knex) {
return knex.schema.table('core_entities', (table) => {
table.decimal('carbon_footprint_per_unit', 15, 4);
});
};
`;
// Add to ontology
await ontologyRepository.create({
concept_name: 'carbon_footprint_per_unit',
canonical_field: 'carbon_footprint_per_unit',
domain: 'sustainability',
description: 'CO2 emissions per unit produced'
});
Case 3: Tenant-specific field
-- Stays in JSONB, create tenant-specific index
CREATE INDEX idx_custom_attrs_tenant_field
ON core_entities USING GIN ((custom_attributes->'tenant_specific_field'))
WHERE tenant_id = $1;
4.3 Cyclic Refinement (LangGraph Key Feature)
LangGraph allows the agent to loop back and retry with refined strategies:
Example cycle:
- Field: "proj_stage_bill" fails semantic search (similarity 0.55)
- LLM reasoning: "Possibly related to billing or project stages" (confidence 58%)
- Cycle back to semantic search with decomposed terms: "project", "billing"
- New semantic search finds "billing_milestone" (similarity 0.88)
- Auto-map with high confidence
Max cycles: 2 (prevents infinite loops)
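One possible helper for the retry path, sketching the field-name decomposition strategy described above; the tokenization rules and abbreviation table are illustrative.
# Minimal sketch: decompose a compound field name into broader search terms (illustrative).
import re

ABBREVIATIONS = {"proj": "project", "bill": "billing", "qty": "quantity", "amt": "amount"}

def refine_search_terms(field_name: str) -> list[str]:
    """Split on underscores, hyphens, and camelCase; expand known abbreviations."""
    tokens = re.split(r"[_\-\s]+|(?<=[a-z])(?=[A-Z])", field_name)
    return [ABBREVIATIONS.get(t.lower(), t.lower()) for t in tokens if t]

print(refine_search_terms("proj_stage_bill"))  # ['project', 'stage', 'billing']
print(refine_search_terms("QtyAvailable"))     # ['quantity', 'available']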
5. Core Ontology Structure
5.1 Database Schema
-- Core ontology storage
CREATE TABLE ontology_concepts (
concept_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID, -- NULL for global concepts
-- Concept definition
concept_name VARCHAR(100) NOT NULL,
canonical_field VARCHAR(100) NOT NULL, -- DB column name
domain VARCHAR(50), -- 'sales', 'inventory', 'finance', 'production'
description TEXT,
-- Mappings
synonyms TEXT[], -- Known aliases
embedding VECTOR(768), -- Gemini embedding for semantic search
-- Metadata
data_type VARCHAR(20), -- 'string', 'numeric', 'date', 'boolean'
is_required BOOLEAN DEFAULT false,
validation_rules JSONB, -- {min, max, regex, enum_values}
-- Evolution tracking
usage_count INTEGER DEFAULT 0,
last_mapped_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW(),
UNIQUE(tenant_id, concept_name)
);
CREATE INDEX idx_ontology_tenant ON ontology_concepts(tenant_id);
CREATE INDEX idx_ontology_domain ON ontology_concepts(domain);
CREATE INDEX idx_ontology_embedding ON ontology_concepts USING ivfflat (embedding vector_cosine_ops);
-- Pending mappings awaiting human review
CREATE TABLE pending_field_mappings (
mapping_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL,
-- Source field info
source_field VARCHAR(200) NOT NULL,
source_type VARCHAR(20),
sample_data JSONB,
-- Suggested mapping
suggested_concept_id UUID,
llm_confidence INTEGER, -- 0-100
llm_reasoning TEXT,
-- Review tracking
status VARCHAR(20) DEFAULT 'pending', -- 'pending', 'approved', 'rejected'
reviewed_by UUID,
reviewed_at TIMESTAMP,
final_concept_id UUID, -- May differ from suggested
created_at TIMESTAMP DEFAULT NOW(),
FOREIGN KEY(tenant_id) REFERENCES tenants(tenant_id),
FOREIGN KEY(suggested_concept_id) REFERENCES ontology_concepts(concept_id),
FOREIGN KEY(final_concept_id) REFERENCES ontology_concepts(concept_id)
);
-- ERP version tracking for proactive monitoring
CREATE TABLE erp_version_registry (
registry_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
erp_system VARCHAR(50) NOT NULL, -- 'SAP_S4HANA', 'ORACLE_EBS', 'ADLER'
version VARCHAR(50) NOT NULL,
release_date DATE,
-- Schema changes in this version
schema_changes JSONB, -- [{type: 'added', field: 'new_field', description}]
-- Affected tenants
affected_tenant_ids UUID[],
-- Monitoring
detected_at TIMESTAMP DEFAULT NOW(),
review_status VARCHAR(20) DEFAULT 'pending',
UNIQUE(erp_system, version)
);
5.2 Initial Ontology Seeding
ChainAlign Standard S&OP Concepts:
const CORE_ONTOLOGY = [
// Product domain
{
concept_name: 'product_sku',
canonical_field: 'sku',
domain: 'product',
synonyms: ['SKU', 'Item', 'ItemCode', 'ProductCode', 'Material'],
data_type: 'string',
is_required: true
},
{
concept_name: 'unit_cost',
canonical_field: 'unit_cost',
domain: 'product',
synonyms: ['COGS', 'CostPerUnit', 'StandardCost', 'MaterialCost'],
data_type: 'numeric'
},
// Sales domain
{
concept_name: 'quantity_sold',
canonical_field: 'quantity_sold',
domain: 'sales',
synonyms: ['SalesQty', 'UnitsSold', 'Volume', 'Quantity'],
data_type: 'numeric',
is_required: true
},
{
concept_name: 'revenue',
canonical_field: 'revenue',
domain: 'sales',
synonyms: ['Sales', 'SalesAmount', 'Income', 'TotalSales'],
data_type: 'numeric'
},
// Inventory domain
{
concept_name: 'inventory_level',
canonical_field: 'quantity_on_hand',
domain: 'inventory',
synonyms: ['OnHand', 'Stock', 'Available', 'QtyAvailable'],
data_type: 'numeric'
},
// Location domain
{
concept_name: 'location_code',
canonical_field: 'location_code',
domain: 'location',
synonyms: ['SiteCode', 'PlantCode', 'WarehouseID', 'DC'],
data_type: 'string'
},
// Financial domain
{
concept_name: 'billing_milestone',
canonical_field: 'billing_milestone',
domain: 'finance',
synonyms: ['ProjectBilling', 'StageBilling', 'MilestonePayment'],
data_type: 'numeric'
}
];
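A sketch of how the seeding job might attach embeddings before insert, assuming the google-generativeai SDK with a 768-dimension embedding model (the model name is an assumption) and the psycopg/pgvector setup from the earlier sketches.
# Minimal sketch: seed core concepts with embeddings (SDK, model, and DSN are assumptions).
import google.generativeai as genai
import numpy as np
import psycopg
from pgvector.psycopg import register_vector

genai.configure(api_key="***")  # placeholder

def embed(text: str) -> np.ndarray:
    """Return a 768-dimension embedding (assumes models/text-embedding-004)."""
    result = genai.embed_content(model="models/text-embedding-004", content=text)
    return np.array(result["embedding"])

def seed_ontology(dsn: str, concepts: list[dict]) -> None:
    """Insert each concept with an embedding of its name, description, and synonyms."""
    with psycopg.connect(dsn) as conn:
        register_vector(conn)
        for c in concepts:
            text = " ".join([c["concept_name"], c.get("description", ""),
                             *c.get("synonyms", [])])
            conn.execute(
                """
                INSERT INTO ontology_concepts
                    (concept_name, canonical_field, domain, synonyms,
                     data_type, description, embedding)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT DO NOTHING
                """,
                (c["concept_name"], c["canonical_field"], c["domain"],
                 c.get("synonyms", []), c.get("data_type"),
                 c.get("description", ""), embed(text)),
            )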
6. Gateway Configuration
6.1 GCP API Gateway Setup
Stable endpoint: gateway.chainalign.com
Routing logic:
# openapi.yaml for GCP API Gateway
swagger: '2.0'
info:
title: ChainAlign Integration Gateway
version: 1.0.0
host: gateway.chainalign.com
schemes:
- https
x-google-backend:
address: https://ingestion-orchestrator-<hash>-uc.a.run.app
protocol: h2
paths:
/ingest/rest:
post:
operationId: ingestREST
summary: Ingest REST data
security:
- api_key: []
parameters:
- in: header
name: ChainAlign-Version
type: string
required: false
description: API version (Stripe-style dated versioning)
x-google-backend:
address: https://ingestion-orchestrator-<hash>-uc.a.run.app/ingest/rest
/graphql:
post:
operationId: graphql
summary: GraphQL endpoint
security:
- api_key: []
x-google-backend:
address: https://ingestion-orchestrator-<hash>-uc.a.run.app/graphql
/ingest/soap:
post:
operationId: ingestSOAP
summary: Ingest SOAP data
security:
- api_key: []
x-google-backend:
address: https://ingestion-orchestrator-<hash>-uc.a.run.app/ingest/soap
securityDefinitions:
api_key:
type: apiKey
name: Authorization
in: header
Environment routing:
- gateway.chainalign.com → Production
- gateway-staging.chainalign.com → Staging
- gateway-dev.chainalign.com → Development
Version routing (Stripe-style):
- Client sends the header ChainAlign-Version: 2025-11-17
- Orchestrator reads the header and applies the appropriate mapping logic
- Internal refactoring invisible to customers
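For illustration, dated-version resolution could look like the sketch below (written in Python for consistency with the other sketches; the Orchestrator itself is Node.js, and the version list is illustrative).
# Minimal sketch: Stripe-style dated version resolution (illustrative version list).
from datetime import date

KNOWN_VERSIONS = [date(2025, 11, 17)]  # newest first; extend as breaking changes ship
DEFAULT_VERSION = KNOWN_VERSIONS[0]

def resolve_version(header_value: str | None) -> date:
    """Pick the newest known version that is not newer than the client's pinned date."""
    if not header_value:
        return DEFAULT_VERSION             # unpinned clients get the current version
    pinned = date.fromisoformat(header_value)
    for version in KNOWN_VERSIONS:         # newest → oldest
        if version <= pinned:
            return version
    return KNOWN_VERSIONS[-1]              # older than anything known → oldest supported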
6.2 Security & Auth
Authentication:
- API Gateway enforces OAuth 2.0 / API key validation
- Each tenant gets unique API credentials
- Rate limiting per tenant (configurable)
Authorization:
- All ingested data scoped by tenant_id
- Mapped data only accessible within the tenant boundary
- Admin UI respects tenant isolation
Encryption:
- TLS 1.3 for data in transit
- API Gateway terminates SSL
- Backend connections over private VPC
7. Customer-Facing APIs
7.1 GraphQL Mutation API (Ingest)
Published schema for customers:
type Mutation {
syncSalesOrders(orders: [SalesOrderInput!]!): SyncResponse!
syncInventory(items: [InventoryItemInput!]!): SyncResponse!
syncProducts(products: [ProductInput!]!): SyncResponse!
syncForecasts(forecasts: [ForecastInput!]!): SyncResponse!
}
input SalesOrderInput {
orderNumber: String!
productSKU: String!
quantity: Float!
revenue: Float
orderDate: DateTime!
locationCode: String
# Additional fields go to custom_attributes
customFields: JSON
}
input InventoryItemInput {
productSKU: String!
locationCode: String!
quantityOnHand: Float!
quantityReserved: Float
lastUpdated: DateTime!
customFields: JSON
}
type SyncResponse {
status: SyncStatus!
itemsProcessed: Int!
itemsFailed: Int!
errors: [SyncError!]
queueId: ID! # Reference for tracking async processing
}
enum SyncStatus {
QUEUED
PROCESSING
COMPLETED
FAILED
}
type SyncError {
item: String!
field: String
message: String!
}
Example customer call:
mutation {
syncSalesOrders(orders: [
{
orderNumber: "SO-12345"
productSKU: "WIDGET-001"
quantity: 100
revenue: 5000.00
orderDate: "2025-11-15T10:30:00Z"
locationCode: "DC-EAST"
customFields: {
proj_stage_bill: 1250.00 # Unknown field → semantic mapper
}
}
]) {
status
itemsProcessed
queueId
}
}
7.2 GraphQL Subscription API (Real-time Events)
Published schema:
type Subscription {
dataRequested: DataRequest!
syncStatusUpdate(queueId: ID!): SyncStatusUpdate!
}
type DataRequest {
requestId: ID!
dataType: DataType!
fields: [String!]!
filters: JSON
dateRange: DateRange
}
enum DataType {
SALES_ORDERS
INVENTORY
FORECASTS
PRODUCTS
}
type DateRange {
start: DateTime!
end: DateTime!
}
type SyncStatusUpdate {
queueId: ID!
status: SyncStatus!
progress: Float! # 0-100
itemsProcessed: Int!
errors: [SyncError!]
}
Example customer subscription:
subscription {
dataRequested {
requestId
dataType
fields
dateRange {
start
end
}
}
}
# When ChainAlign needs fresh inventory data:
# Customer receives:
{
"requestId": "req-789",
"dataType": "INVENTORY",
"fields": ["quantityOnHand", "locationCode"],
"dateRange": {
"start": "2025-11-01T00:00:00Z",
"end": "2025-11-17T23:59:59Z"
}
}
# Customer responds with mutation:
mutation {
syncInventory(items: [...]) {
status
queueId
}
}
8. Admin Validation UI
8.1 Extended CSV Admin UI
Location: frontend/src/pages/Admin/FieldMappingQueue.jsx
Features:
- Unified View Across All Sources
- CSV uploads
- REST API ingestions
- GraphQL mutations
- SOAP integrations
- SQL syncs
- Pending Mappings Table
<DataTable>
<Column>Source Field</Column>
<Column>Data Type</Column>
<Column>Sample Values</Column>
<Column>Suggested Concept</Column>
<Column>Confidence</Column>
<Column>LLM Reasoning</Column>
<Column>Actions</Column>
</DataTable>
- Review Actions
- Approve → Add synonym to ontology
- Correct → Select different concept from dropdown
- Mark as New → Create migration, add to ontology
- Reject → Ignore field, don't map
- Batch Operations
- Select multiple similar fields
- Apply same mapping to all
- "Ignore all fields matching pattern"
- Learning Feedback
- Each approval trains the agent
- Next occurrence auto-maps with 100% confidence
8.2 Ontology Management
Location: frontend/src/pages/Admin/OntologyManager.jsx
Features:
- View all canonical concepts
- See synonym lists
- Edit descriptions and validation rules
- View usage statistics (how often mapped)
- Add custom tenant-specific concepts
9. Schema Evolution & Governance
9.1 Three-Tier Schema Strategy
Tier 1: Core ChainAlign Schema (Rare changes)
- Standard S&OP concepts: sales, core_entities, locations, forecasts
- Requires migration + approval from product team
- Benefits all customers
Tier 2: Ontology Synonyms (Frequent, automatic)
- Customer-specific field names mapped to core concepts
- No schema changes, just ontology updates
- Most mappings fall into this tier
Tier 3: Tenant Custom Fields (Per-customer, JSONB)
- Fields unique to one customer, not generalizable
- Stored in the custom_attributes JSONB column
- Queryable, but not part of the core schema
9.2 Schema Change Approval Workflow
When a new core concept is suggested:
- Human reviews:
- Is this valuable across multiple customers?
- Does it fit existing tables or need new table?
- What's the business justification?
- If approved:
// Generate migration
npx knex migrate:make add_carbon_footprint_field
// Migration file:
exports.up = function(knex) {
return knex.schema.table('core_entities', (table) => {
table.decimal('carbon_footprint_per_unit', 15, 4)
.nullable()
.comment('CO2 emissions per unit - sustainability tracking');
});
};
- Add to ontology:
INSERT INTO ontology_concepts (
concept_name, canonical_field, domain, description
) VALUES (
'carbon_footprint_per_unit',
'carbon_footprint_per_unit',
'sustainability',
'CO2 emissions per unit produced'
);
- Backfill existing data:
UPDATE core_entities
SET carbon_footprint_per_unit = (custom_attributes->>'carbon_footprint')::decimal
WHERE custom_attributes->>'carbon_footprint' IS NOT NULL;
9.3 Proactive Version Monitoring
ERP Version Registry:
// Weekly scheduled job
async function monitorERPVersions() {
// Check SAP release notes
const sapReleases = await fetchSAPReleaseNotes();
// Check Adler API
const adlerReleases = await fetchAdlerVersionInfo();
// Detect new versions
for (const release of [...sapReleases, ...adlerReleases]) {
const exists = await erpVersionRegistry.findByVersion(
release.system,
release.version
);
if (!exists) {
// New version detected
await erpVersionRegistry.create({
erp_system: release.system,
version: release.version,
release_date: release.date,
schema_changes: release.changes,
affected_tenant_ids: await findAffectedTenants(release.system)
});
// Notify admins
await notificationService.send({
type: 'erp_version_update',
message: `New ${release.system} version ${release.version} detected`,
action_required: 'Review schema changes and update ontology'
});
}
}
}
Anomaly Detection:
// Detect schema drift in customer data
async function detectSchemaDrift(tenantId, incomingData) {
const previousSchema = await getLastKnownSchema(tenantId);
const currentSchema = extractSchema(incomingData);
const drift = compareSchemas(previousSchema, currentSchema);
if (drift.newFields.length > 0 || drift.removedFields.length > 0) {
await notificationService.send({
type: 'schema_drift',
tenant_id: tenantId,
message: `Schema drift detected: ${drift.newFields.length} new fields, ${drift.removedFields.length} removed`,
action_required: 'Review mappings for affected fields'
});
}
}
10. Implementation Phases
Phase 1: Foundation (Weeks 1-4)
Deliverable: Universal ingestion infrastructure
Tasks:
- Set up gateway.chainalign.com with GCP API Gateway
- Configure SSL certificates
- Set up environment routing
- Configure auth/rate limiting
- Build Ingestion Orchestrator (Node.js/Cloud Run)
- REST/JSON parser
- Protocol detection (Content-Type → parser)
- Schema extraction logic
- pgmq queue writer
- Database migrations:
- ontology_concepts table
- pending_field_mappings table
- erp_version_registry table
- ingestion_queue (pgmq)
- Monitoring:
- Cloud Logging integration
- Error alerting
- Queue depth metrics
Success Criteria:
- Any customer can POST JSON to gateway
- Data lands in ingestion_queue with metadata
- Gateway handles 1000 req/sec
Phase 2: Semantic Intelligence (Weeks 5-8)
Deliverable: LangGraph semantic mapper agent
Tasks:
- Python semantic mapper service (Cloud Run)
- LangGraph workflow implementation
- 6 nodes: extract, exact match, semantic search, LLM, validation, execute
- Cyclic refinement logic
- Core ontology seeding:
- Load standard S&OP concepts
- Generate embeddings (Gemini)
- Insert into ontology_concepts
- LLM integration:
- Gemini API calls for reasoning
- Prompt engineering for field mapping
- Confidence scoring logic
- pgvector setup:
- Enable extension
- Create embedding index
- Implement semantic search queries
- Background worker:
- Reads from ingestion_queue
- Processes via LangGraph
- Writes results to PostgreSQL or the validation queue
Success Criteria:
- Known fields auto-map with 100% accuracy
- Unknown fields route to validation queue
- LLM provides clear reasoning for suggestions
- Agent processes 100 fields/minute
Phase 3: Validation UI & CSV Unification (Weeks 9-10)
Deliverable: Unified governance workbench
Tasks:
- Extend CSV admin UI (React):
- Show pending mappings from all sources
- Add approve/reject/correct actions
- Display LLM reasoning
- Batch operation support
- Refactor CSV upload:
- Use the same ingestion_queue
- Route through the semantic mapper
- Store CSV templates in the ontology
- API endpoints:
- GET /api/admin/pending-mappings
- POST /api/admin/approve-mapping
- POST /api/admin/reject-mapping
- GET /api/admin/ontology
- Learning feedback loop:
- Approvals update ontology immediately
- Trigger re-processing of related pending fields
Success Criteria:
- Admin can review 50 fields in 10 minutes
- Approved mappings apply retroactively
- CSV and API mappings managed in same UI
Phase 4: GraphQL & Advanced Protocols (Weeks 11-13)
Deliverable: Multi-protocol ingestion
Tasks:
- GraphQL schema definition:
- Mutation API for data sync
- Subscription API for real-time events
- Input types with customFields support
- GraphQL server (extend Orchestrator):
- Apollo Server integration
- Schema validation
- Subscription pub/sub (Redis/Pub/Sub)
- SOAP adapter:
- WSDL parser
- XML → JSON conversion
- Add to Orchestrator
- VPN/SQL adapter:
- SQL query executor
- Read-only connection pool
- Scheduled job runner
- Documentation:
- GraphQL Playground
- Code examples (JavaScript, Python, Java)
- Authentication guide
Success Criteria:
- Customers can write GraphQL mutations
- Customers can subscribe to real-time events
- SOAP integration tested with legacy ERP
- SQL direct connection proven secure
Phase 5: Adler Pilot & Iteration (Weeks 14-16)
Deliverable: First production ERP integration
Tasks:
- Onboard Adler ERP:
- Determine their preferred protocol
- Set up credentials/VPN if needed
- Configure gateway routing
- Initial data sync:
- Run semantic mapper on sample data
- Review all mappings with admin
- Validate data quality
- Iterative tuning:
- Refine LLM prompts based on Adler fields
- Add Adler-specific synonyms to ontology
- Optimize confidence thresholds
- Document learnings:
- What protocol worked best?
- Where did LLM struggle?
- What UI improvements needed?
- Prepare for next customer:
- Capture reusable patterns
- Update onboarding checklist
- Create runbook for similar ERPs
Success Criteria:
- Adler data flows into ChainAlign
- 95%+ fields auto-mapped
- Customer satisfied with data quality
- Architecture proven for next customer
11. Success Metrics
11.1 Technical KPIs
Ingestion Layer:
- Uptime: 99.9%
- Latency: p99 < 500ms (gateway → queue)
- Throughput: 1000 requests/sec
- Queue depth: < 1000 messages (normal load)
Semantic Mapper:
- Auto-mapping rate: >80% (fields mapped without human review)
- LLM accuracy: >90% (when human reviews, LLM suggestion was correct)
- Processing time: <5 sec/field (including LLM calls)
- Retry rate: <10% (fields requiring cyclic refinement)
Ontology Growth:
- New synonyms/week: 20-50 (early), 5-10 (mature)
- New core concepts/quarter: <5 (deliberate growth)
- Tenant custom fields: <20% of total fields
11.2 Business KPIs
Customer Onboarding:
- CSV → value: <2 days
- API integration → value: <2 weeks
- First sync → validated data: <1 week
Customer Satisfaction:
- Manual mapping effort: <10% of fields
- Data quality issues: <1% of records
- Integration reliability: >99.5% uptime
Platform Differentiation:
- Supported ERP systems: SAP, Oracle, Adler, Dynamics, +custom
- Protocols supported: REST, GraphQL, SOAP, SQL, CSV
- Customer self-service: 80% of new fields mapped by tenants (future)
12. Security & Compliance
12.1 Data Protection
Encryption:
- TLS 1.3 for all external connections
- Data at rest encrypted (GCP default)
- API keys rotated every 90 days
Access Control:
- Tenant isolation enforced at DB level
- Admin UI requires special role
- Audit log for all mapping approvals
PII Handling:
- Customer data never logged in plaintext
- Sample values truncated/masked in UI
- LLM prompts exclude PII
12.2 Compliance
GDPR:
- Customer controls their data (can delete anytime)
- Data processing agreements signed
- Right to export all mappings
SOC 2:
- Change management for ontology updates
- Access reviews quarterly
- Incident response plan
13. Open Questions & Future Work
13.1 Phase 1 Scope
Decisions needed:
- What's the LLM confidence threshold for auto-mapping? (Proposed: 95%)
- Should we support batching of similar fields for efficiency?
- How often to run proactive ERP version monitoring? (Proposed: weekly)
13.2 Future Enhancements
Advanced Features:
- Multi-hop mappings: Map customer field → intermediate concept → canonical field
- Temporal ontology: Track schema changes over time, support versioning
- Collaborative filtering: "Customers like you mapped this field to X"
- Auto-documentation: LLM generates mapping guides per customer
Scalability:
- Federated semantic mapper: Multiple agents processing in parallel
- Embedding caching: Pre-compute embeddings for common fields
- Incremental learning: Fine-tune LLM on validated mappings
14. Appendix: Reference Architecture Diagrams
14.1 Complete System Architecture
┌───────────────────────────────────────────────────────────────────────┐
│ Customer ERP Landscape │
│ SAP S/4HANA │ Oracle EBS │ Adler │ Dynamics 365 │ Custom │
└─────┬─────────┴──────┬───────┴────┬────┴───────┬────────┴────┬───────┘
│ │ │ │ │
│ REST │ GraphQL │ SOAP │ SQL/VPN │ CSV
▼ ▼ ▼ ▼ ▼
┌───────────────────────────────────────────────────────────────────────┐
│ gateway.chainalign.com │
│ (GCP API Gateway) │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ Security: OAuth 2.0, Rate Limiting, DDoS Protection │ │
│ │ Routing: Environment (dev/staging/prod), Version (headers) │ │
│ └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────┬─────────────────────────────────────────┘
▼
┌───────────────────────────────────────────────────────────────────────┐
│ Ingestion Orchestrator (Node.js/Cloud Run) │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ Protocol Detection: │ │
│ │ • Content-Type: application/json → REST parser │ │
│ │ • Content-Type: application/graphql → GraphQL parser │ │
│ │ • Content-Type: application/soap+xml → SOAP parser │ │
│ │ │ │
│ │ Schema Extraction: │ │
│ │ • Field names, types, sample values │ │
│ │ • Data fingerprinting (patterns, formats) │ │
│ │ │ │
│ │ Queue Writing: │ │
│ │ • Format: {source, tenant_id, protocol, raw_data, schema} │ │
│ │ • Write to pgmq: ingestion_queue │ │
│ └────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────┬─────────────────────────────────────────┘
▼
┌───────────────────────────────────────────────────────────────────────┐
│ pgmq: ingestion_queue │
│ [Message 1] [Message 2] [Message 3] ... [Message N] │
└─────────────────────────────┬─────────────────────────────────────────┘
▼
┌───────────────────────────────────────────────────────────────────────┐
│ Semantic Mapper Agent (Python/LangGraph/Cloud Run) │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ LangGraph Workflow │ │
│ │ │ │
│ │ [Node 1: Extract Schema] │ │
│ │ ↓ │ │
│ │ [Node 2: Exact Match Lookup] │ │
│ │ ↓ │ │
│ │ [Node 3: Semantic Search] ←─────┐ (Cycle: Retry with │ │
│ │ ↓ │ refined query) │ │
│ │ [Node 4: LLM Reasoning] ─────────┘ │ │
│ │ ↓ │ │
│ │ [Node 5: Human Validation] (if confidence < 95%) │ │
│ │ ↓ │ │
│ │ [Node 6: Execute Mapping] │ │
│ │ ↓ │ │
│ │ [END: Data mapped to PostgreSQL] │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└───────────┬───────────────────────────────────┬───────────────────────┘
│ │
│ Known fields → │ Unknown fields →
▼ ▼
┌─────────────────────────┐ ┌──────────────────────────────────────┐
│ PostgreSQL │ │ pending_field_mappings │
│ (ChainAlign Schema) │ │ (Admin Review Queue) │
│ │ │ │
│ • core_entities │ │ • source_field │
│ • sales │ │ • suggested_concept │
│ • locations │ │ • llm_confidence │
│ • forecasts │ │ • llm_reasoning │
│ • inventory │ │ • status: pending │
└─────────────────────────┘ └──────────────┬───────────────────────┘
│
▼
┌───────────────────────────────────────────────────────────────────────┐
│ Admin Validation UI (React) │
│ │
│ ┌─────────────────────────────────────────────────────────────────┐ │
│ │ Pending Mappings Table: │ │
│ │ │ │
│ │ Source Field │ Suggested │ Confidence │ Reasoning │ Actions │ │
│ │ ─────────────────────────────────────────────────────────────── │ │
│ │ proj_stage_ │ billing_ │ 87% │ "Appears │ [Approve] │ │
│ │ bill │ milestone │ │ to be │ [Correct] │ │
│ │ │ │ │ project │ [Reject] │ │
│ │ │ │ │ billing" │ │ │
│ │ │ │
│ │ Actions: │ │
│ │ • Approve → Add synonym to ontology, execute mapping │ │
│ │ • Correct → Select different concept, execute mapping │ │
│ │ • Mark as new → Create migration, add to ontology │ │
│ │ • Reject → Ignore field │ │
│ │ │ │
│ └─────────────────────────────────────────────────────────────────┘ │
└─────────────────────────────┬─────────────────────────────────────────┘
│ Approval updates ↓
┌───────────────────────────────────────────────────────────────────────┐
│ Core Ontology (PostgreSQL + pgvector) │
│ │
│ ontology_concepts: │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ concept_name: "billing_milestone" │ │
│ │ canonical_field: "billing_milestone" │ │
│ │ domain: "finance" │ │
│ │ synonyms: ["ProjectBilling", "StageBilling", "proj_stage_bill"]│ │
│ │ embedding: [0.123, -0.456, ...] (pgvector) │ │
│ │ usage_count: 15 │ │
│ └────────────────────────────────────────────────────────────────┘ │
│ │
│ + 50 other core S&OP concepts │
└───────────────────────────────────────────────────────────────────────┘
15. Conclusion
ChainAlign's ERP Integration System transforms customer integration from a 6-12 month data engineering project into a days-to-weeks self-service onboarding experience. By combining:
- Multi-protocol flexibility (accept whatever customers offer)
- LLM-powered semantic intelligence (understand intent, not just syntax)
- Self-improving ontology (every validation makes the system smarter)
- Agentic automation (minimize human review over time)
ChainAlign becomes a strategic iPaaS partner that removes integration friction while maintaining data quality and governance. This architecture is ready for the Adler pilot and scalable to hundreds of diverse ERP integrations.
Status: Design Complete
Next Step: Phase 1 Implementation (Weeks 1-4)
Author: Pramod Prasanth
Date: 2025-11-17