Database Concurrency & Connection Management

Part 1: Test Environment - Migration Lock Issues

The Problem

When multiple test suites start simultaneously (e.g., npm test running 41 parallel test files), they all attempt to acquire the migration lock at the same time:

Error: Migration table is already locked

This happens because:

  1. Global setup runs once per test session
  2. Vitest parallelizes test execution across multiple worker processes
  3. All workers try to acquire the migration lock simultaneously
  4. PostgreSQL allows only ONE process to hold the migration lock at a time

Current Mitigation Strategy

File: backend/global-setup.js

// Clears stale migration locks before running migrations
await dbInstance.migrate.forceFreeMigrationsLock();

This ensures a clean state, but when multiple test runners start concurrently, they can still collide.

Solutions Applied

1. Retry Logic with Backoff (Currently Implemented)

const MAX_RETRIES = 5;
const RETRY_DELAY_MS = 2000;

for (let i = 0; i < MAX_RETRIES; i++) {
  try {
    await dbInstance.migrate.latest();
    break;
  } catch (error) {
    if (i === MAX_RETRIES - 1) throw error;
    await new Promise(resolve => setTimeout(resolve, RETRY_DELAY_MS));
  }
}

Current Timing: up to 5 attempts with a fixed 2-second delay after each failure, i.e. roughly 8 seconds of added waiting (4 delays × 2 s) plus the time the migration attempts themselves take.

Limitation: A fixed delay doesn't scale well when many test instances start simultaneously; the colliding workers all retry on the same schedule and keep hitting the lock together.
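
A common refinement is exponential backoff with random jitter, so colliding workers spread their retries apart instead of retrying in lock-step. A sketch of what this could look like (not currently implemented; dbInstance as in global-setup.js):

// Hypothetical refinement: exponential backoff with jitter for migration retries
const MAX_RETRIES = 5;
const BASE_DELAY_MS = 500;

async function migrateWithBackoff(dbInstance) {
  for (let attempt = 0; attempt < MAX_RETRIES; attempt++) {
    try {
      await dbInstance.migrate.latest();
      return;
    } catch (error) {
      if (attempt === MAX_RETRIES - 1) throw error;
      // Delay doubles on each attempt, plus up to 500ms of random jitter
      const delay = BASE_DELAY_MS * 2 ** attempt + Math.random() * 500;
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}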

2. Manual Lock Release (For Stuck States)

If you encounter persistent migration locks:

# Open a psql shell inside the test database container
docker-compose -f docker-compose.test.yml exec test-db psql -U testuser -d testdb

# Inside psql, run:
DELETE FROM knex_migrations_lock;

Or from your local environment:

PGPASSWORD=testpassword psql -h 127.0.0.1 -p 5433 -U testuser -d testdb -c "DELETE FROM knex_migrations_lock;"

3. Sequential Test Execution (For Reliability)

If tests are flaky due to lock contention:

# Run tests sequentially instead of in parallel
npm test -- --no-coverage --threads false

# Or limit parallelism
npm test -- --threads 1

Update backend/vitest.config.js:

export default defineConfig({
  test: {
    // Run tests serially in CI to avoid migration lock contention
    threads: process.env.CI === 'true' ? false : true,
    // Increase timeouts so workers can wait out lock acquisition
    hookTimeout: 30000,
    testTimeout: 10000,
    // Use a single global setup to avoid race conditions
    globals: true,
    globalSetup: './global-setup.js',
  },
});

Part 2: Production Readiness - Concurrency Analysis

Current Connection Pool Configuration

| Environment | Min | Max | Timeout | Status |
| --- | --- | --- | --- | --- |
| Development | 0 | 20 | 60s | ⚠️ Too high for multi-tenant |
| Test | 0 | 20 | 60s | ⚠️ Causes lock contention |
| Production | 2 | 10 | 60s | ⚠️ UNDER-PROVISIONED |

Production Bottleneck Analysis

Scenario: 50 Concurrent Users, 5 Tenants

Request Types & Pool Usage:

  1. API Requests (Read-Heavy)

    • Scenario search: 1 connection
    • Forecast queries: 1 connection
    • Constraint validation: 1 connection
    • Expected load: 50 concurrent requests = 50 connections needed
  2. Background Workers (Always Running)

    • Typesense sync worker: 2 connections (fetch + index)
    • Monte Carlo simulation: 3 connections (job queue + config + results)
    • Search job processor: 2 connections
    • Startup bootstrap worker: 2 connections
    • Expected load: ~9 connections minimum
  3. Database Maintenance

    • Migrations: 1 (brief; holds the migration lock and locks tables being altered)
    • Vacuum/cleanup: 1
    • Backup operations: 2
    • Expected load: ~4 connections

Total Required: 50 + 9 + 4 = 63 connections

Current Allocated: 10 connections

Deficit: 53 connections (the estimated requirement of 63 is more than 6× the current allocation of 10)

Risk Assessment

| Risk | Severity | Description |
| --- | --- | --- |
| Connection Exhaustion | 🔴 CRITICAL | Pool exhausts within seconds of a load spike |
| Query Timeouts | 🔴 CRITICAL | 60s timeout exceeded, requests fail |
| Worker Starvation | 🔴 CRITICAL | Background workers can't acquire connections |
| Cascade Failures | 🟠 HIGH | One tenant's queries block others (no read replicas) |
| Migration Deadlock | 🟠 HIGH | Can't deploy updates during high traffic |

Multi-Tenancy Isolation Concerns

Current Implementation:

// BaseRepository._applyTenantScope()
query.where({ [this.tenantIdField]: user.tenant_id })

Issues:

  1. No read replicas - All reads/writes go to single primary
  2. No connection pooling per tenant - No resource isolation
  3. No query priority system - All queries equal weight
  4. No rate limiting - A single user or tenant can exhaust the pool (see the sketch below)
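
As a stopgap before full per-tenant pooling, request-level rate limiting keeps one caller from exhausting the pool. A minimal in-process sketch (illustrative only; the Express-style middleware shape and the limits are assumptions, not current code):

// Hypothetical per-tenant rate limiter (in-memory, single-process only)
const WINDOW_MS = 60_000;
const MAX_REQUESTS_PER_WINDOW = 120;
const counters = new Map(); // tenant_id -> { count, windowStart }

function tenantRateLimit(req, res, next) {
  const tenantId = req.user?.tenant_id ?? 'anonymous';
  const now = Date.now();
  const entry = counters.get(tenantId) ?? { count: 0, windowStart: now };

  if (now - entry.windowStart > WINDOW_MS) {
    entry.count = 0;
    entry.windowStart = now;
  }

  entry.count += 1;
  counters.set(tenantId, entry);

  if (entry.count > MAX_REQUESTS_PER_WINDOW) {
    return res.status(429).json({ error: 'Too many requests for this tenant' });
  }
  next();
}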

Workers Using Native pg.Pool

Current Pattern:

// typesenseSyncWorker.js
const pool = new pg.Pool(pgConfig); // Separate from Knex!

Problem: Workers bypass the configured Knex pool

| Worker | Pool Type | Size | Connection Pattern |
| --- | --- | --- | --- |
| typesenseSyncWorker | pg.Pool | Unknown | Uncontrolled |
| typesenseSearchJobWorker | pg.Pool | Unknown | Uncontrolled |
| simulationWorker | Knex | Shared | Shared (max 10) |
| startupBootstrapWorker | Knex | Shared | Shared (max 10) |

Part 3: Recommendations for Production Deployment

Immediate Actions (Pre-Deployment)

1. Increase Connection Pool

// knexfile.cjs - production section
production: {
  pool: {
    min: 5,
    max: 50,                     // Increase from 10
    acquireTimeoutMillis: 30000, // Reduce from 60s
    idleTimeoutMillis: 30000,    // Kill idle connections sooner
    reapIntervalMillis: 1000,    // Check for idle more frequently
  },
}

Rationale:

  • min: 5 ensures connections ready for spikes
  • max: 50 accommodates 50 concurrent users + workers
  • 30s idle timeout prevents connection waste
  • 1s reap interval cleans up faster
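
One caveat to verify first: the pool maximum applies per application instance, so the combined total across app instances and worker pools must stay below the PostgreSQL server's own max_connections. A quick check through the shared Knex instance (this check is a sketch, not part of the current codebase; import path as used in the worker example below):

// Sanity-check the server-side ceiling before raising the application pool size
import db from '../src/dal/knex.js';

const result = await db.raw('SHOW max_connections');
console.log('PostgreSQL max_connections:', result.rows[0].max_connections);
// Rule of thumb: (pool max × app instances) + worker pools + a superuser reserve < max_connections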

2. Consolidate Worker Connections

Replace all worker pg.Pool instances with Knex:

// ❌ BEFORE - Each worker has separate pool
const pool = new pg.Pool(pgConfig);

// ✅ AFTER - Share Knex pool
import db from '../src/dal/knex.js';

async function syncEntity(entityId) {
  return db('typesense_queue').where({ entity_id: entityId });
}

Benefit: All database operations share a single managed pool.

3. Implement Connection Monitoring

// Add to server startup
const db = require('./dal/knex');

db.on('query', (query) => {
  if (query.sql.includes('slow_query_marker')) {
    logger.warn(`Slow query detected: ${query.sql}`);
  }
});

// Log pool status periodically (Knex wraps a tarn.js pool at db.client.pool)
setInterval(() => {
  const pool = db.client.pool;
  logger.info({
    activeConnections: pool.numUsed(),
    availableConnections: pool.numFree(),
    waitingRequests: pool.numPendingAcquires(),
  });
}, 60000);

4. Add Query Timeout Enforcement

// knexfile.cjs - production section
production: {
  acquireConnectionTimeout: 30000, // Wait at most 30s for a connection from the pool
},

// In the service layer: Knex query builders support a per-query timeout
const rows = await db('forecasts')
  .where({ scenario_id: scenarioId })
  .timeout(5000, { cancel: true }); // Ask PostgreSQL to cancel the query after 5s

// For non-Knex promises, a Promise.race wrapper enforces a client-side limit
// (note: the underlying query keeps running on the server)
const queryWithTimeout = async (query, timeoutMs = 5000) => {
  return Promise.race([
    query,
    new Promise((_, reject) =>
      setTimeout(() => reject(new Error('Query timeout')), timeoutMs)
    ),
  ]);
};

Medium-Term Actions (Architecture Improvements)

1. Implement Read Replicas

// knexfile.cjs
// NOTE: Knex has no built-in read-replica routing; the readReplicas key below is
// a custom setting that application code has to read and act on itself.
production: {
  client: 'pg',
  connection: { /* primary */ },

  // Add read replica pool (custom key, ignored by Knex itself)
  readReplicas: [
    {
      host: process.env.DB_REPLICA_1_HOST,
      port: process.env.DB_REPLICA_1_PORT,
      // ... same credentials as primary
    },
  ],
}

Route read operations:

// Route expensive reads to replicas
async function getForecastHistory(scenarioId) {
  const replicaDb = db.useReadReplica(); // hypothetical
  return replicaDb('forecasts').where({ scenario_id: scenarioId });
}
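
Since Knex won't route reads on its own, one concrete approach is a second Knex instance pointed at the replica, chosen per call. A minimal sketch, assuming the production knexfile and the replica environment variables above (credential env var names are illustrative):

// Hypothetical two-instance setup: one pool for the primary, one for the replica
import knex from 'knex';
import config from '../knexfile.cjs';

export const primaryDb = knex(config.production);

export const replicaDb = knex({
  ...config.production,
  connection: {
    host: process.env.DB_REPLICA_1_HOST,
    port: process.env.DB_REPLICA_1_PORT,
    user: process.env.DB_USER,         // assumed env vars; reuse primary credentials
    password: process.env.DB_PASSWORD,
    database: process.env.DB_NAME,
  },
});

// Read-only queries go to the replica, writes stay on the primary:
// const history = await replicaDb('forecasts').where({ scenario_id: scenarioId });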

2. Add Caching Layer (Redis)

// Cache hot reads in Redis to reduce database load (the 70-80% figure depends on cache hit rate)
// (node-redis v4 API; the client must be connected before use)
import { createClient } from 'redis';

const client = createClient({
  url: `redis://${process.env.REDIS_HOST}:${process.env.REDIS_PORT}`,
});
await client.connect();

async function getCachedForecast(scenarioId) {
  const cached = await client.get(`forecast:${scenarioId}`);
  if (cached) return JSON.parse(cached);

  const result = await db('forecasts').where({ id: scenarioId });
  await client.set(`forecast:${scenarioId}`, JSON.stringify(result), { EX: 3600 });
  return result;
}

3. Implement Query Prioritization

// High-priority queries (user-facing) run immediately;
// low-priority queries (background) wait in a smaller queue.

import PQueue from 'p-queue'; // recent p-queue versions are ESM-only

const highPriority = new PQueue({ concurrency: 8 });
const lowPriority = new PQueue({ concurrency: 2 });

async function queryDatabase(sql, priority = 'normal') {
  const queue = priority === 'high' ? highPriority : lowPriority;
  return queue.add(() => db.raw(sql));
}

4. Add Per-Tenant Connection Limits

// Prevent a single tenant from monopolizing the pool
const tenantConnections = new Map();
const MAX_TENANT_CONNECTIONS = 5;

// Takes a function that starts the query, so nothing runs before the limit check
async function executeWithTenantLimit(tenantId, queryFn) {
  const current = tenantConnections.get(tenantId) || 0;

  if (current >= MAX_TENANT_CONNECTIONS) {
    throw new Error('Tenant connection limit exceeded');
  }

  tenantConnections.set(tenantId, current + 1);
  try {
    return await queryFn();
  } finally {
    // Decrement rather than restoring the old snapshot, so concurrent calls don't clobber each other
    tenantConnections.set(tenantId, tenantConnections.get(tenantId) - 1);
  }
}

Long-Term Actions (Scalability)

1. Database Sharding by Tenant

Tenant A → Database A (dedicated pool)
Tenant B → Database B (dedicated pool)
Tenant C → Database C (dedicated pool)
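
A sketch of how a per-tenant router could look with Knex (database naming scheme, env vars, and the resolveTenantDb helper are illustrative assumptions):

// Hypothetical tenant router: one Knex instance (and pool) per tenant database
import knex from 'knex';

const tenantDbs = new Map();

function resolveTenantDb(tenantId) {
  if (!tenantDbs.has(tenantId)) {
    tenantDbs.set(tenantId, knex({
      client: 'pg',
      connection: {
        host: process.env.DB_HOST,
        user: process.env.DB_USER,
        password: process.env.DB_PASSWORD,
        database: `tenant_${tenantId}`, // dedicated database per tenant
      },
      pool: { min: 1, max: 10 },        // dedicated pool per tenant
    }));
  }
  return tenantDbs.get(tenantId);
}

// Usage: resolveTenantDb(user.tenant_id)('forecasts').where({ scenario_id: scenarioId })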

2. Event Sourcing for Heavy Writes

Replace direct database writes with an event log:

API writes → Kafka/RabbitMQ → Event log → Async processors
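
A minimal sketch of the enqueue side using RabbitMQ via amqplib (queue name, payload shape, and AMQP_URL are assumptions; a Kafka producer would be analogous):

// Hypothetical: publish write events to a durable queue instead of writing directly
import amqp from 'amqplib';

const connection = await amqp.connect(process.env.AMQP_URL);
const channel = await connection.createChannel();
await channel.assertQueue('forecast-writes', { durable: true });

// API layer: enqueue the event and return immediately
export function recordForecastWrite(event) {
  channel.sendToQueue(
    'forecast-writes',
    Buffer.from(JSON.stringify(event)),
    { persistent: true }
  );
}

// A separate async processor consumes 'forecast-writes' and applies the write via Knex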

3. Materialized Views for Analytics

-- Refresh periodically, not on each query
CREATE MATERIALIZED VIEW forecast_stats AS
SELECT tenant_id, scenario_id, avg(mape) as avg_error
FROM forecasts
GROUP BY tenant_id, scenario_id;

CREATE INDEX ON forecast_stats(tenant_id);
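
The view then has to be refreshed on a schedule rather than per query. A sketch of a periodic refresh job through Knex (the interval is arbitrary; REFRESH ... CONCURRENTLY additionally requires a unique index on the view):

// Hypothetical scheduled refresh of the materialized view (every 15 minutes)
import db from '../src/dal/knex.js';

setInterval(async () => {
  try {
    // CONCURRENTLY avoids blocking readers, but needs a UNIQUE index on forecast_stats
    await db.raw('REFRESH MATERIALIZED VIEW CONCURRENTLY forecast_stats');
  } catch (err) {
    console.error('forecast_stats refresh failed', err);
  }
}, 15 * 60 * 1000);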

Part 4: Load Testing Recommendations

Before Production Deployment

// load-test.js
import autocannon from 'autocannon';

const result = await autocannon({
  url: 'http://localhost:3000/api/scenarios',
  connections: 100, // 100 concurrent users
  duration: 60,     // 60 seconds
  pipelining: 5,    // 5 requests per connection
  requests: [
    { path: '/api/scenarios', method: 'GET' },
    { path: '/api/forecasts', method: 'GET' },
    { path: '/api/constraints', method: 'GET' },
  ],
});

console.log(`
Throughput: ${result.throughput.average.toFixed(0)} req/sec
Latency p99: ${result.latency.p99} ms
Connection errors: ${result.errors}
`);

Success Criteria:

  • ✅ 1000+ requests/second
  • ✅ p99 latency < 500ms
  • ✅ 0 timeouts/errors
  • ✅ Pool never exceeds 80% utilization

Part 5: Monitoring & Alerts

PostgreSQL Metrics to Monitor

-- Active connections
SELECT count(*) FROM pg_stat_activity;

-- Long-running queries (pg_stat_activity has no duration column; derive it from query_start)
SELECT pid, now() - query_start AS duration, query
FROM pg_stat_activity
WHERE state = 'active'
  AND now() - query_start > interval '5 minutes';

-- Dead tuples per table (a simple proxy for bloat; the full bloat-estimation query is much longer)
SELECT schemaname, relname, n_dead_tup, n_live_tup
FROM pg_stat_user_tables
ORDER BY n_dead_tup DESC
LIMIT 10;

Application-Level Metrics

// Track pool health (for workers that still use pg.Pool directly)
const { Pool } = require('pg');
const pool = new Pool(pgConfig);

pool.on('connect', () => {
  metrics.gauge('db.pool.size', pool.totalCount);
});

pool.on('error', (err) => {
  metrics.increment('db.pool.errors');
  logger.error('Unexpected error on idle client', err);
});

// Track slow queries via Knex's query / query-response events
// (the query event has no duration field, so record start times keyed by query id)
const slowQueryThreshold = 1000; // 1 second
const queryStart = new Map();

db.on('query', (query) => {
  queryStart.set(query.__knexQueryUid, Date.now());
});

db.on('query-response', (response, query) => {
  const started = queryStart.get(query.__knexQueryUid);
  queryStart.delete(query.__knexQueryUid);
  if (started === undefined) return;
  const duration = Date.now() - started;
  if (duration > slowQueryThreshold) {
    metrics.histogram('db.query.duration_slow', duration);
    logger.warn(`Slow query (${duration}ms): ${query.sql}`);
  }
});

Alert Thresholds

| Metric | Warning | Critical |
| --- | --- | --- |
| Active Connections | 80% of max | 95% of max |
| Query Queue Wait Time | > 1 second | > 5 seconds |
| Query Execution Time | > 1 second | > 5 seconds |
| Migration Lock Wait | > 30 seconds | > 60 seconds |
| Connection Acquire Timeout | > 1% of requests | > 5% of requests |

Summary: Production Readiness Checklist

  • Connection Pool: Increased to min: 5, max: 50
  • Worker Consolidation: All workers use Knex pool
  • Query Timeouts: Implemented per-query 5s timeout
  • Migration Locks: Documented lock release procedure
  • Read Replicas: Evaluated and planned (optional for Phase 1)
  • Caching: Redis layer for hot data (optional for Phase 1)
  • Monitoring: Prometheus/Datadog metrics configured
  • Load Testing: Passed 100+ concurrent user test
  • Documentation: Team trained on monitoring dashboard

References