FSD: AI Core Platform & Real-Time Streaming Engine
| Version | 1.0 |
| Status | Draft |
| Author | Gemini, Pramod |
| Last Updated | 2025-10-29 |
1. Introduction & Vision
1.1. Goal
This document specifies the architecture and functionality for a significant evolution of the ChainAlign platform. The primary goal is to refactor the existing AI integration into a modular, model-agnostic AI Core Platform and to implement a Real-Time Scenario Streaming Engine.
This initiative will transform ChainAlign from a monolithic application into a highly extensible and performant platform for decision intelligence, enabling real-time, collaborative workflows and paving the way for a public developer ecosystem.
1.2. Vision
The long-term vision is to establish ChainAlign as a "Judgment OS for Builders"—a platform that provides Decision Intelligence as a Service (DIaaS). Developers, both internal and external, will be able to leverage ChainAlign's core reasoning, forecasting, and constraint-checking primitives to build their own domain-specific applications within our compliant and auditable framework.
1.3. Scope
This FSD covers the design and implementation of two primary components:
- The AI Core Platform: The foundational layer responsible for all interactions with Large Language Models (LLMs) and other AI services.
- The Real-Time Scenario Workbench: A user-facing feature, powered by a new streaming architecture, for dynamic, interactive "What-If" analysis and Monte Carlo simulations.
Integration with M49 (Decision Execution Tracker): The Streaming Engine's Tier 4 (LLM-Powered Recommendations) output generates actionable decision suggestions. When a user confirms a suggested decision using the drag-confirm gesture, this triggers the M49 TaskService to create a DecisionTask, forming the bridge between analysis and execution. See section 3.6 for demo flow details.
2. Core Architecture: The AI Core Platform
This platform component is inspired by the robust, model-agnostic principles of modern frameworks like the Vercel AI SDK. It will replace the existing AIGateway.js with a more structured, extensible, and observable system.
Key Design Principle: Operational logs (system health, performance) are separated from audit logs (business events, decisions, AI model usage) to optimize performance and comply with data governance regulations.
2.6.2. Operational Logging (Winston + GCP)
Purpose: Real-time monitoring of system health, performance, and errors. Non-blocking, high-frequency logging optimized for observability.
Implementation:
- Logger: Winston.js with structured JSON format
- Middleware: auditLoggingMiddleware (Express-style), integrated into the LLMClient middleware stack
- Log Levels: DEBUG (development), INFO, WARN, ERROR
- Development Output: Human-readable console with colors
- Production Output:
- Local files with automatic rotation (5MB per file, 10-file retention)
- GCP Cloud Logging for centralized aggregation and alerting
Logged Events:
// Request/Response Lifecycle
logger.info('HTTP GET /api/scenarios', {
requestId: 'req_123...',
userId: 'user_456',
tenantId: 'org_789',
duration: '145ms',
statusCode: 200
});
// LLM Interactions
logger.debug('LLM token stream', {
model: 'gemini-2.0',
tokens: 1250,
latency: '2.3s'
});
// System Events
logger.error('Database connection failed', {
error: 'ECONNREFUSED',
retryAttempt: 1,
retryable: true
});
Data Minimization: Sensitive fields (passwords, tokens, API keys) are automatically redacted before logging.
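The redaction step can be sketched as a recursive filter applied to every log payload before it reaches Winston. The key list and function name below are illustrative, not the actual ChainAlign configuration:

```javascript
// Hypothetical redaction helper; the sensitive-key list is an example.
const SENSITIVE_KEYS = new Set(['password', 'token', 'apiKey', 'authorization']);

function redact(payload) {
  // Primitives pass through unchanged.
  if (payload === null || typeof payload !== 'object') return payload;
  if (Array.isArray(payload)) return payload.map(redact);
  const out = {};
  for (const [key, value] of Object.entries(payload)) {
    // Replace sensitive values; recurse into nested objects.
    out[key] = SENSITIVE_KEYS.has(key) ? '[REDACTED]' : redact(value);
  }
  return out;
}
```

A filter like this would typically run inside a custom Winston format so no call site can forget it.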
2.6.3. Audit Logging System (TimescaleDB)
Purpose: Immutable, queryable record of all business-critical events for compliance, investigations, and decision traceability. Optimized for time-series data with compression and retention policies.
Infrastructure:
- Database: TimescaleDB (PostgreSQL extension for time-series)
- Repository: AuditLogRepository (Knex-based, follows the repository pattern)
- Table: audit_logs (hypertable with automatic time-based indexing)
- Indexes: Composite indexes for fast tenant/user/resource queries
- Storage: Auto-compression for chunks >7 days old (10-20x savings)
- Retention: Configurable policy (default: 2555 days/7 years for enterprises)
Schema:
{
id: BIGINT PRIMARY KEY,
time: TIMESTAMP WITH TIME ZONE, // Critical dimension for TimescaleDB
action: VARCHAR(50), // e.g., 'scenario_created', 'ai_decision'
user_id: UUID,
tenant_id: UUID, // Multi-tenant isolation
resource_type: VARCHAR(50), // e.g., 'scenario', 'forecast', 'decision'
resource_id: UUID,
details: JSONB, // Flexible schema for audit metadata
ip_address: INET,
status: VARCHAR(20), // 'success', 'failed', 'partial'
created_at: TIMESTAMP,
updated_at: TIMESTAMP
}
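The repository's write path can be sketched as a function that assembles a row matching the schema above. Field names follow the schema; the function name and defaults are assumptions for illustration:

```javascript
// Hypothetical row builder a repository method might use; names follow the
// audit_logs schema above, but the defaults are illustrative.
function buildAuditRow({ action, userId, tenantId, resourceType, resourceId,
                         details = {}, ipAddress = null, status = 'success' }) {
  if (!action || !tenantId) {
    throw new Error('action and tenant_id are required for every audit entry');
  }
  const now = new Date().toISOString();
  return {
    time: now,                        // TimescaleDB time dimension
    action,
    user_id: userId ?? null,
    tenant_id: tenantId,              // multi-tenant isolation
    resource_type: resourceType ?? null,
    resource_id: resourceId ?? null,
    details: JSON.stringify(details), // persisted as JSONB
    ip_address: ipAddress,
    status,
    created_at: now,
    updated_at: now,
  };
}
```

Enforcing the required fields at the builder keeps every insert path consistent with the hypertable schema.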
Audited Events:
// Scenario Creation
await AuditService.recordScenarioAction('created', userId, tenantId, scenarioId, {
name: 'Q4 Planning',
constraints: 5,
financialImpact: { margin_change: -2.5 },
dataProcessingBasis: 'contractual_necessity'
});
// AI Decision Logging (EU AI Act Compliance)
await AuditService.recordAIDecision(userId, tenantId, {
decisionId: 'dec_123',
model: 'scenario_optimizer_v2',
modelVersion: '2.1.0',
confidence: 0.92,
inputParameters: { demand_increase: '20%' },
outputDecision: { safety_stock_change: '+15%' },
reasoningChain: [...], // Full CoT reasoning
biasCheck: { passed: true, score: 0.98 },
humanReviewRequired: false
});
// Data Access (GDPR Tracking)
await AuditService.recordDataAccess(userId, tenantId, 'forecast', forecastId, 'read');
Performance: Write-optimized for 100k+ events/day. Time-series indexing enables fast historical queries.
2.6.4. GraphQL Integration for Compliance Queries
Value: Exposes audit data through a unified, flexible API that enables sophisticated compliance reporting without building dedicated REST endpoints.
Extended GraphQL Schema:
type AuditLog {
id: String!
action: String!
userId: String
tenantId: String!
resourceType: String
resourceId: String
status: String!
details: JSON
ipAddress: String
createdAt: DateTime!
}
type ComplianceReport {
tenantId: String!
period: DateRange!
totalEvents: Int!
# Aggregated metrics
byAction: JSON! # Count by event type
byUser: JSON! # Count by user
# Filtered subsets
aiDecisions: [AuditLog!]! # All AI model decisions
dataAccess: [AuditLog!]! # All data access events
failedOperations: [AuditLog!]! # Error tracking
gdprEvents: [AuditLog!]! # Data processing events
}
extend type Decision {
id: ID!
# ... existing fields
auditHistory: [AuditLog!]! # Full audit trail for this decision
aiMetadata: JSON # Model, confidence, reasoning chain
}
extend type Scenario {
id: ID!
# ... existing fields
auditHistory: [AuditLog!]! # Scenario creation, modifications, analyses
}
# Root Query Extensions
extend type Query {
# Compliance Reports
complianceReport(
tenantId: String!
startDate: DateTime!
endDate: DateTime!
): ComplianceReport!
# Audit Trail
auditTrail(
tenantId: String!
filter: AuditFilter
): [AuditLog!]!
# Resource History
resourceHistory(
resourceId: String!
resourceType: String!
): [AuditLog!]!
# User Actions (GDPR Subject Access Requests)
userActionHistory(
userId: String!
days: Int
): [AuditLog!]!
}
input AuditFilter {
userId: String
action: String
resourceType: String
startTime: DateTime
endTime: DateTime
limit: Int
}
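The aggregation behind ComplianceReport can be sketched as a pure function over fetched audit rows. The resolver wiring and row shape are assumptions based on the schema above:

```javascript
// Illustrative sketch of the byAction / byUser aggregation a
// complianceReport resolver might perform over audit rows.
function aggregateCompliance(tenantId, logs) {
  const byAction = {};
  const byUser = {};
  for (const log of logs) {
    byAction[log.action] = (byAction[log.action] || 0) + 1;
    if (log.userId) byUser[log.userId] = (byUser[log.userId] || 0) + 1;
  }
  return {
    tenantId,
    totalEvents: logs.length,
    byAction,
    byUser,
    // Filtered subsets mirroring the ComplianceReport fields.
    aiDecisions: logs.filter((l) => l.action === 'ai_decision'),
    failedOperations: logs.filter((l) => l.status === 'failed'),
  };
}
```

In production the counts would more likely come from TimescaleDB aggregate queries; the in-memory version shows the intended shape of the result.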
Example Queries:
# Get compliance report for a tenant
{
complianceReport(
tenantId: "org_789"
startDate: "2025-01-01"
endDate: "2025-12-31"
) {
totalEvents
byAction
aiDecisions { id action details }
}
}
# Get decision with full audit trail
{
decision(id: "dec_123") {
id
question
chosenOption
auditHistory {
action createdAt details
}
aiMetadata # JSON scalar: model, confidence, reasoning chain
}
}
# GDPR Subject Access Request
{
userActionHistory(userId: "user_456", days: 365) {
action resourceType details createdAt
}
}
2.6.5. Compliance Framework Coverage
GDPR Compliance:
- ✅ Data Minimization: Audit logs store only essential information
- ✅ Right to Erasure: deleteOlderThan() enables bulk deletion by age; can filter by userId for right-to-be-forgotten requests
- ✅ Subject Access Requests: the userActionHistory() query provides an exportable record of all user actions
- ✅ Data Processing Basis: Logged in the details.dataProcessingBasis field (contractual, legitimate_interest, etc.)
- ✅ Retention Policies: Automatic deletion after a configurable period (default: 7 years for enterprises)
SOC2 Audit Trail:
- ✅ Immutable Records: TimescaleDB time-series design ensures chronological integrity
- ✅ User Action Tracking: Every action logged with userId, timestamp, IP address
- ✅ Error Tracking: Failed operations recorded with error details for security monitoring
- ✅ Access Logs: All data access (read, write, delete) tracked for compliance reports
- ✅ Retention: Long-term retention (7 years) supports multi-year audit cycles
EU AI Act Compliance:
- ✅ Model Decision Logging: Every AI decision recorded with:
- Model name and version
- Input parameters and output decision
- Confidence score and reasoning chain (from CoT Service)
- Bias check results
- Human review requirement flag
- ✅ Explainability: Full reasoning chains enable auditors to understand how decisions were made
- ✅ High-Risk Flagging: Decisions below confidence threshold or with bias concerns marked for human review
- ✅ Transparency: Queryable API enables regulators to audit model behavior at scale
Implementation Integration:
The auditLoggingMiddleware in the LLMClient middleware stack (section 2.4) hooks into every AI interaction:
llmClient.use(auditLoggingMiddleware); // Added to middleware stack
// Internally, middleware records:
// - LLM provider, model, token counts
// - Input constraints, output recommendations
// - Confidence scores, reasoning chains
// - Any model-specific audit fields
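The middleware stack itself can be sketched as an Express-style chain; the class shape, context fields, and what the audit middleware records are assumptions based on the description above:

```javascript
// Minimal sketch of an Express-style middleware stack as the LLMClient
// might compose it; the class and context shape are illustrative.
class MiddlewareStack {
  constructor() { this.middlewares = []; }
  use(mw) { this.middlewares.push(mw); }
  async run(ctx, finalHandler) {
    let lastIndex = -1;
    const dispatch = async (idx) => {
      if (idx <= lastIndex) throw new Error('next() called twice');
      lastIndex = idx;
      const mw = this.middlewares[idx];
      if (!mw) return finalHandler(ctx); // end of chain: do the LLM call
      return mw(ctx, () => dispatch(idx + 1));
    };
    return dispatch(0);
  }
}

// Illustrative audit middleware: records model usage around the call.
const auditLoggingMiddleware = async (ctx, next) => {
  const result = await next();
  ctx.auditTrail.push({ model: ctx.model, tokens: result.tokens });
  return result;
};
```

Because each middleware wraps `next()`, the audit entry can capture both the request context and the completed response.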
3. Feature Specification: Real-Time Scenario Workbench
This feature brings to life the vision detailed in docs/architecture/ai-design/streaming-scenario-architecture.md. It provides a dynamic, collaborative environment for users to conduct "What-If" scenario analysis.
3.1. Overview
Users will interact with a workbench where they can adjust input parameters for a scenario and see the multi-stage analysis results stream back in real-time. This transforms S&OP from a slow, static process into a live, interactive exploration. The architecture is designed for collaboration, allowing multiple users to work on the same scenario simultaneously.
3.2. Backend Architecture (ScenarioStreamingEngine)
- Technology: The backend will be an event-driven engine built on WebSockets for persistent, bidirectional communication.
- Session Management: The engine will manage ScenarioState and CollaborativeSession objects to track parameters, user connections, and results.
- Tiered Analysis Pipeline: When a user changes a parameter, the engine initiates a non-blocking, multi-stage analysis, streaming results as each stage completes:
  - Tier 1 (Fast): Constraint Analysis: The ConstraintIntelligenceEngine provides immediate feedback on feasibility.
  - Tier 2 (Medium): Financial Impact: The FinancialImpactModel calculates and streams key financial metrics.
  - Tier 3 (Slow): Monte Carlo Simulation: For deep probabilistic analysis, results are streamed progressively.
  - Tier 4 (Final): LLM-Powered Recommendations: The engine gathers all structured results and calls the LLMClient to generate a final, narrative-rich recommendation and summary.
3.3. Frontend Architecture
- Technology: The frontend will be built in React.
- Streaming Hook (useScenarioStreaming): A custom hook will manage the WebSocket connection, handle incoming messages, and provide a simple interface for components to send data (e.g., parameter changes).
- Components:
  - ScenarioWorkbench: The main container, managing controls and results panels.
  - RealTimeResults: A component that displays the live feed of analysis results as they arrive.
  - MonteCarloVisualization: A specialized component that visualizes the distribution and confidence intervals of the simulation as it runs.
  - CollaborativeWorkbench: A wrapper for handling multi-user state and UI elements like participant lists and parameter locks.
3.4. Advanced Streaming Features
- Monte Carlo Progress Streaming: The backend will not wait for the simulation to finish. It will stream intermediate statistics (e.g., every 50 iterations), allowing the frontend to update visualizations in real-time.
- Collaborative Features: The engine will support multi-user sessions with features like:
- Broadcasting parameter changes to all participants.
- Parameter locking to prevent simultaneous edits.
- A live activity feed.
- Scenario Comparison: A mode where multiple scenarios can be run in parallel, with a dashboard streaming comparative metrics in real-time.
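The progressive Monte Carlo streaming described above can be sketched as a generator that yields intermediate statistics every batch instead of waiting for completion. The running-mean bookkeeping is illustrative, not the production simulator:

```javascript
// Sketch of progressive Monte Carlo streaming: intermediate statistics are
// yielded every `batchSize` iterations (e.g., every 50), matching the
// behavior described above. `sample` is any draw function.
function* monteCarloStream(sample, iterations, batchSize = 50) {
  let sum = 0;
  let sumSq = 0;
  for (let i = 1; i <= iterations; i++) {
    const x = sample();
    sum += x;
    sumSq += x * x;
    if (i % batchSize === 0 || i === iterations) {
      const mean = sum / i;
      const variance = sumSq / i - mean * mean;
      yield {
        completed: i,
        progress: i / iterations,
        mean,
        stdDev: Math.sqrt(Math.max(variance, 0)),
      };
    }
  }
}
```

Each yielded update would be forwarded over the WebSocket so MonteCarloVisualization can refine its distribution view as the simulation runs.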
3.5. Performance & Reliability
- Data Management: To handle high-frequency updates, the backend will use intelligent buffering and data compression (e.g., delta compression for numerical data) before sending batches to the frontend.
- Error Handling: The system will have robust error handling, with automatic recovery strategies (e.g., exponential backoff for network timeouts) and the ability to fall back to a non-streaming batch mode if necessary.
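Delta compression for numerical streams can be sketched as an encoder/decoder pair: the first value is sent in full, then only the differences. The function names are illustrative:

```javascript
// Sketch of delta compression for numerical result streams; most successive
// values in a time series differ by small amounts, so deltas compress well.
function deltaEncode(values) {
  if (values.length === 0) return [];
  const out = [values[0]]; // first value in full
  for (let i = 1; i < values.length; i++) out.push(values[i] - values[i - 1]);
  return out;
}

function deltaDecode(deltas) {
  if (deltas.length === 0) return [];
  const out = [deltas[0]];
  for (let i = 1; i < deltas.length; i++) out.push(out[i - 1] + deltas[i]);
  return out;
}
```

The encode/decode round trip is lossless, so the frontend can reconstruct the exact series the backend computed.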
3.6. Demo Flow: Analysis → Decision → Execution (M49 Integration)
MVP Demo Scenario:
- User launches Scenario Workbench: Adjusts parameters (e.g., "increase safety stock by 20%")
- Tiered Analysis Streams:
- Tier 1 (Constraint): Feasibility feedback
- Tier 2 (Financial): Impact on margins
- Tier 3 (Monte Carlo): Probabilistic outcomes
- Tier 4 (LLM): Recommendation narrative generated
- Decision Suggested: The LLMClient (Tier 4) generates a recommendation: "Recommended: Increase safety stock by 20%. Risk mitigation justifies 2.5% cost impact."
- User Confirms Decision: Uses the drag-confirm gesture (preventing accidental confirmation) to accept the recommendation
- Decision Recorded & Task Created:
- DecisionService records the confirmed decision
- A decision-confirmed event is emitted
- M49 TaskService listens and creates a DecisionTask automatically
- Task appears in home screen task list with decision context
- User Tracks Execution: Marks task as IN_PROGRESS as they implement the decision
- Outcome Recording: After implementation and observation, user records outcome with narrative and optional metrics
- Full Audit Trail: Complete loop visible in Judgment Graph: Decision → Analysis Details → Task → Outcome
Post-MVP Enhancements:
- External sync to Linear/Jira (optional)
- Automatic task creation suggestions based on constraint violations
- ERP integration to auto-detect implementation
4. The Developer Platform (@chainalign/sdk)
[Vision-Only - Post-MVP]
The ultimate goal is to expose the power of the AI Core Platform and the Streaming Engine to external developers through a public SDK. This section outlines the strategic vision for external developers. Implementation is deferred to later phases but provides the foundation for the "Judgment OS for Builders" long-term vision.
4.1. SDK Primitives
The @chainalign/sdk will expose high-level functions that mirror the platform's core capabilities:
| Layer | SDK Primitive | Description |
|---|---|---|
| Judgment Engine | ai.reason() | Get structured rationale for choices. |
| Forecasting Engine | ai.forecast() | Predict sales, inventory, or demand patterns. |
| Constraint Engine | ai.validatePlan() | Check the feasibility of a plan against constraints. |
| Streaming Engine | ai.scenarios.create() | Initiate a new real-time scenario session. |
| Streaming Engine | ai.scenarios.stream() | Subscribe to the results of a scenario session. |
| Knowledge Graph | ai.queryKnowledge() | Ask multi-hop questions across organizational memory. |
4.2. Example Usage
import { ChainAlignAI } from '@chainalign/sdk';
const ai = new ChainAlignAI({ apiKey: process.env.CA_API_KEY });
// Create a real-time scenario
const scenario = await ai.scenarios.create({
initial_parameters: { demand_forecast: 1.2 }
});
// Stream results and log them
for await (const update of ai.scenarios.stream(scenario.id)) {
console.log(`Analysis Step: ${update.step}, Progress: ${update.progress * 100}%`);
if (update.step === 'recommendations') {
console.log('Final Recommendation:', update.results.summary);
}
}
5. Implementation Plan
5.1. Phase 1: Foundational Refactoring
- Tasks: Implement the ProviderRegistry, introduce Zod for one action, and refactor AIGateway.js into the LLMClient.
- Goal: Establish the core, model-agnostic AI layer.
5.2. Phase 2: Observability & Control
- Tasks: Implement lifecycle hooks, build the CostManager v1, and formalize the middleware stack.
- Goal: Create the necessary "cockpit" for monitoring and managing the AI core.
5.3. Phase 3: Implement the Real-Time Scenario Workbench
- Tasks: Build the ScenarioStreamingEngine backend and the useScenarioStreaming frontend hook. Integrate the LLMClient for the recommendations step.
- Goal: Launch the user-facing real-time analysis feature.
5.4. Phase 4: The Platform Vision
- Tasks: Design and prototype the external @chainalign/sdk. Implement collaborative features and scenario comparison.
- Goal: Begin the transition from an internal product to an external developer platform.
6. Success Metrics
6.1. Technical Performance
- Model Agnosticism: Time to add and test a new LLM provider should be < 2 developer days.
- Streaming Latency: <100ms for parameter change acknowledgement; <2s for Tier 1 analysis results to appear.
- Reliability: 99.9% uptime for the streaming engine; automatic recovery in >95% of handled error cases.
6.2. User Experience & Business Impact
- Decision Speed: Reduce average time for multi-scenario analysis from hours to minutes.
- Engagement: >75% of users in scenario analysis sessions should interact with the real-time results.
- Collaboration: Achieve a 40% improvement in cross-functional decision alignment in S&OP meetings.
- Developer Adoption (Long-term): Onboard the first external partner to build on the @chainalign/sdk.
🔧 Addendum: External Interfaces & Event Registry (SDK Scaffolding)
[Post-MVP]
This addendum provides the strategic vision and API surface architecture for the @chainalign/sdk and external system integration. Full implementation of these interfaces is deferred to post-MVP phases.
1. External Interfaces (Public API Layer)
The AI Platform Public API Layer is the single abstraction surface through which all external clients, SDKs, or automation scripts interact with ChainAlign.
It exposes a consistent and model-agnostic API for enterprise reasoning, forecasting, validation, and decision capture.
1.1. Base Structure
| Element | Description |
|---|---|
| Base URL | /api/v1/ai |
| Auth | Bearer token (Authorization: Bearer <tenant_api_key>) |
| Headers | X-Tenant-ID, X-Request-ID, Content-Type: application/json |
| Transport | HTTPS / WebSocket for streaming |
1.2. Endpoints
POST /forecast
Purpose: Generate an adaptive forecast based on contextual and historical data.
Maps to: HybridForecastingService.js → forecasting_engine.py
Request:
{
"datasetId": "sales_europe_2024",
"covariates": ["promotions", "temperature", "holiday_flag"],
"horizon": "3M"
}
Response:
{
"forecastId": "fcst_20251029_001",
"modelUsed": "Prophet + XGBoost Residual",
"forecast": [
{ "date": "2025-11-01", "value": 10452 },
{ "date": "2025-12-01", "value": 11320 }
],
"metrics": { "MAPE": 4.2, "RMSE": 540 }
}
POST /validate
Purpose: Validate a business plan or scenario against operational and financial constraints.
Maps to: constraintValidationService.js → monteCarloSimulator.js
Request:
{
"planId": "Q4_PRODUCTION_PLAN",
"constraints": ["MAX_CAPACITY", "TARGET_MARGIN"],
"context": { "plant": "Basel", "product": "A1" }
}
Response:
{
"validation": "partial_pass",
"violations": [
{ "constraint": "TARGET_MARGIN", "impact": "-0.5%", "revenueAtRisk": "1.2M" }
],
"recommendation": "Reduce overtime by 5% or raise price by 2% to meet target margin."
}
POST /reason
Purpose: Generate structured reasoning or analysis from the GraphRAG + Judgment Engine.
Maps to: RAGService.js → cognee-service → AIGateway.js
Request:
{
"question": "What is the risk to on-time delivery if Supplier Z delays by 3 weeks?",
"context": ["supply_chain_risks", "supplier_contracts"],
"timeframe": "Q1_2026"
}
Response:
{
"reasoningId": "rsn_20251029_023",
"summary": "Supplier Z's delay creates a 12% risk to Q1 delivery.",
"supportingEvidence": [
"Supplier Z provides 80% of component C for Product A.",
"No alternate supplier identified with approved QA."
],
"recommendation": "Activate alternate sourcing from Supplier K (lead time 14 days)."
}
POST /decision
Purpose: Record or update a formal decision and capture rationale.
Maps to: DecisionService.js + LinearJudgmentEngineService.js
Request:
{
"decisionId": "dec_20251029_009",
"question": "Should we expedite Supplier K shipments for Q1?",
"chosenOption": "Yes",
"rationale": "Based on 12% risk of delay from Supplier Z; mitigation cost justified.",
"linkedForecasts": ["fcst_20251029_001"],
"linkedValidations": ["val_20251029_021"]
}
Response:
{
"status": "recorded",
"decisionId": "dec_20251029_009",
"judgmentGraphNode": "node_8345"
}
GET /stream
Purpose: Subscribe to live event streams from the AI Platform.
Maps to: Internal Streaming Engine and Event Bus.
Example WebSocket Subscription:
{
"subscribe": [
"forecast.progress",
"constraint.checked",
"decision.lifecycle",
"llm.token_stream"
]
}
Event Message Format:
{
"type": "forecast.progress",
"timestamp": "2025-10-29T18:00:00Z",
"payload": { "step": "training_xgboost", "progress": 0.7 }
}
2. Unified Data Contracts
To ensure SDK compatibility, all responses must follow a standard envelope:
{
"status": "success",
"data": { ... },
"meta": {
"source": "forecasting_engine",
"version": "v1.0.0",
"duration_ms": 2140
},
"error": null
}
Errors use a consistent schema:
{
"status": "error",
"error": {
"code": "CIE_VALIDATION_FAIL",
"message": "Target margin constraint violated",
"severity": "warning"
}
}
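The envelope and error schema can be produced by two small helpers; the function names and default version are illustrative, not part of the contract:

```javascript
// Illustrative helpers producing the standard response envelope above.
function envelope(data, meta = {}) {
  return {
    status: 'success',
    data,
    meta: { version: 'v1.0.0', ...meta }, // source, duration_ms, etc.
    error: null,
  };
}

function errorEnvelope(code, message, severity = 'error') {
  return {
    status: 'error',
    data: null,
    error: { code, message, severity },
  };
}
```

Centralizing envelope construction is what keeps every endpoint SDK-compatible: the wrapper only needs to check `status` and unwrap `data` or `error`.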
This uniform contract makes it trivial for the SDK to wrap calls like:
const forecast = ai.forecast({ ... }); // returns a streaming handle, not an awaited result
forecast.on("progress", handleProgress);
3. Event Registry (System-to-SDK Signals)
Each core service publishes standard events to the platform event bus.
The SDK’s .on(event, handler) system will later listen to these topics.
| Event | Origin | Payload Summary | Description |
|---|---|---|---|
| forecast.requested | Forecasting Engine | datasetId, covariates | Forecast job created |
| forecast.progress | Forecasting Engine | step, progress | Model training updates |
| forecast.completed | Forecasting Engine | forecastId, metrics | Final forecast ready |
| constraint.checked | CIE | planId, validationStatus | Constraint validation run |
| constraint.violation_detected | CIE | constraint, impact | Soft/hard constraint breach |
| decision.recorded | Judgment Engine | decisionId, rationale | Decision stored |
| decision.executed | Judgment Engine | decisionId, outcome | Decision acted upon |
| llm.token_stream | AIGateway | token, sequence | Streamed LLM outputs |
| audit.logged | Trust Layer | user, action, redactionLevel | Compliance log entry |
All event payloads conform to this structure:
{
"event": "decision.recorded",
"timestamp": "2025-10-29T17:50:00Z",
"tenantId": "T001",
"payload": { "decisionId": "dec_20251029_009" }
}
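A minimal publisher enforcing this payload structure can be sketched as follows; the in-memory subscriber map stands in for the real event bus, and the function names are assumptions:

```javascript
// Sketch of a bus publisher that wraps every payload in the standard event
// structure above; topic names come from the event registry.
const subscribers = new Map(); // topic -> array of handlers

function on(topic, handler) {
  if (!subscribers.has(topic)) subscribers.set(topic, []);
  subscribers.get(topic).push(handler);
}

function publish(event, tenantId, payload) {
  const message = {
    event,
    timestamp: new Date().toISOString(),
    tenantId,
    payload,
  };
  for (const handler of subscribers.get(event) || []) handler(message);
  return message;
}
```

Because the envelope is built in one place, every topic in the registry emits the same shape, which is what lets the SDK's `.on(event, handler)` stay generic.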
4. Authentication & Tenant Context
Each API call requires a valid X-Tenant-ID header and API key.
| Header | Description |
|---|---|
| X-Tenant-ID | Logical tenant or business unit ID |
| X-User-ID | Optional, for traceability in audits |
| Authorization | Bearer token issued via Tenant Admin console |
| X-Request-ID | Optional for tracing across event streams |
Future SDK clients can auto-inject these headers after a one-time setup:
const ai = new ChainAlignAI({
apiKey: process.env.CA_API_KEY,
tenant: "BASF_EMEA"
})
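The header auto-injection such a client would perform can be sketched as a pure function over the setup options; option names follow the examples above but are not final:

```javascript
// Sketch of the header auto-injection a future SDK client might perform
// after the one-time setup; option and header names follow section 4.
function buildHeaders({ apiKey, tenant, userId, requestId }) {
  const headers = {
    'Authorization': `Bearer ${apiKey}`,
    'X-Tenant-ID': tenant,
    'Content-Type': 'application/json',
  };
  // Optional headers are only attached when provided.
  if (userId) headers['X-User-ID'] = userId;
  if (requestId) headers['X-Request-ID'] = requestId;
  return headers;
}
```

Every SDK call would pass its options through this function, so callers never hand-assemble auth or tenant headers.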
5. Versioning & Extensibility
- Version every endpoint and event (/api/v1/ai/...)
- Maintain backward-compatible schema evolution via semantic versioning.
- Allow providers to be dynamically injected later (e.g., provider: "gemini" | "openai" | "mistral").
- All internal LLM interactions go through AIGateway.js (the “firewall” provider abstraction).
6. Hook System (for Future SDK Integration)
These hooks provide internal observability and form the bridge to SDK events.
| Hook | Trigger | Purpose |
|---|---|---|
| onRequestStart | Before request is dispatched | Initialize audit trail |
| onTokenStream | During streaming | Live token output |
| onResponse | After successful response | Summarize cost & latency |
| onError | On error or exception | Report to observability service |
| onAudit | On audit log write | Sync with compliance layer |
Later, the SDK can map these to developer-facing callbacks:
ai.on("token", streamToConsole)
ai.on("audit", pushToSIEM)
7. Summary: SDK-Ready FSD Alignment
| SDK Concept | FSD Anchor | Ready? |
|---|---|---|
| Forecast / Validate / Reason / Decision APIs | Defined under /api/v1/ai | ✅ |
| Unified schema contracts | Envelope & error model | ✅ |
| Streaming event registry | Defined with topics | ✅ |
| Hook system | Added for observability | ✅ |
| Provider abstraction | Built-in via AIGateway | ✅ |
| Versioning | /v1 namespace | ✅ |
| Multi-tenancy | Header model defined | ✅ |
| SDK scaffolding gap | Authentication SDK + typed model interfaces | 🔜 (deferred) |