Skip to main content

Product & BOM Review Integration Guide

This guide explains how to integrate the Product & BOM Review Management System with existing ProductService and BomService.

Overview

The review system runs asynchronously from the main S&OP process via pgmq message queues. No changes are needed to existing product upload/update flows - reviews are triggered and processed independently.

Database Migrations

Run these migrations before starting the system:

npm run knex migrate:latest --env production

This creates:

  • product_bom_reviews - Review lifecycle and findings
  • product_bom_change_log - Material change tracking

System Architecture

┌─────────────────────────────────────────────────────────────┐
│ ProductService │
│ ├─ CSV Upload → Creates products in core_entities │
│ └─ Triggers: initiateReview(INITIAL) → queues to pgmq │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ pgmq: product_review_queue │
│ (Messages stay in queue until processed) │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ productBomReviewWorker.js (background process) │
│ ├─ Reads messages from queue (poll every 5s) │
│ ├─ Processes review: pricing, supply chain, BOM analysis │
│ ├─ Generates action items │
│ └─ Archives message, notifies team │
└─────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────┐
│ API & Dashboard │
│ ├─ GET /api/products/:id/last-review │
│ ├─ GET /api/products/:id/reviews (history) │
│ ├─ POST /api/products/:id/review/initiate (manual) │
│ └─ GET /api/products/reviews/pending │
└─────────────────────────────────────────────────────────────┘

Integration Points

1. ProductService: Trigger INITIAL Review on Upload

In backend/src/services/ProductService.js, after successful CSV import:

import ProductBomReviewManagementService from './ProductBomReviewManagementService.js';

export const importProductsFromCsv = (filePath, tenantId) => {
return new Promise((resolve, reject) => {
// ... existing CSV parsing code ...

.on('end', async () => {
try {
// Insert products
const insertedProducts = await CoreEntitiesRepository.batchInsertAndReturn(productsToInsert);

// Enrich with AI insights
await Promise.all(insertedProducts.map(product =>
ProductIntelligenceEngine.processNewProduct(product)
));

// ⭐ NEW: Trigger INITIAL reviews for new products
for (const product of insertedProducts) {
try {
await ProductBomReviewManagementService.initiateReview({
tenantId,
productId: product.entity_id,
sku: product.sku,
reviewType: 'INITIAL',
triggeredBy: userId // if available, else 'SYSTEM'
});
} catch (reviewError) {
appLogger.warn(`Failed to initiate review for ${product.sku}: ${reviewError.message}`);
// Continue with import even if review initiation fails
}
}

resolve({
productsAdded: insertedProducts.length,
reviewsQueued: insertedProducts.length
});
} catch (error) {
reject(error);
}
});
});
};

2. ProductService: Track Changes & Trigger FORCED Reviews

When product data is updated, detect material changes:

// In product update endpoint or handler
import ProductBomReviewManagementService from './ProductBomReviewManagementService.js';

router.put('/api/products/:productId', authMiddleware, async (req, res) => {
try {
const productId = req.params.productId;
const tenantId = req.user.tenantId;
const previousData = await CoreEntitiesRepository.findById(productId);
const newData = { ...previousData, ...req.body };

// Update the product
const updated = await CoreEntitiesRepository.update(productId, newData);

// ⭐ NEW: Track changes and trigger FORCED review if material
try {
const changeResult = await ProductBomReviewManagementService.trackProductChange(
tenantId,
previousData,
newData
);

if (changeResult.forcedReviewTriggered) {
res.json({
success: true,
product: updated,
forcedReviewTriggered: true,
reviewId: changeResult.review.id,
message: 'Product updated. Material changes detected - forced review initiated.'
});
} else {
res.json({
success: true,
product: updated,
changeTracked: changeResult.changeLog.change_type === 'major'
});
}
} catch (reviewError) {
appLogger.warn(`Change tracking failed: ${reviewError.message}`);
// Still return updated product, just log review failure
res.json({ success: true, product: updated });
}
} catch (error) {
res.status(500).json({ error: error.message });
}
});

3. ProductIntelligenceEngine: Enhanced with Review Context

The existing ProductIntelligenceEngine can reference review findings:

// In ProductIntelligenceEngine.js, when generating insights:

