CRITICAL AMENDMENTS: M60 FSD Architecture Review

Purpose: Address 10 critical gaps identified in architectural review before implementation

Status: APPROVED - These amendments supersede the corresponding sections in the main FSD

Integration: These sections should be merged into the main FSD before implementation begins


AMENDMENT 1: Scenario Dependencies & Sequential Execution

Issue: Original FSD showed parallel scenario execution (Section 3.3), but CFO scenario depends on S&OP results (Section 4.2).

Fix: Implement explicit dependency graph with sequential execution where needed.

1.1 Scenario Dependency Graph

Execution Order (Respecting Dependencies):

TIER A (S&OP only):
S&OP Execution
   ↓
(Complete)

TIER B (S&OP + CFO):
S&OP Execution
   ↓
(Complete - results cached)

CFO Execution
├─ Input: S&OP planning_phase_result
├─ Input: S&OP production plan
└─ Input: Financial historical data
   ↓
(Complete)

TIER C (All 3):
S&OP Execution
   ↓
(Complete)

CFO Execution
├─ Input: S&OP results (demand forecast, production plan)
└─ Input: Financial data
   ↓
(Complete)

CapEx Execution
├─ Input: S&OP production capacity implications
├─ Input: CFO financial capacity (available budget)
└─ Input: CapEx proposals + asset data
   ↓
(Complete)

1.2 Dependency Specification

/**
* Scenario Dependency Definition
* Defines prerequisite scenarios for each scenario type
*/

const SCENARIO_DEPENDENCIES = {
'sop': {
name: 'S&OP Planning',
depends_on: [], // No dependencies
required_by: ['cfo_finance', 'capex'],

// What outputs does SOP provide to dependents?
provides: {
'cfo_finance': [
'demand_forecast',
'production_plan',
'inventory_target'
],
'capex': [
'production_capacity_needs',
'asset_utilization_forecast'
]
}
},

'cfo_finance': {
name: 'CFO Financial Analysis',
depends_on: ['sop'], // MUST run after SOP
required_by: ['capex'],

provides: {
'capex': [
'operating_cf_forecast',
'available_capital_budget',
'debt_covenant_headroom'
]
}
},

'capex': {
name: 'CapEx Allocation',
depends_on: ['sop', 'cfo_finance'], // MUST run after both
required_by: [],

provides: {}
}
};

/**
* Function: Get execution order for tier
* Returns scenarios sorted by dependency
*/
function getExecutionOrder(tier) {
const tierScenarios = {
'TIER_A': ['sop'],
'TIER_B': ['sop', 'cfo_finance'],
'TIER_C': ['sop', 'cfo_finance', 'capex']
};

const scenarios = tierScenarios[tier];

// Topological sort (respects dependencies)
return topologicalSort(scenarios, SCENARIO_DEPENDENCIES);
}

/**
* Helper: Topological Sort
* Orders scenarios respecting dependency constraints
*/
function topologicalSort(scenarios, dependencies) {
const sorted = [];
const visited = new Set();
const visiting = new Set();

function visit(scenario) {
if (visited.has(scenario)) return;
if (visiting.has(scenario)) {
throw new Error(`Circular dependency detected: ${scenario}`);
}

visiting.add(scenario);

const deps = dependencies[scenario]?.depends_on || [];
for (const dep of deps) {
if (scenarios.includes(dep)) {
visit(dep);
}
}

visiting.delete(scenario);
visited.add(scenario);
sorted.push(scenario);
}

for (const scenario of scenarios) {
visit(scenario);
}

return sorted;
}
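
For reference, a usage sketch of the tier ordering (expected output assumes the SCENARIO_DEPENDENCIES map above):

// Usage sketch
getExecutionOrder('TIER_A'); // → ['sop']
getExecutionOrder('TIER_B'); // → ['sop', 'cfo_finance']
getExecutionOrder('TIER_C'); // → ['sop', 'cfo_finance', 'capex']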

1.3 Sequential Execution with Dependency Waiting

Update to Section 3.3 - Scenario Routing Logic:

/**
* Enqueue scenarios respecting dependencies
*
* Process:
* 1. Get subscription tier → execution order
* 2. For each scenario in order:
* - Enqueue job
* - If has dependencies: Mark as waiting
* - If no dependencies: Mark as ready
* 3. Worker polls for ready jobs
* 4. When scenario completes, unblock dependents
*/

async function routeDataEventToScenarios(event, tenantId) {
// 1. Get tenant subscription
const subscription = await subscriptionRepository.findByTenantId(tenantId);

// 2. Determine affected scenarios
const affectedScenarios = identifyAffectedScenarios(event);

// 3. Get all subscribed scenarios (may include non-affected)
const subscribedScenarios = getSubscribedScenarios(subscription.tier);

// 4. Intersect: only affected + subscribed scenarios
const toExecute = affectedScenarios.filter(s => subscribedScenarios.includes(s));

// 5. Sort by dependency order
const executionOrder = topologicalSort(toExecute, SCENARIO_DEPENDENCIES);

// 6. Create or get sop_cycle
let cycle = event.cycleId;
if (!cycle) {
cycle = await createNewCycle(event, tenantId);
}

// 7. Enqueue jobs respecting dependencies
const enqueuedJobs = [];

for (let i = 0; i < executionOrder.length; i++) {
const scenario = executionOrder[i];
const deps = SCENARIO_DEPENDENCIES[scenario].depends_on;

const job = {
jobId: uuid(),
tenantId,
cycleId: cycle.id,
scenarioType: scenario,
eventData: event.data,

// Dependency metadata
dependencies: deps,
dependencyCompleted: deps.length === 0, // True if no deps
waitingFor: deps.length > 0 ? deps : null,

// Execution metadata
enqueuedAt: now(),
priority: event.severity === 'HIGH' ? 'URGENT' : 'NORMAL',
retryCount: 0,
maxRetries: 3
};

// Enqueue
await pgmq.enqueue('scenario_execution_queue', job, {
// Only immediately available if no deps
delay_seconds: deps.length === 0 ? 0 : 10
});

enqueuedJobs.push({
scenario,
jobId: job.jobId,
status: deps.length === 0 ? 'READY' : 'WAITING',
waitingFor: deps.length > 0 ? deps : null
});

console.log(`[${tenantId}] Enqueued ${scenario} scenario`, {
status: job.dependencyCompleted ? 'READY' : 'WAITING',
waitingFor: deps
});
}

// 8. Return job summary
return enqueuedJobs;
}

1.4 Worker Processing with Dependency Checking

New Worker Lifecycle:

/**
* Background Worker - Process Scenario Execution Job
*/

async function processScenarioJob(job) {
try {
// 1. Check if dependencies are satisfied
if (job.dependencies.length > 0) {
const depsComplete = await checkDependenciesComplete(
job.cycleId,
job.dependencies
);

if (!depsComplete) {
console.log(`[${job.tenantId}] Scenario ${job.scenarioType} waiting for:`, job.dependencies);

// Re-queue with exponential backoff
const backoffSeconds = Math.min(60, 10 * Math.pow(2, job.retryCount));
await pgmq.enqueue('scenario_execution_queue', {
...job,
retryCount: job.retryCount + 1
}, {
delay_seconds: backoffSeconds
});

return {
status: 'RESCHEDULED',
reason: 'Dependencies not satisfied',
recheck_in_seconds: backoffSeconds
};
}
}

// 2. Load dependency results for context
let contextData = { ...job.eventData };

if (job.dependencies.length > 0) {
const depResults = await loadDependencyResults(
job.cycleId,
job.dependencies
);

contextData.dependencyOutputs = {
sop: depResults['sop'], // If SOP completed, include results
cfo_finance: depResults['cfo_finance'] // If CFO completed, include results
};

console.log(`[${job.tenantId}] Loaded dependency outputs for ${job.scenarioType}`);
}

// 3. Execute scenario (Planning + Control)
const planningResult = await runPlanningPhase(
job.scenarioType,
job.tenantId,
job.cycleId,
contextData
);

const controlResult = await runControlPhase(
job.scenarioType,
planningResult
);

// 4. Store result
const result = await storeScenarioResult(
job.cycleId,
job.scenarioType,
planningResult,
controlResult
);

// 5. Mark job complete in database (so dependents know)
await markScenarioComplete(job.cycleId, job.scenarioType);

// 6. Trigger dependent scenario jobs
const dependents = SCENARIO_DEPENDENCIES[job.scenarioType].required_by;
for (const dependent of dependents) {
// Wake up any waiting jobs for this dependent
await pgmq.send('scenario_execution_queue', {
action: 'UNBLOCK_DEPENDENT',
blockedScenario: dependent,
cycleId: job.cycleId,
completedScenario: job.scenarioType
});
}

return {
status: 'SUCCESS',
resultId: result.id,
duration_seconds: (now() - job.enqueuedAt) / 1000
};

} catch (error) {
console.error(`[${job.tenantId}] Error processing ${job.scenarioType}:`, error);

// Retry logic
if (job.retryCount < job.maxRetries) {
await pgmq.enqueue('scenario_execution_queue', {
...job,
retryCount: job.retryCount + 1
}, {
delay_seconds: Math.min(60, 10 * Math.pow(2, job.retryCount))
});

return {
status: 'RETRY_QUEUED',
attempt: job.retryCount + 1,
maxAttempts: job.maxRetries
};
} else {
// Mark as failed after max retries
await markScenarioFailed(job.cycleId, job.scenarioType, error.message);

return {
status: 'FAILED',
error: error.message,
attempts_exhausted: true
};
}
}
}

/**
* Check if all dependencies for a scenario have completed
*/
async function checkDependenciesComplete(cycleId, dependencies) {
if (dependencies.length === 0) return true;

const completionStates = await db.query(`
SELECT scenario_type, execution_status
FROM scenario_execution_results
WHERE sop_cycle_id = $1
AND scenario_type = ANY($2)
`, [cycleId, dependencies]);

// All dependencies must be SUCCESS or PARTIAL (with approval)
const completedCount = completionStates.rows.filter(s =>
s.execution_status === 'SUCCESS' || s.execution_status === 'PARTIAL'
).length;

return completedCount === dependencies.length;
}

/**
* Load results from prerequisite scenarios
*/
async function loadDependencyResults(cycleId, dependencies) {
const results = {};

for (const depScenario of dependencies) {
const result = await db.query(`
SELECT
scenario_type,
planning_phase_result,
control_phase_result,
final_result
FROM scenario_execution_results
WHERE sop_cycle_id = $1
AND scenario_type = $2
ORDER BY created_at DESC
LIMIT 1
`, [cycleId, depScenario]);

if (result.rows.length > 0) {
results[depScenario] = result.rows[0];
}
}

return results;
}

/**
* Mark scenario as completed in database
* (Allows dependent jobs to proceed)
*/
async function markScenarioComplete(cycleId, scenarioType) {
await db.query(`
UPDATE scenario_execution_results
SET completed_at = NOW(),
cache_valid = true,
cached_at = NOW()
WHERE sop_cycle_id = $1
AND scenario_type = $2
`, [cycleId, scenarioType]);
}
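
markScenarioFailed is referenced by the worker's retry logic but not defined above. A minimal sketch, assuming scenario_execution_results carries execution_status and failure_reason columns (the column names are assumptions):

/**
 * Mark scenario as FAILED after retries are exhausted
 * (Sketch: execution_status / failure_reason columns are assumed)
 */
async function markScenarioFailed(cycleId, scenarioType, reason) {
await db.query(`
UPDATE scenario_execution_results
SET execution_status = 'FAILED',
failure_reason = $3,
completed_at = NOW(),
cache_valid = false
WHERE sop_cycle_id = $1
AND scenario_type = $2
`, [cycleId, scenarioType, reason]);
}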

AMENDMENT 2: CSV Entity Resolution & Product Matching

Issue: CSV import specifies product_sku must map to core_entities.sku, but entity resolution logic was missing.

Fix: Add explicit entity resolution workflow during CSV import.

2.1 Entity Resolution During CSV Import

/**
* Process S&OP CSV Upload with Entity Resolution
*
* Workflow:
* 1. Parse CSV
* 2. Extract unique SKUs
* 3. Resolve SKUs to core_entities
* 4. Create missing entities
* 5. Insert sop_plan_data with resolved entity_ids
*/

async function processSopCsvImport(tenantId, csvFilePath) {
console.log(`[${tenantId}] Processing S&OP CSV: ${csvFilePath}`);

// Step 1: Parse CSV
const csvRows = await parseCsvFile(csvFilePath);
console.log(`Parsed ${csvRows.length} rows from CSV`);

// Step 2: Validation
validateSopCsvStructure(csvRows);
validateSopCsvDataQuality(csvRows);

// Step 3: Extract unique product SKUs
const uniqueSkus = [...new Set(csvRows.map(r => r.product_sku))];
console.log(`Found ${uniqueSkus.length} unique product SKUs: [${uniqueSkus.slice(0, 5).join(', ')}...]`);

// Step 4: Resolve SKUs to entity IDs (with auto-creation)
const skuToEntityMap = await resolveProductSkus(tenantId, uniqueSkus);
console.log(`Resolved ${Object.keys(skuToEntityMap).length} entities`);

// Step 5: Create sop_cycle
const cycle = await createSopCycle(tenantId, 'CSV_BOOTSTRAP', {
filename: path.basename(csvFilePath),
row_count: csvRows.length,
unique_skus: uniqueSkus.length
});
console.log(`Created cycle: ${cycle.id}`);

// Step 6: Transform CSV rows to sop_plan_data (with resolved entity_ids)
const planDataRows = csvRows.map((row, index) => {
const entry = skuToEntityMap[row.product_sku];

if (!entry || !entry.entityId) {
throw new Error(
`Row ${index + 1}: SKU '${row.product_sku}' could not be resolved to an entity`
);
}

return {
cycle_id: cycle.id,
tenant_id: tenantId,
product_id: entry.entityId, // Resolved! (map values are { entityId, isNew })
period: parseDate(row.period),
bookings_plan: parseFloat(row.bookings_plan) || 0,
shipments_actual: parseFloat(row.shipments_actual) || 0,
inventory_actual: parseFloat(row.inventory_actual) || 0,
production_actual: parseFloat(row.production_actual) || 0,
revenue_plan: parseFloat(row.revenue_plan) || 0,
revenue_actual: parseFloat(row.revenue_actual) || 0,
region: row.region || null,
segment: row.segment || null,
cost_center: row.cost_center || null,
capacity_utilized_percent: row.capacity_utilized_percent
? parseFloat(row.capacity_utilized_percent)
: null,
data_source: 'CSV_UPLOAD',
created_at: now(),
updated_at: now()
};
});

console.log(`Transformed ${planDataRows.length} rows for insertion`);

// Step 7: Bulk insert sop_plan_data
const inserted = await sopPlanDataRepository.bulkInsert(planDataRows);
console.log(`Inserted ${inserted.length} sop_plan_data rows`);

// Step 8: Return summary
return {
cycleId: cycle.id,
rowsImported: csvRows.length,
uniqueSkusCreated: Object.values(skuToEntityMap).filter(e => e.isNew).length,
uniqueSkusMatched: Object.values(skuToEntityMap).filter(e => !e.isNew).length,
totalUniqueSkus: uniqueSkus.length,
status: 'SUCCESS'
};
}

/**
* Resolve Product SKUs to Entity IDs
*
* Process:
* 1. Query existing entities for matching SKUs
* 2. For unmatched SKUs, create new entities
* 3. Return map of SKU → Entity ID
*/

async function resolveProductSkus(tenantId, skus) {
const skuToEntityMap = {};

// Step 1: Find existing entities
const existingEntities = await db.query(`
SELECT entity_id, sku, entity_name
FROM core_entities
WHERE tenant_id = $1
AND entity_type = 'PRODUCT'
AND sku = ANY($2)
`, [tenantId, skus]);

const existingSkuMap = new Map(
existingEntities.rows.map(e => [e.sku, e.entity_id])
);

console.log(`Found ${existingSkuMap.size} existing product entities`);

// Step 2: Find SKUs that need entity creation
const newSkus = skus.filter(sku => !existingSkuMap.has(sku));
console.log(`${newSkus.length} SKUs need new entities`);

if (newSkus.length > 0) {
// Batch insert new entities
const newEntities = newSkus.map(sku => ({
tenant_id: tenantId,
entity_type: 'PRODUCT',
sku: sku,
entity_name: sku, // Default name = SKU (can be updated later)
created_at: now()
}));

// Two positional params per row (sku, entity_name) after the shared $1/$2
const inserted = await db.query(`
INSERT INTO core_entities (tenant_id, entity_type, sku, entity_name, created_at)
VALUES ${newEntities.map((_, i) => `($1, $2, $${i * 2 + 3}, $${i * 2 + 4}, NOW())`).join(', ')}
ON CONFLICT (tenant_id, sku) DO NOTHING
RETURNING entity_id, sku
`, [
tenantId,
'PRODUCT',
...newEntities.flatMap(e => [e.sku, e.entity_name])
]);
// Note: rows skipped by ON CONFLICT are not returned; re-query if
// concurrent imports for the same tenant are possible

console.log(`Created ${inserted.rows.length} new product entities`);

inserted.rows.forEach(row => {
existingSkuMap.set(row.sku, row.entity_id);
});
}

// Step 3: Build final map
for (const sku of skus) {
const entityId = existingSkuMap.get(sku);
skuToEntityMap[sku] = {
entityId: entityId,
isNew: newSkus.includes(sku)
};
}

return skuToEntityMap;
}

/**
* Validate S&OP CSV Structure
*
* Checks:
* - Required columns present
* - Data types correct
* - No duplicate SKU+period combinations
*/

function validateSopCsvStructure(csvRows) {
const requiredColumns = [
'period', 'product_sku', 'bookings_plan', 'shipments_actual',
'inventory_actual', 'production_actual', 'revenue_plan', 'revenue_actual'
];

if (!csvRows || csvRows.length === 0) {
throw new ValidationError('CSV file is empty');
}

const csvHeaders = Object.keys(csvRows[0]);

for (const required of requiredColumns) {
if (!csvHeaders.includes(required)) {
throw new ValidationError(
`Missing required column: '${required}'`
);
}
}

console.log(`✓ All required columns present: [${requiredColumns.join(', ')}]`);
}

/**
* Validate S&OP CSV Data Quality
*
* Checks:
* - Dates are valid (YYYY-MM-01)
* - Numbers are valid
* - No nulls in required fields
* - Logical constraints (inventory >= 0, etc.)
*/

function validateSopCsvDataQuality(csvRows) {
const errors = [];

for (let i = 0; i < csvRows.length; i++) {
const row = csvRows[i];
const rowNum = i + 2; // +2 for header + 0-indexing

// Validate period (must be YYYY-MM-01)
if (!row.period || !row.period.match(/^\d{4}-\d{2}-01$/)) {
errors.push({
row: rowNum,
column: 'period',
value: row.period,
error: 'Invalid date format (expected YYYY-MM-01)'
});
}

// Validate product_sku (required, non-empty)
if (!row.product_sku || row.product_sku.trim() === '') {
errors.push({
row: rowNum,
column: 'product_sku',
value: row.product_sku,
error: 'Required field cannot be empty'
});
}

// Validate numeric fields
const numericFields = [
'bookings_plan', 'shipments_actual', 'inventory_actual',
'production_actual', 'revenue_plan', 'revenue_actual'
];

for (const field of numericFields) {
const value = row[field];

if (value === null || value === undefined || value === '') {
errors.push({
row: rowNum,
column: field,
value: value,
error: 'Required numeric field is empty'
});
continue;
}

const num = parseFloat(value);
if (isNaN(num)) {
errors.push({
row: rowNum,
column: field,
value: value,
error: `Invalid number: '${value}'`
});
continue; // Skip the negative-value check for unparseable values
}

if (num < 0) {
errors.push({
row: rowNum,
column: field,
value: value,
error: 'Negative value not allowed for quantity/revenue'
});
}
}

// Logical constraint: inventory >= 0
const inventory = parseFloat(row.inventory_actual);
if (!isNaN(inventory) && inventory < 0) {
errors.push({
row: rowNum,
column: 'inventory_actual',
value: row.inventory_actual,
error: 'Inventory cannot be negative'
});
}
}

if (errors.length > 0) {
throw new ValidationError('Data quality validation failed', {
errorCount: errors.length,
errors: errors.slice(0, 10) // Show first 10 errors only
});
}

console.log(`✓ All ${csvRows.length} rows passed data quality validation`);
}
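
parseCsvFile is assumed throughout this amendment. A minimal sketch using the csv-parse library (the library choice is an assumption; any parser that returns one object per row, keyed by header, works):

/**
 * Parse a CSV file into an array of row objects keyed by header
 * (Sketch: uses csv-parse/sync)
 */
const fs = require('fs');
const { parse } = require('csv-parse/sync');

async function parseCsvFile(filePath) {
const content = fs.readFileSync(filePath, 'utf8');
return parse(content, {
columns: true, // First row provides the keys
skip_empty_lines: true,
trim: true
});
}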

2.2 Update CSV File Detection

Update Section 2.3 of FSD:

When CSV file is uploaded, detect the tier-specific file type:

/**
* Enhance UploadAssistantService.detectFileType()
* to recognize tier-specific filenames
*/

async function detectFileType(headers, sampleRows, tenantId, user) {
// Check filename pattern first
const filename = user.uploadedFilename.toLowerCase();

// Tier A files
if (filename.includes('sales_and_operations')) {
return {
fileType: 'SALES_AND_OPERATIONS',
tier: 'TIER_A',
confidence: 1.0
};
}

// Tier B files
if (filename.includes('financial_statement')) {
return {
fileType: 'FINANCIAL_STATEMENT',
tier: 'TIER_B',
confidence: 1.0
};
}

if (filename.includes('operating_expenses')) {
return {
fileType: 'OPERATING_EXPENSES',
tier: 'TIER_B',
confidence: 1.0
};
}

// Tier C files
if (filename.includes('capex_proposals')) {
return {
fileType: 'CAPEX_PROPOSALS',
tier: 'TIER_C',
confidence: 1.0
};
}

if (filename.includes('asset')) {
return {
fileType: 'ASSET_INVENTORY',
tier: 'TIER_C',
confidence: 0.9
};
}

// Fallback: Use existing Gemini-based detection
return await useGeminiDetection(headers, sampleRows, tenantId);
}

AMENDMENT 3: Incremental vs Full Execution Modes

Issue: Section 6.2 mentioned "Incremental Execution" optimization but never defined the logic.

Fix: Add complete specification for when to use incremental vs full execution.

3.1 Execution Mode Determination

/**
* Determine whether to execute scenario in FULL or INCREMENTAL mode
*
* Goals:
* - FULL execution: When significant data changed (>20% of rows)
* - INCREMENTAL execution: When only recent period updated (cost saving)
* - FULL execution: When historical data changed (affects patterns)
*/

async function determineExecutionMode(job, lastExecution) {
const changeSummary = analyzeDataChange(job.eventData);

// Rule 1: New cycle always gets FULL execution
if (!lastExecution) {
return {
mode: 'FULL',
reason: 'First execution for this cycle',
confidence: 1.0
};
}

// Rule 2: If >20% of data rows changed → FULL execution
const changePercentage = changeSummary.rows_changed / changeSummary.total_rows;
if (changePercentage > 0.20) {
return {
mode: 'FULL',
reason: `Major data change (${(changePercentage * 100).toFixed(1)}% > 20%)`,
confidence: 1.0
};
}

// Rule 3: If historical periods changed → FULL execution
const affectedPeriods = changeSummary.affected_periods || [];
const lastExecutedPeriod = lastExecution.latest_period;

const historicalChanges = affectedPeriods.filter(p => p < lastExecutedPeriod);
if (historicalChanges.length > 0) {
return {
mode: 'FULL',
reason: `Historical data changed (periods: [${historicalChanges.join(', ')}])`,
reason_detail: 'Historical changes affect trend patterns',
confidence: 1.0
};
}

// Rule 4: If only latest period updated → INCREMENTAL execution
const recentPeriodChanges = affectedPeriods.filter(p => p >= lastExecutedPeriod);
if (affectedPeriods.length > 0 && recentPeriodChanges.length === affectedPeriods.length) {
return {
mode: 'INCREMENTAL',
reason: `Only recent periods updated (${affectedPeriods.length} period(s))`,
scope: affectedPeriods,
confidence: 0.85,
cost_savings: {
estimated_time_percent: 0.67, // 67% faster (10s → 3s)
estimated_llm_cost_percent: 0.25 // 75% cheaper
}
};
}

// Rule 5: If minor changes (<5%) to any period → INCREMENTAL
if (changePercentage < 0.05) {
return {
mode: 'INCREMENTAL',
reason: `Minor data update (${(changePercentage * 100).toFixed(2)}% < 5%)`,
scope: affectedPeriods,
confidence: 0.75
};
}

// Default: FULL (conservative/safe)
return {
mode: 'FULL',
reason: 'Default safe mode',
confidence: 0.5
};
}

/**
* Analyze what data changed in the event
*/

function analyzeDataChange(eventData) {
// Expected event structure: { dataType, newRows, oldRows }
const { dataType, newRows, oldRows } = eventData;

if (!oldRows || oldRows.length === 0) {
return {
rows_changed: newRows.length,
total_rows: newRows.length,
affected_periods: extractPeriods(newRows),
change_type: 'NEW_DATA'
};
}

// Compare old vs new
const oldRowMap = new Map(oldRows.map(r => [`${r.product_sku}:${r.period}`, r]));
const newRowMap = new Map(newRows.map(r => [`${r.product_sku}:${r.period}`, r]));

let changedCount = 0;
const changedPeriods = new Set();

// Find changed rows
for (const [key, newRow] of newRowMap) {
const oldRow = oldRowMap.get(key);

if (!oldRow) {
changedCount++; // New row
changedPeriods.add(newRow.period);
} else if (!rowsEqual(oldRow, newRow)) {
changedCount++; // Modified row
changedPeriods.add(newRow.period);
}
}

return {
rows_changed: changedCount,
total_rows: Math.max(oldRows.length, newRows.length),
affected_periods: Array.from(changedPeriods).sort(),
change_type: 'UPDATE'
};
}
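
rowsEqual and extractPeriods are used above but left undefined. Minimal sketches, assuming the comparison should cover the planning/actuals fields only (the field list is an assumption):

// Fields compared when deciding whether a row changed (assumed list)
const COMPARED_FIELDS = [
'bookings_plan', 'shipments_actual', 'inventory_actual',
'production_actual', 'revenue_plan', 'revenue_actual'
];

// Two rows are equal if every compared field parses to the same number
// (missing or unparseable values are treated as changed)
function rowsEqual(oldRow, newRow) {
return COMPARED_FIELDS.every(
f => parseFloat(oldRow[f]) === parseFloat(newRow[f])
);
}

// Unique, sorted list of periods present in a row set
function extractPeriods(rows) {
return [...new Set(rows.map(r => r.period))].sort();
}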

/**
* Execute scenario with appropriate mode (FULL or INCREMENTAL)
*/

async function executeScenarioWithMode(job, executionMode, tenantId, cycleId) {
console.log(
`[${tenantId}] Executing ${job.scenarioType} in ${executionMode.mode} mode`,
executionMode
);

if (executionMode.mode === 'INCREMENTAL') {
return await executeScenarioIncremental(job, executionMode);
} else {
return await executeScenarioFull(job);
}
}

/**
* INCREMENTAL Execution: Re-forecast only affected periods
*
* Input:
* - Previous execution results
* - New data for affected periods
*
* Process:
* 1. Load previous planning result
* 2. Identify periods to re-forecast
* 3. Run planning agent on ONLY those periods
* 4. Merge results: keep previous periods, override changed periods
* 5. Re-run control phase on merged result
*/

async function executeScenarioIncremental(job, executionMode) {
const { scenarioType, cycleId, tenantId, eventData } = job;
const { scope: affectedPeriods } = executionMode;

console.log(
`[${tenantId}] Running INCREMENTAL ${scenarioType}`,
`Affected periods: [${affectedPeriods.join(', ')}]`
);

// Load previous execution
const previousResult = await db.query(`
SELECT planning_phase_result, control_phase_result
FROM scenario_execution_results
WHERE sop_cycle_id = $1 AND scenario_type = $2
ORDER BY created_at DESC
LIMIT 1
`, [cycleId, scenarioType]);

if (!previousResult.rows || previousResult.rows.length === 0) {
// No previous result, fall back to FULL
console.log(`[${tenantId}] No previous result found, falling back to FULL execution`);
return await executeScenarioFull(job);
}

const previous = previousResult.rows[0];

// For S&OP scenario: re-forecast only affected periods
if (scenarioType === 'sop') {
const incrementalForecast = await planningAgent.forecastIncremental({
previous_demand_forecast: previous.planning_phase_result.demand_forecast,
previous_production_plan: previous.planning_phase_result.production_plan,
new_data_periods: affectedPeriods,
updated_actuals: eventData,
tenant_context: await getTenantContext(tenantId)
});

// Merge: keep non-affected periods, override affected periods
const mergedDemandForecast = {
...previous.planning_phase_result.demand_forecast
};

for (const period of affectedPeriods) {
if (incrementalForecast.demand_forecast[period]) {
mergedDemandForecast[period] = incrementalForecast.demand_forecast[period];
}
}

const mergedPlanningResult = {
...previous.planning_phase_result,
demand_forecast: mergedDemandForecast,
production_plan: incrementalForecast.production_plan,
execution_mode: 'INCREMENTAL',
updated_periods: affectedPeriods,
retained_periods: Object.keys(previous.planning_phase_result.demand_forecast)
.filter(p => !affectedPeriods.includes(p))
};

// Re-run control phase on merged result
const controlResult = await runControlPhase(scenarioType, mergedPlanningResult);

return {
planning_phase_result: mergedPlanningResult,
control_phase_result: controlResult,
execution_mode: 'INCREMENTAL',
affected_periods: affectedPeriods
};
}

// For CFO scenario: same merge pattern as S&OP (re-forecast affected
// periods, merge with previous result); falls back to FULL until implemented
if (scenarioType === 'cfo_finance') {
return await executeScenarioFull(job);
}

// For CapEx: typically requires FULL execution
if (scenarioType === 'capex') {
return await executeScenarioFull(job);
}
}

/**
* FULL Execution: Complete re-run of planning
*/

async function executeScenarioFull(job) {
const { scenarioType, cycleId, tenantId, eventData, contextData } = job;

console.log(`[${tenantId}] Running FULL ${scenarioType} scenario`);

const planningResult = await runPlanningPhase(
scenarioType,
tenantId,
cycleId,
{ ...eventData, ...contextData }
);

const controlResult = await runControlPhase(scenarioType, planningResult);

return {
planning_phase_result: planningResult,
control_phase_result: controlResult,
execution_mode: 'FULL'
};
}

3.2 Cost Impact of Incremental Execution

EXECUTION TIME COMPARISON:

Scenario Type    FULL Execution    INCREMENTAL    Savings
─────────────────────────────────────────────────────────
S&OP             10-15s            2-5s           67-87%
CFO Finance      15-20s            4-8s           60-80%
CapEx            20-30s            N/A (usually FULL)

LLM API COST IMPACT:

Metric                FULL       INCREMENTAL    Savings
─────────────────────────────────────────────────────────
Input Tokens          ~5,000     ~1,500         70%
Output Tokens         ~2,000     ~800           60%
Cost per execution    $0.10      $0.02          80%

TENANT ECONOMICS:

Example: Tier B tenant with 2 S&OP + 2 CFO executions/month

Scenario: 60% incremental, 40% full

S&OP Cost:
2 executions × (60% × $0.02 + 40% × $0.10) = $0.104/month

CFO Cost:
2 executions × (60% × $0.02 + 40% × $0.12) = $0.120/month

Total: ~$0.22/month (vs $0.44/month if all full)
Savings: ~50%

AMENDMENT 4: Meta-Prompting Service API Integration

Issue: Multiple references to meta-prompting service but no API specification.

Fix: Define complete API contracts and integration patterns.

4.1 Meta-Prompting Service API Endpoints

Service Location:

  • Development: http://localhost:5002
  • Production: http://meta-prompting-service:5002 (Docker)

Endpoint 1: POST /refine-prompt

Purpose: Improve a prompt based on execution feedback and historical data

Request:
Content-Type: application/json

{
"initial_prompt": "string (current prompt template)",
"task_description": "string (what scenario is trying to achieve)",
"execution_history": [
{
"prompt_version": "v1.2.3",
"accuracy_score": 0.65,
"execution_date": "2025-11-01T10:00:00Z",
"failure_modes": [
"over-forecasted demand by 15%",
"missed seasonal pattern for Q4",
"failed to account for supplier constraints"
],
"duration_seconds": 12.5,
"token_usage": {
"input_tokens": 5200,
"output_tokens": 1800
}
}
],
"tenant_context": {
"industry": "manufacturing",
"company_size": "mid-market",
"typical_lead_times_days": 14,
"seasonal_patterns": true,
"seasonality_description": "Q4 spike 30%, Q1 decline 15%",
"recent_anomalies": [
"Supplier strike in Nov (7 days)",
"Demand spike 25% in Oct"
]
},
"constraints": [
"Must stay under 6000 input tokens",
"Output must be valid JSON",
"Must consider at least 3 historical patterns"
]
}

Response (200 OK):
{
"refined_prompt": "string (improved prompt text)",
"changes_made": [
"Added seasonal adjustment clause referencing Q4 peaks",
"Strengthened lead time consideration with concrete examples",
"Added constraint checking for supplier delays"
],
"refinement_metadata": {
"changes_count": 3,
"tokens_added": 450,
"prompt_version": "v1.2.4"
},
"improvement_forecast": {
"expected_accuracy_improvement": 0.12,
"expected_duration_increase_percent": 5,
"expected_token_increase_percent": 8
},
"reasoning": "Added specific tenant context patterns identified in failure modes"
}

Error (400 Bad Request):
{
"error": "Missing required field: task_description",
"validation_errors": [...]
}

Endpoint 2: POST /inject-context

Purpose: Enrich a static prompt template with tenant-specific context

Request:
{
"base_prompt": "string (static template)",
"tenant_id": "uuid",
"scenario_type": "sop|cfo_finance|capex",
"historical_performance": {
"forecast_accuracy_mape": 0.087,
"typical_forecast_variance": 0.12,
"anomaly_frequency_percent": 0.05,
"improvement_trend_percent": 2.5
},
"data_freshness": {
"latest_data_age_days": 2,
"typical_update_frequency_days": 7,
"data_quality_score": 0.92
},
"tenant_size_metrics": {
"annual_revenue_millions": 250,
"number_of_products": 45,
"number_of_markets": 8
}
}

Response (200 OK):
{
"enriched_prompt": "string (template + injected context)",
"context_sections": {
"historical_performance": true,
"seasonal_patterns": true,
"recent_events": true,
"risk_factors": true
},
"metadata": {
"base_tokens": 1200,
"context_tokens": 450,
"total_tokens": 1650,
"injection_sections_count": 4
},
"effectiveness_score": 0.88
}

Endpoint 3: POST /track-execution

Purpose: Record execution outcome for learning/refinement

Request:
{
"execution_id": "uuid",
"prompt_version": "v1.2.4",
"scenario_type": "sop",
"tenant_id": "uuid",
"execution_date": "2025-11-05T14:30:00Z",
"execution_duration_seconds": 14.2,
"token_usage": {
"input_tokens": 5650,
"output_tokens": 1950
},
"actual_outcome": {
"forecast_actual": 15000,
"forecast_predicted": 16200,
"accuracy_percent": 92
},
"constraints_violated": 0,
"notes": "Prompt version 1.2.4 performed well on this execution"
}

Response (201 Created):
{
"tracked": true,
"execution_record_id": "exec-12345",
"data_stored": {
"execution_metadata": "stored",
"performance_metrics": "stored",
"ready_for_refinement": true
}
}

4.2 Planning Agent Integration Pattern

/**
* S&OP Planning Agent Integration with Meta-Prompting Service
*/

async function generateSopPlanningPrompt(tenantId, cycleId, tenantContext) {
// Step 1: Load static base template
const baseTemplate = STATIC_SOP_PLANNING_TEMPLATE;

console.log(`[${tenantId}] Generating S&OP planning prompt`);

// Step 2: Call meta-prompting service to inject tenant context
const enrichedPromptResponse = await callMetaPromptService({
endpoint: '/inject-context',
payload: {
base_prompt: baseTemplate,
tenant_id: tenantId,
scenario_type: 'sop',
historical_performance: tenantContext.performance,
data_freshness: tenantContext.freshness,
tenant_size_metrics: tenantContext.size
}
});

const enrichedPrompt = enrichedPromptResponse.enriched_prompt;

console.log(
`[${tenantId}] Context injected`,
`Base: ${enrichedPromptResponse.metadata.base_tokens} tokens`,
`+ Context: ${enrichedPromptResponse.metadata.context_tokens} tokens`
);

// Step 3: Append current cycle data
const cycleData = await getCurrentCycleData(tenantId, cycleId);
const finalPrompt = enrichedPrompt + '\n\n' + JSON.stringify(cycleData);

return {
finalPrompt,
promptVersion: 'v1.2.4',
promptSource: 'meta-prompt-service',
enrichmentMetadata: enrichedPromptResponse.metadata
};
}

/**
* After Planning Phase: Track execution for learning
*/

async function trackSopPlanningExecution(
executionId,
promptVersion,
tenantId,
planningResult,
actualOutcome
) {
const accuracy = calculateAccuracy(planningResult.forecast, actualOutcome);

console.log(
`[${tenantId}] Tracking S&OP execution`,
`Prompt: ${promptVersion}`,
`Accuracy: ${(accuracy * 100).toFixed(1)}%`
);

// Call meta-prompting service to record execution
await callMetaPromptService({
endpoint: '/track-execution',
payload: {
execution_id: executionId,
prompt_version: promptVersion,
scenario_type: 'sop',
tenant_id: tenantId,
execution_date: now().toISOString(),
execution_duration_seconds: planningResult.duration || 12.5,
token_usage: planningResult.tokenUsage || {},
actual_outcome: {
forecast_actual: actualOutcome.actual,
forecast_predicted: planningResult.forecast,
accuracy_percent: accuracy * 100
},
constraints_violated: planningResult.violations?.length || 0
}
});
}

/**
* Prompt Refinement Loop (runs post-deployment)
*/

async function refineSopPromptIfNeeded(tenantId, executionHistory) {
const recentAccuracy = calculateAverageAccuracy(executionHistory);

console.log(
`[${tenantId}] S&OP accuracy: ${(recentAccuracy * 100).toFixed(1)}%`
);

// Trigger refinement if accuracy < 70%
if (recentAccuracy < 0.70) {
console.log(`[${tenantId}] Triggering prompt refinement (accuracy < 70%)`);

const refinementResponse = await callMetaPromptService({
endpoint: '/refine-prompt',
payload: {
initial_prompt: STATIC_SOP_PLANNING_TEMPLATE,
task_description:
`S&OP demand forecasting for ${tenantId}. ` +
`Recent accuracy: ${(recentAccuracy * 100).toFixed(1)}%. ` +
`Primary failures: ${extractFailureModes(executionHistory).join(', ')}`,
execution_history: executionHistory.map(e => ({
prompt_version: e.promptVersion,
accuracy_score: e.accuracy,
execution_date: e.executedAt,
failure_modes: e.failureModes || [],
duration_seconds: e.duration,
token_usage: e.tokenUsage
})),
tenant_context: {
industry: 'manufacturing',
company_size: 'mid-market',
typical_lead_times_days: 14,
seasonal_patterns: true,
recent_anomalies: extractRecentEvents(tenantId)
}
}
});

// Store refined prompt version
const newPromptVersion = generatePromptVersion();
await promptVersionRepository.create({
tenantId,
scenarioType: 'sop',
promptVersion: newPromptVersion,
promptText: refinementResponse.refined_prompt,
parentVersion: STATIC_VERSION,
changeLog: refinementResponse.changes_made,
expectedImprovement: refinementResponse.improvement_forecast.expected_accuracy_improvement,
appliesFrom: now() + ONE_DAY
});

console.log(
`[${tenantId}] Refined prompt saved as ${newPromptVersion}`,
`Expected improvement: +${(refinementResponse.improvement_forecast.expected_accuracy_improvement * 100).toFixed(1)}%`
);
}
}

/**
* Helper: Call Meta-Prompting Service
*/

async function callMetaPromptService({ endpoint, payload }) {
const baseUrl = process.env.META_PROMPT_SERVICE_URL || 'http://localhost:5002';
const url = `${baseUrl}${endpoint}`;

try {
const response = await fetch(url, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
signal: AbortSignal.timeout(30_000) // 30-second timeout (fetch has no timeout option)
});

if (!response.ok) {
throw new Error(
`Meta-Prompt Service error (${response.status}): ${response.statusText}`
);
}

return await response.json();
} catch (error) {
console.error(`Meta-Prompt Service call failed: ${error.message}`);
throw error;
}
}
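
calculateAccuracy and calculateAverageAccuracy are referenced above but left undefined. One plausible sketch, treating accuracy as 1 minus the absolute percentage error (the metric definition is an assumption; the main FSD may specify a different one):

/**
 * Accuracy helpers (sketch: 1 - absolute percentage error, clamped to [0, 1])
 */
function calculateAccuracy(predicted, actual) {
if (!actual) return 0; // Avoid division by zero
const ape = Math.abs(predicted - actual) / Math.abs(actual);
return Math.max(0, 1 - ape);
}

function calculateAverageAccuracy(executionHistory) {
if (executionHistory.length === 0) return 0;
const sum = executionHistory.reduce((acc, e) => acc + e.accuracy, 0);
return sum / executionHistory.length;
}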

AMENDMENT 5: Financial Data Architecture Clarification

Issue: Unclear how product-level vs company-level financial data flows into CFO scenario.

Fix: Specify complete data assembly logic for CFO scenario.

5.1 Financial Data Model

FINANCIAL DATA HIERARCHY:

Product-Level Financial Data (from sop_plan_data):
├─ Source: sales_and_operations.csv (TIER A)
├─ Granularity: Product × Period
├─ Columns: revenue_plan, revenue_actual, cogs_plan, cogs_actual
└─ Use Case: Detailed product profitability analysis

Company-Level Financial Data (from financial_summary):
├─ Source: financial_statement.csv (TIER B)
├─ Granularity: Company × Period
├─ Columns: revenue_plan, revenue_actual, cogs_plan, cogs_actual, gross_margin
└─ Use Case: Consolidated P&L when product breakdown not available

Operating Expenses (from operating_expense_actuals):
├─ Source: operating_expenses.csv (TIER B)
├─ Granularity: Expense Category × Period
├─ Columns: amount_plan, amount_actual
└─ Use Case: Departmental/functional cost analysis

EXAMPLE DATA FLOW:

Scenario 1: Customer has product breakdown
────────────────────────────────────────────
sales_and_operations.csv:
[
{ period: '2025-10-01', product_sku: 'SKU-001', revenue_actual: 50000, cogs_actual: 30000 },
{ period: '2025-10-01', product_sku: 'SKU-002', revenue_actual: 75000, cogs_actual: 40000 }
]

↓ (Import)

sop_plan_data:
product_id=1, revenue_actual=50000, cogs_actual=30000
product_id=2, revenue_actual=75000, cogs_actual=40000

↓ (CFO Query)

Aggregation:
Total Revenue = 50000 + 75000 = 125000
Total COGS = 30000 + 40000 = 70000
Gross Margin = (125000 - 70000) / 125000 = 44%


Scenario 2: Customer has only consolidated financials
────────────────────────────────────────────────────────
financial_statement.csv:
[
{ period: '2025-10-01', revenue_actual: 125000, cogs_actual: 70000, gross_margin_actual: 0.44 }
]

↓ (Import)

financial_summary:
period='2025-10-01', revenue_actual=125000, cogs_actual=70000, gross_margin_actual=0.44

↓ (CFO Query)

Direct use:
Total Revenue = 125000
Total COGS = 70000
Gross Margin = 0.44

5.2 CSV Import Routing Logic

/**
* Determine how to import financial CSV based on structure
*/

async function importFinancialStatementCsv(tenantId, csvFile) {
// Step 1: Parse CSV
const csvRows = await parseCsvFile(csvFile);

// Step 2: Detect structure
const structure = detectFinancialStructure(csvRows);
// structure.hasProductBreakdown: boolean
// structure.dataType: 'product_level' | 'consolidated'

console.log(
`[${tenantId}] Detected financial CSV structure:`,
structure
);

if (structure.hasProductBreakdown) {
// Product-level data: import to sop_plan_data
console.log(`[${tenantId}] Importing as product-level financial data`);

return await importFinancialToSopPlanData(tenantId, csvRows, {
dataSource: 'FINANCIAL_STATEMENT_CSV',
importedAs: 'product_level'
});
} else {
// Consolidated data: import to financial_summary
console.log(`[${tenantId}] Importing as company-level financial data`);

return await importFinancialToFinancialSummary(tenantId, csvRows, {
dataSource: 'FINANCIAL_STATEMENT_CSV',
importedAs: 'consolidated'
});
}
}

/**
* Detect whether CSV has product-level breakdown
*/

function detectFinancialStructure(csvRows) {
if (csvRows.length === 0) {
throw new ValidationError('CSV file is empty');
}

// Check for product_sku column
const headers = Object.keys(csvRows[0]);
const hasProductSku = headers.includes('product_sku');

// Check if product_sku values are unique (not all 'CONSOLIDATED')
let uniqueSkus = new Set();
let consolidatedCount = 0;

for (const row of csvRows) {
const sku = row.product_sku || 'CONSOLIDATED';

if (sku === 'CONSOLIDATED') {
consolidatedCount++;
} else {
uniqueSkus.add(sku);
}
}

const hasBreakdown =
hasProductSku &&
uniqueSkus.size > 1 && // More than 1 unique non-consolidated SKU
consolidatedCount === 0; // No explicit CONSOLIDATED rows

return {
hasProductBreakdown: hasBreakdown,
dataType: hasBreakdown ? 'product_level' : 'consolidated',
detectionReason: hasBreakdown
? `${uniqueSkus.size} unique product SKUs detected`
: 'No product breakdown or consolidated data detected',
uniqueSkus: Array.from(uniqueSkus),
consolidatedRowCount: consolidatedCount
};
}

/**
* Import product-level financial data to sop_plan_data
*/

async function importFinancialToSopPlanData(
tenantId,
csvRows,
metadata
) {
// Resolve product SKUs to entities
const uniqueSkus = [...new Set(csvRows.map(r => r.product_sku))];
const skuToEntityMap = await resolveProductSkus(tenantId, uniqueSkus);

// Transform rows
const planDataUpdates = csvRows.map(row => ({
tenant_id: tenantId,
product_id: skuToEntityMap[row.product_sku].entityId,
period: parseDate(row.period),
// Update financial fields
revenue_plan: row.revenue_plan ? parseFloat(row.revenue_plan) : null,
revenue_actual: row.revenue_actual ? parseFloat(row.revenue_actual) : null,
cogs_plan: row.cogs_plan ? parseFloat(row.cogs_plan) : null,
cogs_actual: row.cogs_actual ? parseFloat(row.cogs_actual) : null,
data_source: metadata.dataSource
}));

// Upsert: if product+period exists, update; otherwise insert
for (const update of planDataUpdates) {
await db.query(`
INSERT INTO sop_plan_data (
tenant_id, product_id, period,
revenue_plan, revenue_actual, cogs_plan, cogs_actual,
data_source, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NOW())
ON CONFLICT (tenant_id, product_id, period)
DO UPDATE SET
revenue_plan = $4,
revenue_actual = $5,
cogs_plan = $6,
cogs_actual = $7,
data_source = $8,
updated_at = NOW()
`, [
update.tenant_id,
update.product_id,
update.period,
update.revenue_plan,
update.revenue_actual,
update.cogs_plan,
update.cogs_actual,
update.data_source
]);
}

return {
rowsImported: planDataUpdates.length,
importedTo: 'sop_plan_data',
uniqueProducts: uniqueSkus.length,
status: 'SUCCESS'
};
}

/**
* Import consolidated financial data to financial_summary
*/

async function importFinancialToFinancialSummary(
tenantId,
csvRows,
metadata
) {
const summaryRows = csvRows.map(row => ({
tenant_id: tenantId,
period: parseDate(row.period),
revenue_plan: row.revenue_plan ? parseFloat(row.revenue_plan) : null,
revenue_actual: row.revenue_actual ? parseFloat(row.revenue_actual) : null,
cogs_plan: row.cogs_plan ? parseFloat(row.cogs_plan) : null,
cogs_actual: row.cogs_actual ? parseFloat(row.cogs_actual) : null,
gross_margin_plan: row.gross_margin_plan ? parseFloat(row.gross_margin_plan) : null,
gross_margin_actual: row.gross_margin_actual ? parseFloat(row.gross_margin_actual) : null,
data_source: metadata.dataSource
}));

// Upsert
for (const summary of summaryRows) {
await db.query(`
INSERT INTO financial_summary (
tenant_id, period,
revenue_plan, revenue_actual,
cogs_plan, cogs_actual,
gross_margin_plan, gross_margin_actual,
data_source, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW())
ON CONFLICT (tenant_id, period)
DO UPDATE SET
revenue_plan = $3,
revenue_actual = $4,
cogs_plan = $5,
cogs_actual = $6,
gross_margin_plan = $7,
gross_margin_actual = $8,
data_source = $9,
updated_at = NOW()
`, [
summary.tenant_id,
summary.period,
summary.revenue_plan,
summary.revenue_actual,
summary.cogs_plan,
summary.cogs_actual,
summary.gross_margin_plan,
summary.gross_margin_actual,
summary.data_source
]);
}

return {
rowsImported: summaryRows.length,
importedTo: 'financial_summary',
status: 'SUCCESS'
};
}

5.3 CFO Agent Data Assembly

/**
* Assemble data for CFO Planning Agent
* Handles both product-level and consolidated data
*/

async function assembleCfoAgentInputData(tenantId, cycleId) {
// Check which financial data exists
const hasProductLevel = await checkSopPlanDataExists(tenantId, cycleId);
const hasConsolidated = await checkFinancialSummaryExists(tenantId, cycleId);

let financialData = {};

if (hasProductLevel) {
// Build from product-level sop_plan_data
financialData = await buildFinancialFromProductLevel(tenantId, cycleId);
} else if (hasConsolidated) {
// Use consolidated financial_summary
financialData = await buildFinancialFromConsolidated(tenantId, cycleId);
} else {
throw new Error(
`No financial data found for tenant ${tenantId} in cycle ${cycleId}`
);
}

// Get operating expenses
const expenses = await getOperatingExpenses(tenantId, cycleId);

// Get S&OP scenario results (if available, for dependency)
const sopResults = await getSopScenarioResults(cycleId);

return {
financial_data: financialData,
operating_expenses: expenses,
sop_context: sopResults
? {
demand_forecast: sopResults.planning_phase_result.demand_forecast,
production_plan: sopResults.planning_phase_result.production_plan
}
: null
};
}
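
The two existence checks used above are simple presence queries. Minimal sketches, reusing the table shapes assumed elsewhere in this amendment:

/**
 * Existence checks for CFO data assembly (sketches)
 */
async function checkSopPlanDataExists(tenantId, cycleId) {
const result = await db.query(`
SELECT 1 FROM sop_plan_data
WHERE tenant_id = $1 AND cycle_id = $2
LIMIT 1
`, [tenantId, cycleId]);
return result.rows.length > 0;
}

async function checkFinancialSummaryExists(tenantId, cycleId) {
const result = await db.query(`
SELECT 1 FROM financial_summary
WHERE tenant_id = $1
AND period IN (SELECT DISTINCT period FROM sop_cycles WHERE id = $2)
LIMIT 1
`, [tenantId, cycleId]);
return result.rows.length > 0;
}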

/**
* Build financial data from product-level sop_plan_data
*/

async function buildFinancialFromProductLevel(tenantId, cycleId) {
const result = await db.query(`
SELECT
period,
SUM(revenue_plan) as total_revenue_plan,
SUM(revenue_actual) as total_revenue_actual,
SUM(cogs_plan) as total_cogs_plan,
SUM(cogs_actual) as total_cogs_actual
FROM sop_plan_data
WHERE tenant_id = $1
AND cycle_id = $2
GROUP BY period
ORDER BY period
`, [tenantId, cycleId]);

const byPeriod = {};
for (const row of result.rows) {
byPeriod[row.period] = {
revenue_plan: row.total_revenue_plan,
revenue_actual: row.total_revenue_actual,
cogs_plan: row.total_cogs_plan,
cogs_actual: row.total_cogs_actual,
gross_margin_plan: row.total_revenue_plan > 0
? (row.total_revenue_plan - row.total_cogs_plan) / row.total_revenue_plan
: 0,
gross_margin_actual: row.total_revenue_actual > 0
? (row.total_revenue_actual - row.total_cogs_actual) / row.total_revenue_actual
: 0,
data_source: 'aggregated_from_sop_plan_data'
};
}

return byPeriod;
}

/**
* Build financial data from consolidated financial_summary
*/

async function buildFinancialFromConsolidated(tenantId, cycleId) {
const result = await db.query(`
SELECT
period,
revenue_plan,
revenue_actual,
cogs_plan,
cogs_actual,
gross_margin_plan,
gross_margin_actual
FROM financial_summary
WHERE tenant_id = $1
AND period IN (SELECT DISTINCT period FROM sop_cycles WHERE id = $2)
ORDER BY period
`, [tenantId, cycleId]);

const byPeriod = {};
for (const row of result.rows) {
byPeriod[row.period] = {
revenue_plan: row.revenue_plan,
revenue_actual: row.revenue_actual,
cogs_plan: row.cogs_plan,
cogs_actual: row.cogs_actual,
gross_margin_plan: row.gross_margin_plan,
gross_margin_actual: row.gross_margin_actual,
data_source: 'financial_summary'
};
}

return byPeriod;
}

AMENDMENT 6: Adaptive Event Detection Thresholds

Issue: Fixed thresholds (20%, 15%, 25%) don't account for tenant-specific volatility.

Fix: Implement adaptive thresholds based on historical patterns.

6.1 Adaptive Threshold Calculation

/**
* Calculate adaptive event detection threshold
*
* Accounts for:
* - Historical volatility (standard deviation)
* - Seasonal patterns
* - Tenant size and industry
*/

async function calculateAdaptiveThreshold(tenantId, metric, days = 365) {
// Step 1: Get historical data
const history = await db.query(`
SELECT value, period
FROM historical_metrics
WHERE tenant_id = $1
AND metric_type = $2
AND period >= NOW() - make_interval(days => $3)
ORDER BY period
`, [tenantId, metric, days]);

if (history.rows.length < 12) {
// Insufficient data: use conservative defaults
return {
threshold_percent: 0.20, // 20%
confidence: 0.3,
reason: 'Insufficient historical data (< 12 months)',
fallback_threshold: true
};
}

// Step 2: Calculate volatility (coefficient of variation)
const values = history.rows.map(r => r.value);
const mean = average(values);
const stdDev = standardDeviation(values);
const cv = stdDev / mean;

// Step 3: Base threshold = 2.5 standard deviations (covers ~99% of normal variations)
let threshold = 2.5 * cv;

// Step 4: Adjust for seasonality
const seasonality = detectSeasonality(history.rows);
if (seasonality.present) {
const { current_season, peak_variance, trough_variance } = seasonality;

// During peak season, allow more variance
if (current_season === 'PEAK') {
threshold *= (1 + peak_variance / 2);
}
// During trough, less tolerance for changes
else if (current_season === 'TROUGH') {
threshold *= (1 - trough_variance / 4);
}

console.log(
`[${tenantId}] Seasonality adjustment: ${current_season}`,
`threshold multiplier: ${(current_season === 'PEAK' ? 1 + peak_variance / 2 : 1 - trough_variance / 4).toFixed(2)}`
);
}

// Step 5: Adjust for tenant size (metric mean used as a rough size proxy)
if (mean < 1_000_000) {
// Small tenant: series tend to be more volatile, widen threshold
threshold *= 1.3;
} else if (mean > 100_000_000) {
// Large tenant: series tend to be more stable, tighten threshold
threshold *= 0.8;
}

// Step 6: Ensure minimum threshold (prevent alert fatigue)
threshold = Math.max(threshold, 0.05); // Minimum 5%
// And maximum threshold (ensure we detect real issues)
threshold = Math.min(threshold, 0.50); // Maximum 50%

return {
threshold_percent: threshold,
confidence: Math.min(history.rows.length / 365, 1.0),
calculation_basis: {
coefficient_of_variation: cv.toFixed(4),
standard_deviations: 2.5,
seasonality_adjusted: seasonality.present,
tenant_size_adjusted: true
},
historical_range: {
min: Math.min(...values),
max: Math.max(...values),
mean: mean
}
};
}

/**
* Detect seasonality patterns in historical data
*/

function detectSeasonality(history) {
if (history.length < 24) { // Need at least 2 years
return { present: false };
}

// Calculate month-over-month statistics
const byMonth = {};
for (const record of history) {
const month = getMonth(record.period);

if (!byMonth[month]) byMonth[month] = [];
byMonth[month].push(record.value);
}

// Calculate mean and variance per month
const monthlyStats = Object.entries(byMonth).map(([month, values]) => ({
month,
mean: average(values),
variance: variance(values),
coeff_variation: standardDeviation(values) / average(values)
}));

// Find peak and trough months
const peakMonth = monthlyStats.reduce((a, b) => a.mean > b.mean ? a : b);
const troughMonth = monthlyStats.reduce((a, b) => a.mean < b.mean ? a : b);

const seasonalityRatio = peakMonth.mean / troughMonth.mean;

// If peak is >20% higher than trough, consider seasonality present
if (seasonalityRatio > 1.2) {
return {
present: true,
peak_month: peakMonth.month,
peak_variance: peakMonth.coeff_variation,
trough_month: troughMonth.month,
trough_variance: troughMonth.coeff_variation,
seasonality_strength: seasonalityRatio
};
}

return { present: false };
}
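
average, standardDeviation, variance, and getMonth are used by both functions above. Standard sketches (sample variance shown; population variance is an equally valid choice):

/**
 * Statistical helpers (sketches)
 */
function average(values) {
return values.reduce((a, b) => a + b, 0) / values.length;
}

function variance(values) {
const mean = average(values);
// Sample variance (n - 1 denominator)
return values.reduce((acc, v) => acc + (v - mean) ** 2, 0) /
Math.max(values.length - 1, 1);
}

function standardDeviation(values) {
return Math.sqrt(variance(values));
}

// Calendar month (1-12) of a period date string such as '2025-10-01'
function getMonth(period) {
return new Date(period).getMonth() + 1;
}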

/**
* Detect event from data change using adaptive threshold
*/

async function detectEventFromDataChange(event, tenantId) {
// Identify metric type (demand, revenue, cost, etc.)
const metric = identifyMetricType(event);

// Get adaptive threshold
const thresholdConfig = await calculateAdaptiveThreshold(tenantId, metric);

// Calculate change percentage
const change = (event.new_value - event.old_value) / event.old_value;
const changePercent = Math.abs(change);

console.log(
`[${tenantId}] Data change detected`,
`Metric: ${metric}`,
`Change: ${(changePercent * 100).toFixed(1)}%`,
`Threshold: ${(thresholdConfig.threshold_percent * 100).toFixed(1)}%`
);

// Determine if this is an event
const isEvent = changePercent > thresholdConfig.threshold_percent;

if (isEvent) {
// Classify severity
const severity = classifyEventSeverity(
changePercent,
thresholdConfig.threshold_percent
);

return {
is_event: true,
severity: severity,
change_percent: changePercent,
threshold_percent: thresholdConfig.threshold_percent,
metric: metric,
threshold_config: thresholdConfig
};
}

return { is_event: false };
}

/**
* Classify event severity
*/

function classifyEventSeverity(changePercent, threshold) {
const ratio = changePercent / threshold;

if (ratio > 3) return 'CRITICAL'; // 3x threshold
if (ratio > 2) return 'HIGH'; // 2x threshold
if (ratio > 1.5) return 'MEDIUM'; // 1.5x threshold
return 'LOW';
}

AMENDMENT 7: Complete Cache Invalidation Matrix

Issue: Cache invalidation rules incomplete; missing configuration changes, API failures, user actions.

Fix: Define comprehensive invalidation rules for all change types.

7.1 Complete Invalidation Rule Set

CACHE_INVALIDATION_RULES:

# Data Changes (Tier A/B/C Data)
data_changes:
sop_sales_data_updated:
invalidates: [sop, cfo_finance]
reason: Affects demand forecast and revenue
trigger_type: DATA_UPDATE

sop_inventory_updated:
invalidates: [sop]
reason: Affects inventory plan
trigger_type: DATA_UPDATE

sop_production_updated:
invalidates: [sop]
reason: Affects production constraints
trigger_type: DATA_UPDATE

financial_revenue_updated:
invalidates: [cfo_finance]
reason: Core CFO metric changed
trigger_type: DATA_UPDATE

financial_expense_updated:
invalidates: [cfo_finance, capex]
reason: Affects cash flow and capital availability
trigger_type: DATA_UPDATE

capex_proposal_changed:
invalidates: [capex]
reason: Portfolio changed
trigger_type: DATA_UPDATE

asset_utilization_updated:
invalidates: [capex]
reason: Capacity analysis affected
trigger_type: DATA_UPDATE

# Configuration Changes
config_changes:
corporate_objectives_updated:
invalidates: [cfo_finance, capex]
reason: Strategic priorities changed
trigger_type: CONFIG_CHANGE
action: RE_RUN_CONTROL_PHASE_ONLY
detail: Planning results still valid; constraints may have changed

constraint_definitions_modified:
invalidates: [ALL]
reason: Constraint set changed
trigger_type: CONFIG_CHANGE
action: RE_RUN_CONTROL_PHASE_ONLY
detail: Re-validate planning results against new constraints

risk_tolerance_changed:
invalidates: [capex]
reason: Risk filtering criteria changed
trigger_type: CONFIG_CHANGE

subscription_tier_upgraded:
invalidates: []
action: ACTIVATE_NEW_SCENARIOS
reason: New scenarios now available
trigger_type: SUBSCRIPTION_CHANGE
example: Upgrade TIER_A → TIER_B unlocks cfo_finance scenario

# User Actions
user_actions:
user_edited_forecast:
invalidates: [sop, cfo_finance]
reason: Manual override of planning result
trigger_type: USER_ACTION

user_approved_plan_with_violations:
invalidates: []
action: MARK_AS_APPROVED_OVERRIDE
reason: Human approved despite violations
trigger_type: USER_DECISION

user_rejected_plan:
invalidates: [affected_scenario]
action: MARK_AS_REJECTED, TRIGGER_REPLAN
reason: User requested new execution
trigger_type: USER_DECISION

user_manually_triggered_rerun:
invalidates: [requested_scenario]
reason: User explicitly requested re-execution
trigger_type: USER_ACTION

# System Events
system_events:
api_sync_failed:
invalidates: []
action: MARK_AS_STALE_WITH_WARNING
fallback: Use last successful data (max 7 days old)
retry_strategy: Exponential backoff, max 3 retries
trigger_type: SYSTEM_ERROR

llm_api_timeout:
invalidates: []
action: RETRY_WITH_BACKOFF
retry_config:
max_attempts: 3
initial_backoff_seconds: 5
max_backoff_seconds: 60
fallback: Return cached result + "Generating..." badge
trigger_type: SYSTEM_ERROR

database_connection_error:
invalidates: []
action: CIRCUIT_BREAKER
behavior: Return stale cached result, fail gracefully
trigger_type: SYSTEM_ERROR

meta_prompt_service_unavailable:
invalidates: []
action: USE_STATIC_PROMPT
reason: Fall back to un-enriched base prompt
trigger_type: SYSTEM_ERROR

# Time-Based
time_based:
cache_ttl_exceeded:
condition: cached_at < now() - 24 hours AND cache_valid = true
action: MARK_STALE but still serve
behavior: Return result with "Last updated X hours ago" badge
background: Enqueue low-priority refresh job
trigger_type: SCHEDULED
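
The handler in Section 7.2 looks up rules by event name via a flat INVALIDATION_RULES map. A minimal sketch of deriving that map from the nested rule set above, assuming the YAML is loaded as a nested object (the module path is hypothetical):

/**
 * Flatten the nested rule categories (data_changes, config_changes, ...)
 * into a single lookup map keyed by event name
 */
const CACHE_INVALIDATION_RULES = require('./cache_invalidation_rules'); // Hypothetical module

const INVALIDATION_RULES = Object.values(CACHE_INVALIDATION_RULES)
.reduce((flat, category) => ({ ...flat, ...category }), {});

// INVALIDATION_RULES['sop_sales_data_updated']
// → { invalidates: ['sop', 'cfo_finance'], reason: '...', trigger_type: 'DATA_UPDATE' }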

7.2 Cache Invalidation Implementation

/**
* Process cache invalidation event
*/

async function processCacheInvalidationEvent(event) {
const { tenantId, changeType, affectedData } = event;

// Look up invalidation rule
const rule = INVALIDATION_RULES[changeType];

if (!rule) {
console.warn(`[${tenantId}] Unknown invalidation event: ${changeType}`);
return;
}

console.log(
`[${tenantId}] Cache invalidation`,
`Event: ${changeType}`,
`Invalidates: [${rule.invalidates.join(', ')}]`
);

const invalidated = [];

// Update cache validity for affected scenarios
for (const scenario of rule.invalidates) {
await db.query(`
UPDATE scenario_execution_results
SET
cache_valid = false,
cache_invalidated_at = NOW(),
cache_invalidation_reason = $1
WHERE tenant_id = $2
AND scenario_type = $3
AND cache_valid = true
`, [changeType, tenantId, scenario]);

invalidated.push(scenario);

// If rule says "RE_RUN_CONTROL_PHASE_ONLY", mark for partial re-execution
if (rule.action === 'RE_RUN_CONTROL_PHASE_ONLY') {
await db.query(`
UPDATE scenario_execution_results
SET partial_rerun_needed = 'CONTROL_PHASE_ONLY'
WHERE tenant_id = $1 AND scenario_type = $2
`, [tenantId, scenario]);
}
}

// Enqueue re-execution jobs for invalidated scenarios
for (const scenario of invalidated) {
const priority = rule.trigger_type === 'DATA_UPDATE' ? 'HIGH' : 'NORMAL';

await pgmq.enqueue('scenario_execution_queue', {
jobId: uuid(),
tenantId,
scenarioType: scenario,
triggerReason: changeType,
rerunAction: rule.action || 'FULL_RERUN',
enqueuedAt: now(),
priority
});
}

// Notify frontend of invalidation
await notificationService.broadcastCacheInvalidation({
tenantId,
invalidatedScenarios: invalidated,
reason: changeType,
message: `Recalculating ${invalidated.join(', ')} based on data update...`
});

return { invalidated, rule };
}

/**
* Handle special actions (CONFIG changes, upgrades, etc.)
*/

async function handleSpecialCacheInvalidationAction(event) {
const { tenantId, changeType, action } = event;

switch (action) {
case 'RE_RUN_CONTROL_PHASE_ONLY':
// Planning result still valid; re-validate against constraints
await runControlPhaseOnlyRevalidation(tenantId, event.scenario);
break;

case 'MARK_AS_APPROVED_OVERRIDE':
// User explicitly approved despite violations
// (scoped to the specific result, not all of the tenant's rows)
await db.query(`
UPDATE scenario_execution_results
SET
user_approved_at = NOW(),
user_approval_notes = $1,
cache_valid = true
WHERE tenant_id = $2
AND id = $3
`, [event.userNotes, tenantId, event.resultId]);
break;

case 'ACTIVATE_NEW_SCENARIOS': {
// Tier upgrade: unlock new scenarios for immediate execution
const newScenarios = event.newlyActiveScenarios;
for (const scenario of newScenarios) {
await enqueueScenarioForExecution(tenantId, scenario, 'TIER_UPGRADE');
}
break;
}

case 'USE_STATIC_PROMPT':
// Service unavailable: fall back to base template
console.log(
`[${tenantId}] Using static prompt fallback for ${event.scenario}`
);
break;
}
}
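
runControlPhaseOnlyRevalidation is called above but not defined in this amendment. A sketch of the intended behavior, reusing runControlPhase from Amendment 10 and the partial_rerun_needed flag set in 7.2; the id and created_at columns are assumptions:

/**
 * Re-validate a still-valid planning result against refreshed constraints
 * Sketch only - only the control phase re-runs; planning output is reused
 */
async function runControlPhaseOnlyRevalidation(tenantId, scenarioType) {
  const { rows } = await db.query(`
    SELECT * FROM scenario_execution_results
    WHERE tenant_id = $1
      AND scenario_type = $2
      AND partial_rerun_needed = 'CONTROL_PHASE_ONLY'
    ORDER BY created_at DESC
    LIMIT 1
  `, [tenantId, scenarioType]);

  const prior = rows[0];
  if (!prior) return null;

  // Planning output is unchanged; only constraints are re-validated
  const controlResult = await runControlPhase(scenarioType, prior.planning_phase_result);

  await db.query(`
    UPDATE scenario_execution_results
    SET control_phase_result = $1,
        partial_rerun_needed = NULL,
        cache_valid = true
    WHERE id = $2
  `, [controlResult, prior.id]);

  return controlResult;
}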

AMENDMENT 8: User Approval Workflow API

Issue: System can produce results with requires_approval: true but no approval API defined.

Fix: Add complete approval workflow with API and UI flow.

8.1 Approval Workflow Endpoints

POST /api/scenarios/{scenarioType}/results/{resultId}/approve

Purpose: Approve a scenario result despite violations

Request:
{
"action": "APPROVE" | "REJECT" | "REQUEST_RERUN",
"user_notes": "string (optional)",
"override_critical_violations": [
"VIOLATION_ID_1",
"VIOLATION_ID_2"
],
"timestamp": "ISO8601"
}

Response (200 OK):
{
"result_id": "uuid",
"approval_status": "APPROVED",
"approved_at": "timestamp",
"approved_by_user_id": "uuid",
"approved_by_email": "user@company.com",
"override_violations_count": 2,
"dependent_scenarios_unlocked": ["capex"],
"next_steps": {
"message": "CapEx scenario can now execute",
"action": "View CapEx results",
"link": "/scenarios/capex/results"
}
}

Error (400 Bad Request):
{
"error": "Cannot approve: attempt to override non-overrideable violation",
"violation_id": "CRITICAL_BUDGET_CONSTRAINT",
"violation_detail": "Total investment exceeds corporate budget by $5M"
}

GET /api/scenarios/{resultId}/approval-status

Purpose: Check current approval status

Response:
{
"requires_approval": true,
"approval_status": "PENDING",
"submitted_by_system_at": "timestamp",
"critical_violations": [
{
"violation_id": "SUPPLY_CONSTRAINT",
"severity": "CRITICAL",
"description": "Production exceeds max capacity",
"impact": "Plan violates manufacturing constraint",
"overrideable": true,
"remediation_suggestion": "Reduce production by 500 units or invest in capacity"
}
],
"medium_violations": [...],
"approvers_notified": ["cfo@company.com", "coo@company.com"],
"approval_deadline": "2025-11-06T12:00:00Z",
"allow_auto_fallback": false
}

DELETE /api/scenarios/{resultId}/approval-override

Purpose: Withdraw an approval override and return to flagged state

Response:
{
"status": "OVERRIDE_WITHDRAWN",
"scenario_status": "FLAGGED",
"message": "Plan returned to flagged state. Violations are active again."
}
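
A sketch of the handler behind the approve endpoint, reusing authenticate and checkScenarioAccess from Amendment 9 and auditApprovalDecision from 8.3. It assumes authenticate attaches id and email to req.user; the non-overrideable check mirrors the 400 error above, and the success response is abbreviated:

app.post(
  '/api/scenarios/:scenarioType/results/:resultId/approve',
  authenticate,
  checkScenarioAccess,
  async (req, res) => {
    const { resultId } = req.params;
    const { action, user_notes, override_critical_violations = [] } = req.body;

    // Reject attempts to override violations flagged as non-overrideable
    const { rows } = await db.query(`
      SELECT critical_violations FROM scenario_execution_results WHERE id = $1
    `, [resultId]);
    const violations = rows[0]?.critical_violations || [];

    const blocked = violations.find(v =>
      override_critical_violations.includes(v.violation_id) && !v.overrideable
    );
    if (blocked) {
      return res.status(400).json({
        error: 'Cannot approve: attempt to override non-overrideable violation',
        violation_id: blocked.violation_id,
        violation_detail: blocked.description
      });
    }

    if (action === 'APPROVE') {
      await db.query(`
        UPDATE scenario_execution_results
        SET user_approved_at = NOW(), user_approval_notes = $1, cache_valid = true
        WHERE id = $2
      `, [user_notes || null, resultId]);
    }

    await auditApprovalDecision({
      resultId,
      userId: req.user.id,
      userEmail: req.user.email,
      action,
      overriddenViolations: override_critical_violations,
      userNotes: user_notes
    });

    res.json({
      result_id: resultId,
      approval_status: action === 'APPROVE' ? 'APPROVED' : action
    });
  }
);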

8.2 Approval UI Flow

USER SEES RESULT WITH VIOLATIONS:

┌──────────────────────────────────────┐
│ S&OP Planning Result │
│ Status: ⚠️ REQUIRES APPROVAL │
│ │
│ Critical Violations: 1 │
│ ├─ Production exceeds capacity │
│ │ Impact: Cannot manufacture │
│ │ [Fix] [Override] [View Details] │
│ │
│ Medium Violations: 2 │
│ ├─ Forecast variance high │
│ ├─ Lead time extends timeline │
│ │
│ [Approve Plan] [Request Changes] │
└──────────────────────────────────────┘

CLICK "Approve Plan":

┌──────────────────────────────────────┐
│ APPROVAL CONFIRMATION │
│ │
│ You are approving this plan despite │
│ 1 critical violation: │
│ │
│ ✓ Production exceeds capacity │
│ → Exceeds by 500 units │
│ → Remediation: Invest in tooling │
│ │
│ By approving, you confirm: │
│ □ Violation impact understood │
│ □ Mitigation plan in place │
│ │
│ Additional notes: │
│ [Text area: optional user notes] │
│ │
│ [APPROVE & PROCEED] [CANCEL] │
└──────────────────────────────────────┘

AFTER APPROVAL:

┌──────────────────────────────────────┐
│ S&OP Planning Result │
│ Status: ✅ APPROVED │
│ Approved by: john.doe@company.com │
│ Approved at: 2025-11-05 14:30:00 │
│ │
│ ℹ️ Approval overrode 1 critical │
│ violation. This decision is │
│ tracked in audit log. │
│ │
│ Downstream Impact: │
│ → CFO scenario can now execute │
│ → CapEx scenario waiting for CFO │
│ │
│ [View Audit Log] [View Full Plan] │
└──────────────────────────────────────┘

8.3 Audit Logging

/**
* Log approval decision for audit trail
*/

async function auditApprovalDecision(event) {
const {
resultId,
userId,
userEmail,
action,
overriddenViolations,
userNotes
} = event;

await db.query(`
INSERT INTO approval_audit_log (
result_id,
user_id,
user_email,
action,
overridden_violation_count,
user_notes,
timestamp
) VALUES ($1, $2, $3, $4, $5, $6, NOW())
`, [
resultId,
userId,
userEmail,
action,
overriddenViolations.length,
userNotes || null
]);

console.log(
`[AUDIT] Approval decision`,
`Result: ${resultId}`,
`User: ${userEmail}`,
`Action: ${action}`,
`Overrides: ${overriddenViolations.length}`
);
}
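
The approval_audit_log table referenced by the INSERT above is not defined elsewhere in this document; a minimal migration sketch, with column types inferred from that statement:

/**
 * One-off migration sketch for the approval audit table
 * Column types are inferred from auditApprovalDecision above
 */
async function migrateApprovalAuditLog() {
  await db.query(`
    CREATE TABLE IF NOT EXISTS approval_audit_log (
      id BIGSERIAL PRIMARY KEY,
      result_id UUID NOT NULL,
      user_id UUID NOT NULL,
      user_email TEXT NOT NULL,
      action TEXT NOT NULL,
      overridden_violation_count INTEGER NOT NULL DEFAULT 0,
      user_notes TEXT,
      timestamp TIMESTAMPTZ NOT NULL DEFAULT NOW()
    )
  `);
}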

AMENDMENT 9: Subscription Management APIs

Issue: Subscription tiers defined but no admin APIs for managing them.

Fix: Add subscription CRUD and access control APIs.

9.1 Subscription Management Endpoints

POST /api/admin/subscriptions

Purpose: Create or update tenant subscription

Request:
{
"tenant_id": "uuid",
"tier": "TIER_A" | "TIER_B" | "TIER_C",
"effective_date": "2025-11-01",
"billing_cycle": "monthly" | "annual",
"max_concurrent_users": 10,
"custom_constraints": {...}
}

Response (201 Created):
{
"subscription_id": "uuid",
"tenant_id": "uuid",
"tier": "TIER_B",
"active_scenarios": ["sop", "cfo_finance"],
"data_requirements": {
"required_csv_files": [
"sales_and_operations.csv",
"financial_statement.csv",
"operating_expenses.csv"
],
"minimum_historical_months": 24
},
"effective_date": "2025-11-01",
"created_at": "2025-11-05T12:00:00Z"
}

GET /api/admin/subscriptions/{subscriptionId}

Purpose: Get subscription details

Response:
{
"subscription_id": "uuid",
"tenant_id": "uuid",
"tenant_name": "Acme Corp",
"tier": "TIER_B",
"active_scenarios": ["sop", "cfo_finance"],
"status": "ACTIVE",
"created_at": "2025-09-01T00:00:00Z",
"effective_date": "2025-09-01",
"renewal_date": "2025-12-01",
"billing_cycle": "monthly",
"max_concurrent_users": 10,
"current_user_count": 7,
"data_completeness": {
"sop_data": {
"status": "COMPLETE",
"last_update": "2025-11-05",
"months_historical": 18,
"quality_score": 0.95
},
"financial_data": {
"status": "INCOMPLETE",
"last_update": null,
"months_required": 24
}
}
}

PATCH /api/admin/subscriptions/{subscriptionId}

Purpose: Upgrade, downgrade, or modify subscription

Request:
{
"tier": "TIER_C",
"effective_date": "2025-11-15",
"reason": "Customer requested upgrade"
}

Response:
{
"subscription_id": "uuid",
"tier_changed_from": "TIER_B",
"tier_changed_to": "TIER_C",
"new_scenarios_unlocked": ["capex"],
"additional_data_required": ["capex_proposals.csv", "asset_utilization.csv"],
"new_scenarios_ready_for_execution": false,
"reason_not_ready": "CapEx data not yet provided"
}

GET /api/subscriptions/check-access

Purpose: Check if current tenant/user has access to a scenario

Query:
?scenario=sop
OR
?scenario=cfo_finance

Response:
{
"has_access": true,
"subscription_tier": "TIER_B",
"scenario": "cfo_finance",
"access_reason": "Tier B includes CFO Financial scenario",
"data_completeness": {
"financial_data": "COMPLETE",
"operating_expenses": "COMPLETE",
"timestamp": "2025-11-05T12:00:00Z"
},
"ready_to_execute": true
}

POST /api/admin/subscriptions/{subscriptionId}/notify-upgrade

Purpose: Send tenant notification about new scenarios

Request:
{
"message_type": "NEW_TIER_UNLOCKED",
"include_tutorial": true
}

Response:
{
"notification_sent": true,
"recipients": ["admin@acme.com", "finance@acme.com"],
"message_id": "uuid",
"scheduled_tutorial": true
}
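
The new_scenarios_unlocked field in the PATCH response is a straightforward diff of the tier-to-scenario map (shown again in 9.2 below). A sketch:

const TIER_SCENARIOS = {
  TIER_A: ['sop'],
  TIER_B: ['sop', 'cfo_finance'],
  TIER_C: ['sop', 'cfo_finance', 'capex']
};

function diffTierChange(fromTier, toTier) {
  const before = new Set(TIER_SCENARIOS[fromTier]);
  const after = TIER_SCENARIOS[toTier];

  return {
    tier_changed_from: fromTier,
    tier_changed_to: toTier,
    new_scenarios_unlocked: after.filter(s => !before.has(s)),
    scenarios_removed: [...before].filter(s => !after.includes(s)) // downgrades
  };
}

// diffTierChange('TIER_B', 'TIER_C')
// => { ..., new_scenarios_unlocked: ['capex'], scenarios_removed: [] }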

9.2 Access Control Middleware

/**
* Middleware: Check subscription tier for scenario access
*/

async function checkScenarioAccess(req, res, next) {
const { tenantId } = req.user;
const { scenarioType } = req.params;

// Get tenant subscription
const subscription = await subscriptionRepository.findByTenantId(tenantId);

if (!subscription) {
return res.status(403).json({
error: 'No active subscription',
message: 'Contact sales@company.com to activate service'
});
}

// Get allowed scenarios for tier
const tierScenarios = {
'TIER_A': ['sop'],
'TIER_B': ['sop', 'cfo_finance'],
'TIER_C': ['sop', 'cfo_finance', 'capex']
};

const allowedScenarios = tierScenarios[subscription.tier];

if (!allowedScenarios.includes(scenarioType)) {
return res.status(403).json({
error: 'Scenario not available for your tier',
current_tier: subscription.tier,
available_scenarios: allowedScenarios,
requested_scenario: scenarioType,
upgrade_required: true,
next_tier: subscription.tier === 'TIER_A' ? 'TIER_B' : 'TIER_C'
});
}

// Check data completeness
const completeness = await checkDataCompleteness(tenantId, scenarioType);

if (!completeness.complete) {
return res.status(412).json({
error: 'Scenario data incomplete',
missing_files: completeness.missing,
setup_instructions: completeness.instructions
});
}

// Access granted
req.subscription = subscription;
next();
}

app.get('/api/scenarios/:scenarioType/results',
authenticate,
checkScenarioAccess,
scenarioResultsHandler
);
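
checkDataCompleteness is used by the middleware but not defined in this document. A sketch assuming the per-tier required CSV files from 9.1 and a hypothetical tenant_data_files tracking table:

const REQUIRED_FILES = {
  sop: ['sales_and_operations.csv'],
  cfo_finance: ['financial_statement.csv', 'operating_expenses.csv'],
  capex: ['capex_proposals.csv', 'asset_utilization.csv']
};

async function checkDataCompleteness(tenantId, scenarioType) {
  const required = REQUIRED_FILES[scenarioType] || [];

  // Hypothetical tracking table; adapt to the actual ingestion schema
  const { rows } = await db.query(`
    SELECT file_name FROM tenant_data_files
    WHERE tenant_id = $1 AND file_name = ANY($2)
  `, [tenantId, required]);

  const uploaded = new Set(rows.map(r => r.file_name));
  const missing = required.filter(f => !uploaded.has(f));

  return {
    complete: missing.length === 0,
    missing,
    instructions: missing.length
      ? `Upload the following files to enable ${scenarioType}: ${missing.join(', ')}`
      : null
  };
}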

AMENDMENT 10: Background Worker Implementation

Issue: Workers are mentioned throughout, but the framework/library, scaling strategy, and error handling were never specified.

Fix: Define complete worker architecture.

10.1 Worker Framework Selection

Choice: pg-boss

Why:

  • Uses PostgreSQL as message queue (no external Redis dependency)
  • Built for reliability (ACID-compliant job tracking)
  • Already integrated with existing database
  • Supports delayed jobs, retries, scaling across multiple workers
WORKER_CONFIGURATION:

Framework: pg-boss (npm package)
Queue_Database: Existing PostgreSQL (same as app)
Queue_Schema: pgboss.*
Queue_Name: scenario_execution_queue

Worker_Pool:
Development:
instances: 1
max_concurrent_jobs: 1

Production:
instances: 3 per scenario type
sop_workers: 3
cfo_workers: 3
capex_workers: 3

max_concurrent_jobs: 1 (per worker)

scaling_policy: AUTO
scale_up_trigger: queue_depth > 10
scale_down_trigger: queue_depth < 3 (sustained 5 min)
cooldown_period: 5 minutes
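
The AUTO scaling policy can be driven by a once-per-minute queue-depth poll. A sketch in which scaleWorkerPool is a placeholder for the real orchestration hook (e.g. adjusting replica count) and the boss handle from initializeWorkerPool is assumed in scope; the streak counter implements the "sustained 5 min" scale-down condition:

const SCALE_UP_QUEUE_DEPTH = 10;
const SCALE_DOWN_QUEUE_DEPTH = 3;
const SUSTAIN_CHECKS = 5; // five consecutive one-minute checks ≈ "sustained 5 min"

let lowDepthStreak = 0;

async function evaluateScaling(boss) {
  const depth = await boss.getQueueSize('scenario_execution_queue');

  if (depth > SCALE_UP_QUEUE_DEPTH) {
    lowDepthStreak = 0;
    await scaleWorkerPool(+1); // placeholder: add a worker instance
  } else if (depth < SCALE_DOWN_QUEUE_DEPTH) {
    lowDepthStreak += 1;
    if (lowDepthStreak >= SUSTAIN_CHECKS) {
      lowDepthStreak = 0;
      await scaleWorkerPool(-1); // placeholder: remove a worker instance
    }
  } else {
    lowDepthStreak = 0;
  }
}

// Poll once per minute; a post-scale cooldown timer is omitted for brevity
setInterval(() => evaluateScaling(boss).catch(console.error), 60_000);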

10.2 Worker Implementation

/**
* Initialize Background Worker Pool
*/

const PgBoss = require('pg-boss');

// Module-scoped handle so the job handlers below can re-send jobs
let boss;

async function initializeWorkerPool() {
boss = new PgBoss({
connectionString: process.env.DATABASE_URL,
schema: 'pgboss',
// ... other config
});

// pg-boss must be started before jobs can be sent or worked
await boss.start();

// Listen for scenario execution jobs
// (pg-boss v7+ exposes work(); worker option names vary by major version)
await boss.work(
'scenario_execution_queue',
{ teamSize: 1, teamConcurrency: 1 },
processScenarioJob
);

// Listen for maintenance jobs
// (single worker; refresh jobs are enqueued at low priority)
await boss.work(
'cache_refresh_queue',
{ teamSize: 1 },
processCacheRefreshJob
);

// Listen for approval notification jobs
await boss.work(
'notification_queue',
{ teamSize: 5 },
sendNotificationJob
);

console.log('✅ Background worker pool initialized');

return boss;
}

/**
* Main Job Handler: Process Scenario Execution
*/

async function processScenarioJob(job) {
const {
jobId,
tenantId,
cycleId,
scenarioType,
dependencies,
eventData
} = job.data;

try {
console.log(
`[WORKER] Processing job ${jobId}`,
`Tenant: ${tenantId}, Scenario: ${scenarioType}`
);

// **STEP 1: Check Dependencies**
if (dependencies && dependencies.length > 0) {
const depsComplete = await checkDependenciesComplete(
cycleId,
dependencies
);

if (!depsComplete) {
console.log(
`[WORKER] Job ${jobId} waiting for dependencies:`,
dependencies
);

// Re-queue with exponential backoff; the wait count rides along in
// job data because a manually re-sent job starts fresh in pg-boss
const waitAttempt = job.data.waitAttempt || 0;
const backoffSeconds = Math.min(60, 10 * Math.pow(2, waitAttempt));

await boss.send(
'scenario_execution_queue',
{ ...job.data, waitAttempt: waitAttempt + 1 },
{ startAfter: backoffSeconds } // pg-boss delays jobs via startAfter (seconds)
);

return { status: 'RESCHEDULED', reason: 'Dependencies pending' };
}
}

// **STEP 2: Determine Execution Mode**
const lastExecution = await getLastScenarioExecution(tenantId, scenarioType);
const executionMode = await determineExecutionMode(job, lastExecution);

console.log(`[WORKER] Execution mode: ${executionMode.mode}`, executionMode);

// **STEP 3: Run Planning Phase**
const startTime = Date.now();

const planningResult = await runPlanningPhase(
scenarioType,
tenantId,
cycleId,
{ ...eventData }
);

const planningDuration = (Date.now() - startTime) / 1000;
console.log(`[WORKER] Planning phase completed (${planningDuration}s)`);

// **STEP 4: Run Control Phase**
const controlResult = await runControlPhase(scenarioType, planningResult);

console.log(`[WORKER] Control phase completed`);

// **STEP 5: Store Result**
const result = await storeScenarioResult(
cycleId,
scenarioType,
planningResult,
controlResult,
{
duration_seconds: (Date.now() - startTime) / 1000,
execution_mode: executionMode.mode,
job_id: jobId
}
);

console.log(`[WORKER] Result stored: ${result.id}`);

// **STEP 6: Mark Dependencies as Complete**
await markScenarioComplete(cycleId, scenarioType);

// **STEP 7: Trigger Dependent Scenarios**
const dependents = SCENARIO_DEPENDENCIES[scenarioType].required_by;
for (const dependent of dependents) {
console.log(`[WORKER] Unblocking dependent scenario: ${dependent}`);

// This will wake up any waiting jobs for the dependent
await boss.send('scenario_execution_queue', {
tenantId,
cycleId,
scenarioType: dependent,
dependencies: [scenarioType],
action: 'UNBLOCK'
}, { startAfter: 2 }); // Small delay (seconds) so prior writes are visible
}

// **STEP 8: Notify Frontend**
await notificationService.broadcastScenarioComplete({
tenantId,
scenarioType,
status: result.execution_status,
requiresApproval: result.critical_violations.length > 0
});

console.log(`[WORKER] ✅ Job ${jobId} completed successfully`);

return {
status: 'SUCCESS',
resultId: result.id,
duration_seconds: (Date.now() - startTime) / 1000
};

} catch (error) {
console.error(`[WORKER] ❌ Error processing job ${jobId}:`, error);

// Retry with exponential backoff (max 3 attempts); the attempt count is
// carried in job data since a re-sent job starts fresh in pg-boss
const attempt = job.data.attempt || 0;
if (attempt < 3) {
const backoffSeconds = Math.min(300, 10 * Math.pow(2, attempt));

console.log(
`[WORKER] Retrying job ${jobId} in ${backoffSeconds}s (attempt ${attempt + 1}/3)`
);

await boss.send(
'scenario_execution_queue',
{ ...job.data, attempt: attempt + 1 },
{ startAfter: backoffSeconds }
);

return { status: 'RETRY_QUEUED', attempt: attempt + 1 };

} else {
// Max retries exceeded
console.error(`[WORKER] Job ${jobId} failed after 3 attempts`);

await markScenarioFailed(
job.data.cycleId,
job.data.scenarioType,
error.message
);

await notificationService.notifyJobFailure({
jobId,
tenantId: job.data.tenantId,
scenarioType: job.data.scenarioType,
error: error.message
});

return { status: 'FAILED', error: error.message };
}
}
}
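
checkDependenciesComplete (Step 1) is assumed above. A sketch against a per-cycle completion record, assuming markScenarioComplete (Step 6) stamps completed_at on a scenario_cycle_status table (a hypothetical name):

async function checkDependenciesComplete(cycleId, dependencies) {
  const { rows } = await db.query(`
    SELECT scenario_type FROM scenario_cycle_status
    WHERE cycle_id = $1
      AND scenario_type = ANY($2)
      AND completed_at IS NOT NULL
  `, [cycleId, dependencies]);

  // All listed dependencies must have a completion record
  return rows.length === dependencies.length;
}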

/**
* Cache Refresh Job Handler
*/

async function processCacheRefreshJob(job) {
const { tenantId, scenarioType } = job.data;

console.log(
`[WORKER] Cache refresh job`,
`Tenant: ${tenantId}, Scenario: ${scenarioType}`
);

// Load last execution
const lastExecution = await getLastScenarioExecution(tenantId, scenarioType);

if (!lastExecution) {
console.log(`[WORKER] No last execution found, skipping refresh`);
return { status: 'SKIPPED', reason: 'No baseline execution' };
}

// Re-run scenario
const result = await executeScenarioFull({
tenantId,
scenarioType,
cycleId: lastExecution.cycleId
});

console.log(`[WORKER] Cache refresh completed for ${scenarioType}`);

return { status: 'REFRESHED', resultId: result.id };
}

/**
* Health Check: Monitor Worker Pool
*/

async function getWorkerPoolHealth(boss) {
// getQueueSize reports queued (not-yet-started) jobs; per-state counts
// come from the pgboss.job table, where pg-boss keeps all job state
// (table layout varies slightly across pg-boss major versions)
const queued = await boss.getQueueSize('scenario_execution_queue');

const stateCounts = await db.query(`
SELECT state, COUNT(*) AS count
FROM pgboss.job
WHERE name = 'scenario_execution_queue'
AND createdon > NOW() - INTERVAL '24 hours'
GROUP BY state
`);

return {
queued,
states_24h: stateCounts.rows,
timestamp: new Date().toISOString()
};
}

// Export for use in main app
module.exports = {
initializeWorkerPool,
getWorkerPoolHealth
};

AMENDMENT 11: Recommendations & Best Practices

11.1 Health Check Endpoint

/**
* System health check endpoint
*/

app.get('/api/health', async (req, res) => {
const health = {
status: 'healthy',
timestamp: new Date().toISOString(),
services: {}
};

try {
// Database health
const dbHealth = await checkDatabaseHealth();
health.services.database = dbHealth;

// Message queue health
const queueHealth = await checkQueueHealth(boss);
health.services.message_queue = queueHealth;

// LLM API health
const llmHealth = await checkLlmApiHealth();
health.services.llm_api = llmHealth;

// Meta-prompting service health
const metaPromptHealth = await checkMetaPromptingServiceHealth();
health.services.meta_prompting_service = metaPromptHealth;

// Worker pool health
health.workers = {
active: queueHealth.active_jobs,
idle: queueHealth.available_workers,
queue_depth: queueHealth.pending_jobs
};

// Determine overall status: any 'down' service outweighs 'degraded'
const statuses = Object.values(health.services).map(s => s.status);

if (statuses.includes('down')) {
health.status = 'down';
} else if (statuses.includes('degraded')) {
health.status = 'degraded';
}

res.status(health.status === 'healthy' ? 200 : 503).json(health);

} catch (error) {
res.status(500).json({
status: 'down',
error: error.message
});
}
});
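
checkDatabaseHealth and its sibling checks are referenced but not defined; the database probe can be as simple as a timed round-trip (the latency threshold is illustrative):

async function checkDatabaseHealth() {
  const start = Date.now();
  try {
    await db.query('SELECT 1');
    const latencyMs = Date.now() - start;
    return {
      status: latencyMs < 500 ? 'healthy' : 'degraded', // illustrative threshold
      latency_ms: latencyMs
    };
  } catch (error) {
    return { status: 'down', error: error.message };
  }
}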

11.2 Data Quality Scoring

/**
* Calculate data quality score for tenant
* Used in insights: "Forecast confidence: 87% (based on 92% data quality)"
*/

async function calculateDataQualityScore(tenantId) {
// Completeness: % of required fields populated
const completeness = await db.query(`
SELECT
COUNT(*) as total_rows,
COUNT(CASE WHEN revenue_actual IS NOT NULL THEN 1 END) as filled_revenue,
COUNT(CASE WHEN inventory_actual IS NOT NULL THEN 1 END) as filled_inventory
FROM sop_plan_data
WHERE tenant_id = $1
`, [tenantId]);

// Average across the required fields queried above
// (node-postgres returns COUNT values as strings, hence Number())
const row = completeness.rows[0];
const completePercent =
(Number(row.filled_revenue) + Number(row.filled_inventory)) / (2 * Number(row.total_rows));

// Freshness: Latest data within acceptable age
const freshness = await db.query(`
SELECT MAX(period) as latest_period
FROM sop_plan_data
WHERE tenant_id = $1
`, [tenantId]);

const daysSinceLastUpdate = Math.floor(
(Date.now() - new Date(freshness.rows[0].latest_period)) / (1000 * 60 * 60 * 24)
);

const freshnessPercent = daysSinceLastUpdate < 7 ? 1.0
: daysSinceLastUpdate < 30 ? 0.9
: daysSinceLastUpdate < 60 ? 0.5
: 0.0;

// Consistency: No logical contradictions
const consistency = await db.query(`
SELECT
COUNT(*) as total,
COUNT(CASE WHEN revenue_actual < cogs_actual THEN 1 END) as impossible_margins,
COUNT(CASE WHEN inventory_actual < 0 THEN 1 END) as impossible_inventory
FROM sop_plan_data
WHERE tenant_id = $1
`, [tenantId]);

// Number() guards against string concatenation of the COUNT results
const consistencyPercent = 1 - (
(Number(consistency.rows[0].impossible_margins) + Number(consistency.rows[0].impossible_inventory)) /
Number(consistency.rows[0].total)
);

// Overall score
const overall = (completePercent * 0.4) + (freshnessPercent * 0.3) + (consistencyPercent * 0.3);

return {
completeness: Math.round(completePercent * 100),
freshness: Math.round(freshnessPercent * 100),
consistency: Math.round(consistencyPercent * 100),
overall: Math.round(overall * 100),
days_since_update: daysSinceLastUpdate,
recommendation: overall < 0.7
? 'Data quality is low. Please update missing values and verify accuracy.'
: overall < 0.85
? 'Data quality is acceptable. Consider resolving any data gaps.'
: 'Data quality is excellent.'
};
}
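
For example, a tenant with 95% completeness, data updated three days ago (freshness 1.0), and 98% consistency scores (0.95 × 0.4) + (1.0 × 0.3) + (0.98 × 0.3) = 0.974, reported as overall: 97 with the "excellent" recommendation.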

11.3 Scenario Comparison View

/**
* Compare scenario results across cycles
* Used when event-triggered cycle shows before/after impact
*/

app.get('/api/scenarios/:scenarioType/compare', async (req, res) => {
const { oldCycleId, newCycleId } = req.query;
const { scenarioType } = req.params; // path parameter, not part of the query string
const tenantId = req.user.tenantId;

const oldResult = await db.query(`
SELECT planning_phase_result, control_phase_result,
critical_violations, medium_violations
FROM scenario_execution_results
WHERE sop_cycle_id = $1 AND scenario_type = $2 AND tenant_id = $3
`, [oldCycleId, scenarioType, tenantId]);

const newResult = await db.query(`
SELECT planning_phase_result, control_phase_result,
critical_violations, medium_violations
FROM scenario_execution_results
WHERE sop_cycle_id = $1 AND scenario_type = $2 AND tenant_id = $3
`, [newCycleId, scenarioType, tenantId]);

if (!oldResult.rows[0] || !newResult.rows[0]) {
return res.status(404).json({ error: 'Results not found' });
}

const oldPlan = oldResult.rows[0].planning_phase_result;
const newPlan = newResult.rows[0].planning_phase_result;

const comparison = {
old_cycle: oldCycleId,
new_cycle: newCycleId,
scenario_type: scenarioType,

old_result: oldPlan,
new_result: newPlan,

// Compute key metric differences
changes: [
{
metric: 'demand_forecast',
old_value: oldPlan.demand_forecast,
new_value: newPlan.demand_forecast,
change_percent: (
(newPlan.demand_forecast - oldPlan.demand_forecast) /
oldPlan.demand_forecast * 100
).toFixed(1),
direction: newPlan.demand_forecast >= oldPlan.demand_forecast ? 'up' : 'down',
explanation: 'Demand spike detected in latest data'
}
// ... other metrics
],

violations_comparison: {
old_critical: oldResult.rows[0].critical_violations?.length || 0,
new_critical: newResult.rows[0].critical_violations?.length || 0,
old_medium: oldResult.rows[0].medium_violations?.length || 0,
new_medium: newResult.rows[0].medium_violations?.length || 0
}
};

res.json(comparison);
});

SUMMARY: All Amendments Ready for Integration

Amendment | Status | Impact | Priority
--- | --- | --- | ---
1. Scenario Dependencies | ✅ Complete | Critical - fixes parallel execution bug | CRITICAL
2. CSV Entity Resolution | ✅ Complete | High - required for data import | CRITICAL
3. Incremental Execution | ✅ Complete | High - cost optimization (70% savings) | HIGH
4. Meta-Prompt Integration | ✅ Complete | High - enables learning loop | HIGH
5. Financial Data Architecture | ✅ Complete | High - clarifies CFO scenario data flow | HIGH
6. Adaptive Event Detection | ✅ Complete | Medium - prevents false alerts | MEDIUM
7. Cache Invalidation Matrix | ✅ Complete | High - comprehensive cache rules | HIGH
8. Approval Workflow | ✅ Complete | Medium - user override capability | MEDIUM
9. Subscription Management | ✅ Complete | High - tenant tier management | HIGH
10. Worker Implementation | ✅ Complete | Critical - specifies execution framework | CRITICAL
11. Best Practices | ✅ Complete | Medium - operational excellence | MEDIUM

Next Step: Merge all amendments into main FSD document and proceed to implementation planning phase.

Approval Status: ✅ READY FOR IMPLEMENTATION