Product & BOM Review Integration Guide
This guide explains how to integrate the Product & BOM Review Management System with existing ProductService and BomService.
Overview
The review system runs asynchronously from the main S&OP process via pgmq message queues. No changes are needed to existing product upload/update flows - reviews are triggered and processed independently.
Database Migrations
Run these migrations before starting the system:
npm run knex migrate:latest --env production
This creates:
product_bom_reviews- Review lifecycle and findingsproduct_bom_change_log- Material change tracking
System Architecture
┌─────────────────────────────────────────────────────────────┐
│ ProductService │
│ ├─ CSV Upload → Creates products in core_entities │
│ └─ Triggers: initiateReview(INITIAL) → queues to pgmq │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ pgmq: product_review_queue │
│ (Messages stay in queue until processed) │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ productBomReviewWorker.js (background process) │
│ ├─ Reads messages from queue (poll every 5s) │
│ ├─ Processes review: pricing, supply chain, BOM analysis │
│ ├─ Generates action items │
│ └─ Archives message, notifies team │
└─────────────────────────────────────────────────────────────┘
↓
┌─────────────────────────────────────────────────────────────┐
│ API & Dashboard │
│ ├─ GET /api/products/:id/last-review │
│ ├─ GET /api/products/:id/reviews (history) │
│ ├─ POST /api/products/:id/review/initiate (manual) │
│ └─ GET /api/products/reviews/pending │
└─────────────────────────────────────────────────────────────┘
Integration Points
1. ProductService: Trigger INITIAL Review on Upload
In backend/src/services/ProductService.js, after successful CSV import:
import ProductBomReviewManagementService from './ProductBomReviewManagementService.js';
export const importProductsFromCsv = (filePath, tenantId) => {
return new Promise((resolve, reject) => {
// ... existing CSV parsing code ...
.on('end', async () => {
try {
// Insert products
const insertedProducts = await CoreEntitiesRepository.batchInsertAndReturn(productsToInsert);
// Enrich with AI insights
await Promise.all(insertedProducts.map(product =>
ProductIntelligenceEngine.processNewProduct(product)
));
// ⭐ NEW: Trigger INITIAL reviews for new products
for (const product of insertedProducts) {
try {
await ProductBomReviewManagementService.initiateReview({
tenantId,
productId: product.entity_id,
sku: product.sku,
reviewType: 'INITIAL',
triggeredBy: userId // if available, else 'SYSTEM'
});
} catch (reviewError) {
appLogger.warn(`Failed to initiate review for ${product.sku}: ${reviewError.message}`);
// Continue with import even if review initiation fails
}
}
resolve({
productsAdded: insertedProducts.length,
reviewsQueued: insertedProducts.length
});
} catch (error) {
reject(error);
}
});
});
};
2. ProductService: Track Changes & Trigger FORCED Reviews
When product data is updated, detect material changes:
// In product update endpoint or handler
import ProductBomReviewManagementService from './ProductBomReviewManagementService.js';
router.put('/api/products/:productId', authMiddleware, async (req, res) => {
try {
const productId = req.params.productId;
const tenantId = req.user.tenantId;
const previousData = await CoreEntitiesRepository.findById(productId);
const newData = { ...previousData, ...req.body };
// Update the product
const updated = await CoreEntitiesRepository.update(productId, newData);
// ⭐ NEW: Track changes and trigger FORCED review if material
try {
const changeResult = await ProductBomReviewManagementService.trackProductChange(
tenantId,
previousData,
newData
);
if (changeResult.forcedReviewTriggered) {
res.json({
success: true,
product: updated,
forcedReviewTriggered: true,
reviewId: changeResult.review.id,
message: 'Product updated. Material changes detected - forced review initiated.'
});
} else {
res.json({
success: true,
product: updated,
changeTracked: changeResult.changeLog.change_type === 'major'
});
}
} catch (reviewError) {
appLogger.warn(`Change tracking failed: ${reviewError.message}`);
// Still return updated product, just log review failure
res.json({ success: true, product: updated });
}
} catch (error) {
res.status(500).json({ error: error.message });
}
});
3. ProductIntelligenceEngine: Enhanced with Review Context
The existing ProductIntelligenceEngine can reference review findings:
// In ProductIntelligenceEngine.js, when generating insights:
async processNewProduct(productData) {
const intelligence = {
classification: await this.classifyProduct(productData),
supply_risk: await this.assessSupplyRisk(productData),
margin_optimization: await this.optimizeMargins(productData),
// ... existing code ...
};
// ⭐ This intelligence is stored and used by reviews
// when ProductBomReviewManagementService.performReviewAnalysis() runs
return intelligence;
}
4. BomService: Track BOM Changes
In backend/src/services/BomService.js, after importing BOM relationships:
export const importBomFromCsv = (filePath, tenantId) => {
return new Promise((resolve, reject) => {
// ... existing BOM parsing code ...
.on('end', async () => {
try {
if (relationships.length > 0) {
const inserted = await db.batchInsert('entity_relationships', relationships);
// ⭐ NEW: Trigger reviews for affected parent products
const parentProductIds = [...new Set(relationships.map(r => r.source_entity_id))];
for (const parentId of parentProductIds) {
try {
const product = await CoreEntitiesRepository.findById(parentId);
await ProductBomReviewManagementService.initiateReview({
tenantId,
productId: parentId,
sku: product.sku,
reviewType: 'FORCED', // BOM changes = material change
triggeredBy: 'SYSTEM'
});
} catch (error) {
appLogger.warn(`Failed to trigger review for parent ${parentId}`);
}
}
resolve({ relationshipsAdded: inserted.length });
} else {
resolve({ relationshipsAdded: 0 });
}
} catch (error) {
reject(error);
}
});
});
};
Running the System
1. Start Background Worker
The worker polls pgmq for review messages and processes them:
# In a separate terminal or process manager
node backend/workers/productBomReviewWorker.js
# Or with systemd
sudo systemctl start product-bom-review-worker
# Or with Docker
docker run -e DB_HOST=... -e DB_USER=... backend:review-worker
Worker will:
- Log "🚀 Starting Product BOM Review Worker"
- Connect to database
- Initialize pgmq queue
- Start polling for messages every 5 seconds
- Process up to 3 reviews concurrently
- Print status every 60 seconds
2. Configure Periodic Review Scheduler
In server.js or separate cron service:
import cron from 'node-cron';
import PeriodicReviewScheduler from './src/services/PeriodicReviewScheduler.js';
// Initialize at startup
const scheduler = new PeriodicReviewScheduler();
await scheduler.initialize();
// Run scheduler daily at 2 AM
cron.schedule('0 2 * * *', async () => {
appLogger.info('Running periodic review scheduler...');
try {
const results = await scheduler.runScheduler();
appLogger.info(`Scheduled ${results.reviewsQueued} periodic reviews`);
} catch (error) {
appLogger.error(`Scheduler error: ${error.message}`);
}
});
// Handle shutdown
process.on('SIGTERM', async () => {
await scheduler.close();
});
3. Verify Installation
Check that routes are working:
# Get last review for a product
curl -X GET http://localhost:8080/api/products/{productId}/last-review \
-H "Authorization: Bearer {token}"
# Get pending reviews
curl -X GET http://localhost:8080/api/products/reviews/pending \
-H "Authorization: Bearer {token}"
# Manually trigger review
curl -X POST http://localhost:8080/api/products/{productId}/review/initiate \
-H "Authorization: Bearer {token}"
Configuration
Review Cadence by Category
In backend/src/services/PeriodicReviewScheduler.js:
const REVIEW_CADENCE_DAYS = {
'Electronics': 180, // Semi-annual
'Raw Materials': 365, // Annual
'Strategic Components': 90, // Quarterly
'Commodity': 365, // Annual
'default': 365 // Annual
};
Adjust based on your organization's needs.
Material Change Thresholds
In backend/src/services/ProductBomReviewManagementService.js:
// Modify these to match your business rules
static MATERIAL_CHANGES = [
'lifecycle_status', // Any change
'unit_cost', // Changes >5%
'unit_price', // Changes >10%
'supply_risk', // Any change
'bom_structure', // Any change
];
Worker Concurrency
In backend/workers/productBomReviewWorker.js:
const workerConfig = {
maxConcurrent: 3, // Adjust based on server capacity
pollIntervalMs: 5000, // Poll frequency
visibilityTimeoutSeconds: 300, // 5 minute timeout
};
Monitoring
Check Worker Status
The worker logs status every 60 seconds:
============================================================
Product BOM Review Worker Status
============================================================
Uptime: 2h 15m 30s
Reviews Processed: 47
Processing Failed: 2
Currently Active: 1/3
Queue: product_review_queue
============================================================
Database Queries
Check review status:
-- All pending reviews
SELECT sku, review_type, triggered_at
FROM product_bom_reviews
WHERE status = 'PENDING'
ORDER BY triggered_at;
-- Reviews completed today
SELECT sku, action_items->>'count'::int as action_count
FROM product_bom_reviews
WHERE status = 'COMPLETED'
AND completed_at > NOW() - INTERVAL '1 day';
-- Products overdue for review (>365 days)
SELECT product_id, sku, MAX(last_reviewed_at) as last_reviewed
FROM product_bom_reviews
WHERE status = 'COMPLETED'
GROUP BY product_id, sku
HAVING MAX(last_reviewed_at) < NOW() - INTERVAL '365 days';
Error Handling
If Worker Crashes
- Check logs for error message
- Verify database connectivity
- Restart worker:
systemctl restart product-bom-review-worker - Messages in pgmq queue will be re-visible after visibility timeout
If Scheduler Fails
- Check periodic review scheduler logs
- Verify cron job is running:
systemctl status product-review-scheduler - Manually trigger scheduler:
node scripts/runPeriodicReviewScheduler.js
If Review Processing Hangs
- Worker has concurrent limit (default 3)
- Reviews have 5-minute timeout before re-visible in queue
- Check database for IN_PROGRESS reviews stuck for >5 minutes:
SELECT id, sku, started_at, NOW() - started_at as duration
FROM product_bom_reviews
WHERE status = 'IN_PROGRESS'
AND started_at < NOW() - INTERVAL '10 minutes';
Testing
Unit Test
Create backend/__tests__/services/ProductBomReviewManagementService.test.js:
import ProductBomReviewManagementService from '../../src/services/ProductBomReviewManagementService.js';
describe('ProductBomReviewManagementService', () => {
describe('Change Detection', () => {
it('should detect cost increase >5%', () => {
const prev = { unit_cost: 100 };
const next = { unit_cost: 106 };
const analysis = ProductChangeDetector.detectChanges(prev, next);
expect(analysis.isForcedReviewNeeded).toBe(true);
});
it('should detect price change >10%', () => {
const prev = { unit_price: 100 };
const next = { unit_price: 111 };
const analysis = ProductChangeDetector.detectChanges(prev, next);
expect(analysis.isForcedReviewNeeded).toBe(true);
});
});
describe('Review Initiation', () => {
it('should create INITIAL review for new product', async () => {
const review = await ProductBomReviewManagementService.initiateReview({
tenantId: 'test-tenant',
productId: 'prod-123',
sku: 'SKU-001',
reviewType: 'INITIAL'
});
expect(review.status).toBe('PENDING');
expect(review.review_type).toBe('INITIAL');
});
});
});
Manual Test
# 1. Create test product
curl -X POST http://localhost:8080/api/products \
-H "Authorization: Bearer {token}" \
-H "Content-Type: application/json" \
-d '{"sku":"TEST-001","name":"Test Product"}'
# 2. Check last review (should show never reviewed)
curl -X GET http://localhost:8080/api/products/{productId}/last-review \
-H "Authorization: Bearer {token}"
# 3. Watch worker logs
tail -f logs/worker.log
# 4. Initiate manual review
curl -X POST http://localhost:8080/api/products/{productId}/review/initiate \
-H "Authorization: Bearer {token}"
# 5. Verify review completed
curl -X GET http://localhost:8080/api/products/{productId}/reviews \
-H "Authorization: Bearer {token}"
Troubleshooting
| Issue | Solution |
|---|---|
| Reviews stuck in PENDING | Check worker is running: ps aux | grep productBomReviewWorker |
| pgmq queue not found | Run migrations: npm run knex migrate:latest |
| Worker memory usage high | Reduce maxConcurrent, restart worker |
| Forced reviews not triggering | Verify product update flow calls trackProductChange() |
| Scheduler not running | Check cron: systemctl status product-review-scheduler |
| Performance degradation | Check database for large action_items JSONB, index tables |
Next Steps
- ✅ Create migrations
- ✅ Integrate ProductService triggers
- ✅ Start background worker
- ✅ Configure periodic scheduler
- ⬜ Build frontend dashboard components
- ⬜ Add notification integrations (email, Slack)
- ⬜ Create reporting views for action items
Last Updated: 2025-10-23 Status: Ready for deployment