async processNewProduct(productData) {
const intelligence = {
classification: await this.classifyProduct(productData),
supply_risk: await this.assessSupplyRisk(productData),
margin_optimization: await this.optimizeMargins(productData),
// ... existing code ...
};

// ⭐ This intelligence is stored and used by reviews
// when ProductBomReviewManagementService.performReviewAnalysis() runs

return intelligence;
}

4. BomService: Track BOM Changes

In backend/src/services/BomService.js, after importing BOM relationships:

export const importBomFromCsv = (filePath, tenantId) => {
return new Promise((resolve, reject) => {
// ... existing BOM parsing code ...

.on('end', async () => {
try {
if (relationships.length > 0) {
const inserted = await db.batchInsert('entity_relationships', relationships);

// ⭐ NEW: Trigger reviews for affected parent products
const parentProductIds = [...new Set(relationships.map(r => r.source_entity_id))];

for (const parentId of parentProductIds) {
try {
const product = await CoreEntitiesRepository.findById(parentId);
await ProductBomReviewManagementService.initiateReview({
tenantId,
productId: parentId,
sku: product.sku,
reviewType: 'FORCED', // BOM changes = material change
triggeredBy: 'SYSTEM'
});
} catch (error) {
appLogger.warn(`Failed to trigger review for parent ${parentId}`);
}
}

resolve({ relationshipsAdded: inserted.length });
} else {
resolve({ relationshipsAdded: 0 });
}
} catch (error) {
reject(error);
}
});
});
};

Running the System

1. Start Background Worker

The worker polls pgmq for review messages and processes them:

# In a separate terminal or process manager
node backend/workers/productBomReviewWorker.js

# Or with systemd
sudo systemctl start product-bom-review-worker

# Or with Docker
docker run -e DB_HOST=... -e DB_USER=... backend:review-worker

Worker will:

  • Log "🚀 Starting Product BOM Review Worker"
  • Connect to database
  • Initialize pgmq queue
  • Start polling for messages every 5 seconds
  • Process up to 3 reviews concurrently
  • Print status every 60 seconds

2. Configure Periodic Review Scheduler

In server.js or separate cron service:

import cron from 'node-cron';
import PeriodicReviewScheduler from './src/services/PeriodicReviewScheduler.js';

// Initialize at startup
const scheduler = new PeriodicReviewScheduler();
await scheduler.initialize();

// Run scheduler daily at 2 AM
cron.schedule('0 2 * * *', async () => {
appLogger.info('Running periodic review scheduler...');
try {
const results = await scheduler.runScheduler();
appLogger.info(`Scheduled ${results.reviewsQueued} periodic reviews`);
} catch (error) {
appLogger.error(`Scheduler error: ${error.message}`);
}
});

// Handle shutdown
process.on('SIGTERM', async () => {
await scheduler.close();
});

3. Verify Installation

Check that routes are working:

# Get last review for a product
curl -X GET http://localhost:8080/api/products/{productId}/last-review \
-H "Authorization: Bearer {token}"

# Get pending reviews
curl -X GET http://localhost:8080/api/products/reviews/pending \
-H "Authorization: Bearer {token}"

# Manually trigger review
curl -X POST http://localhost:8080/api/products/{productId}/review/initiate \
-H "Authorization: Bearer {token}"

Configuration

Review Cadence by Category

In backend/src/services/PeriodicReviewScheduler.js:

const REVIEW_CADENCE_DAYS = {
'Electronics': 180, // Semi-annual
'Raw Materials': 365, // Annual
'Strategic Components': 90, // Quarterly
'Commodity': 365, // Annual
'default': 365 // Annual
};

Adjust based on your organization's needs.

Material Change Thresholds

In backend/src/services/ProductBomReviewManagementService.js:

// Modify these to match your business rules
static MATERIAL_CHANGES = [
'lifecycle_status', // Any change
'unit_cost', // Changes >5%
'unit_price', // Changes >10%
'supply_risk', // Any change
'bom_structure', // Any change
];

Worker Concurrency

In backend/workers/productBomReviewWorker.js:

const workerConfig = {
maxConcurrent: 3, // Adjust based on server capacity
pollIntervalMs: 5000, // Poll frequency
visibilityTimeoutSeconds: 300, // 5 minute timeout
};

Monitoring

Check Worker Status

The worker logs status every 60 seconds:

============================================================
Product BOM Review Worker Status
============================================================
Uptime: 2h 15m 30s
Reviews Processed: 47
Processing Failed: 2
Currently Active: 1/3
Queue: product_review_queue
============================================================

Database Queries

Check review status:

-- All pending reviews
SELECT sku, review_type, triggered_at
FROM product_bom_reviews
WHERE status = 'PENDING'
ORDER BY triggered_at;

-- Reviews completed today
SELECT sku, action_items->>'count'::int as action_count
FROM product_bom_reviews
WHERE status = 'COMPLETED'
AND completed_at > NOW() - INTERVAL '1 day';

-- Products overdue for review (>365 days)
SELECT product_id, sku, MAX(last_reviewed_at) as last_reviewed
FROM product_bom_reviews
WHERE status = 'COMPLETED'
GROUP BY product_id, sku
HAVING MAX(last_reviewed_at) < NOW() - INTERVAL '365 days';

Error Handling

If Worker Crashes

  1. Check logs for error message
  2. Verify database connectivity
  3. Restart worker: systemctl restart product-bom-review-worker
  4. Messages in pgmq queue will be re-visible after visibility timeout

If Scheduler Fails

  1. Check periodic review scheduler logs
  2. Verify cron job is running: systemctl status product-review-scheduler
  3. Manually trigger scheduler: node scripts/runPeriodicReviewScheduler.js

If Review Processing Hangs

  • Worker has concurrent limit (default 3)
  • Reviews have 5-minute timeout before re-visible in queue
  • Check database for IN_PROGRESS reviews stuck for >5 minutes:
SELECT id, sku, started_at, NOW() - started_at as duration
FROM product_bom_reviews
WHERE status = 'IN_PROGRESS'
AND started_at < NOW() - INTERVAL '10 minutes';

Testing

Unit Test

Create backend/__tests__/services/ProductBomReviewManagementService.test.js:

import ProductBomReviewManagementService from '../../src/services/ProductBomReviewManagementService.js';

describe('ProductBomReviewManagementService', () => {
describe('Change Detection', () => {
it('should detect cost increase >5%', () => {
const prev = { unit_cost: 100 };
const next = { unit_cost: 106 };
const analysis = ProductChangeDetector.detectChanges(prev, next);
expect(analysis.isForcedReviewNeeded).toBe(true);
});

it('should detect price change >10%', () => {
const prev = { unit_price: 100 };
const next = { unit_price: 111 };
const analysis = ProductChangeDetector.detectChanges(prev, next);
expect(analysis.isForcedReviewNeeded).toBe(true);
});
});

describe('Review Initiation', () => {
it('should create INITIAL review for new product', async () => {
const review = await ProductBomReviewManagementService.initiateReview({
tenantId: 'test-tenant',
productId: 'prod-123',
sku: 'SKU-001',
reviewType: 'INITIAL'
});
expect(review.status).toBe('PENDING');
expect(review.review_type).toBe('INITIAL');
});
});
});

Manual Test

# 1. Create test product
curl -X POST http://localhost:8080/api/products \
-H "Authorization: Bearer {token}" \
-H "Content-Type: application/json" \
-d '{"sku":"TEST-001","name":"Test Product"}'

# 2. Check last review (should show never reviewed)
curl -X GET http://localhost:8080/api/products/{productId}/last-review \
-H "Authorization: Bearer {token}"

# 3. Watch worker logs
tail -f logs/worker.log

# 4. Initiate manual review
curl -X POST http://localhost:8080/api/products/{productId}/review/initiate \
-H "Authorization: Bearer {token}"

# 5. Verify review completed
curl -X GET http://localhost:8080/api/products/{productId}/reviews \
-H "Authorization: Bearer {token}"

Troubleshooting

IssueSolution
Reviews stuck in PENDINGCheck worker is running: ps aux | grep productBomReviewWorker
pgmq queue not foundRun migrations: npm run knex migrate:latest
Worker memory usage highReduce maxConcurrent, restart worker
Forced reviews not triggeringVerify product update flow calls trackProductChange()
Scheduler not runningCheck cron: systemctl status product-review-scheduler
Performance degradationCheck database for large action_items JSONB, index tables

Next Steps

  1. ✅ Create migrations
  2. ✅ Integrate ProductService triggers
  3. ✅ Start background worker
  4. ✅ Configure periodic scheduler
  5. ⬜ Build frontend dashboard components
  6. ⬜ Add notification integrations (email, Slack)
  7. ⬜ Create reporting views for action items

Last Updated: 2025-10-23 Status: Ready for deployment