TicketHive Level 3 Implementation Plan
For a better learning experience, this plan has been split into:
- LEVEL_3_MVP_PLAN.md (Milestones 0-6): Start here! Gets async processing working end-to-end. ~3-5 days, produces a demo-able system: the API returns 202, workers process jobs, and optimistic locking works.
- LEVEL_3_PRODUCTION_PLAN.md (Milestones 7-10): Production hardening after the MVP is complete. ~3-5 days, adds SSE, rate limiting, circuit breakers, and monitoring. Handles 10K+ requests, production-ready.
Why this division?
- ✅ Faster feedback loop (see it working sooner)
- ✅ Clear learning checkpoints (MVP = demo, Production = interview-ready)
- ✅ Can pause between phases if needed
- ✅ Better portfolio presentation ("I built the MVP, then hardened it for production")
This document contains the complete reference material with all milestones, architecture decisions, and technical context. Use the split documents above for step-by-step implementation.
Current State (Level 2):
- Synchronous API processing with `FOR UPDATE` pessimistic locking
- 1-2% timeout rate under 1000 concurrent requests
- Direct database operations in the API service
- Immediate response (201/409/503)
Target State (Level 3):
- Asynchronous request processing via job queues
- <100ms API response time returning 202 Accepted
- Worker processes handle booking logic separately
- Optimistic locking with versioning (no `FOR UPDATE`)
- Server-Sent Events for status updates
- Separate BullMQ dashboard service for monitoring
- Rate limiting & circuit breaker protection
- Configurable retry strategy via environment
- Zero timeout errors, 10x throughput improvement
- Handles 10,000+ concurrent requests
Request flow: Client → API Service → Redis (BullMQ) → Worker Service → PostgreSQL → QueueEvents → Client (a sketch of the API handler covering steps 2-7 follows the list):

1. Client sends POST /book (with payload) to the API Service (/apps/api).
2. API performs a rate limit check.
3. API performs a queue depth check.
4. API performs a circuit breaker check.
5. API creates the booking job (validates with Zod, generates a jobId).
6. API returns 202 + jobId to the client (<100ms).
7. API pushes the job to the BullMQ queue in Redis (queue depth tracking, job persistence).
8. Client opens an SSE connection for status updates (if the job has not already completed).
9. A worker (Worker Service, /apps/worker) pulls the job from the queue.
10. Worker applies optimistic locking (version check).
11. Worker updates PostgreSQL.
12. Worker publishes the result via QueueEvents (Redis Streams), which is forwarded to the client over SSE.
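To make steps 2-7 concrete, here is a minimal TypeScript sketch of the booking handler. It is an illustration only: the package names (`@ticket-hive/lib`, `@ticket-hive/types`) follow the plan's import convention, while `rateLimiter.consume`, `isCircuitOpen`, and the request shape are assumptions, not existing project code.

```ts
import { Router } from 'express';
import { BookingJobSchema } from '@ticket-hive/types'; // shared Zod contract (Milestone 3)
import { bookingQueue, env } from '@ticket-hive/lib';  // shared BullMQ queue + validated env

export const bookingsRouter = Router();

bookingsRouter.post('/api/v1/bookings', async (req, res, next) => {
  try {
    // 2. Rate limit check (hypothetical helper; built out in the production plan)
    // if (!(await rateLimiter.consume(req.body.userId))) return res.status(429).json({ error: 'Too many requests' });

    // 3. Queue depth check: shed load instead of letting the queue grow unbounded
    const waiting = await bookingQueue.getWaitingCount();
    if (waiting > env.REDIS_QUEUE_MAX_DEPTH) {
      return res.status(503).json({ error: 'Queue full, retry later' });
    }

    // 4. Circuit breaker check (hypothetical helper wrapping the opossum breaker from Milestone 7)
    // if (isCircuitOpen()) return res.status(503).json({ error: 'Queue temporarily unavailable' });

    // 5. Validate the payload with the shared Zod schema before queueing
    const data = BookingJobSchema.parse({
      userId: req.body.userId, // in the real app this would come from auth middleware
      eventId: req.body.eventId,
      timestamp: Date.now(),
    });

    // 6-7. Enqueue and return 202 + jobId immediately; the worker does the real work
    const job = await bookingQueue.add('process-booking', data);
    return res.status(202).json({ jobId: job.id, statusUrl: `/api/v1/bookings/status/${job.id}` });
  } catch (err) {
    next(err);
  }
});
```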
/packages
  /database (Shared PostgreSQL client)
  /types    (Shared Zod schemas + TS types)
  /lib      (Shared utilities, errors, Redis client)
Monitoring Stack:
- Dashboard service (apps/dashboard)
- Rate limiter metrics
- Circuit breaker metrics
- Queue depth alerts
- Version conflict rate alerts
For detailed architectural explanations, edge case analysis, and design pattern rationale, see:
- ARCHITECTURAL_CONTEXT.md - Educational material explaining the "why" behind Level 3 decisions
Key topics covered:
- The "Fast Worker" race condition and SSE edge cases
- Why QueueEvents over raw Redis Pub/Sub for horizontal scaling
- Technical decision rationale (BullMQ, ioredis, optimistic locking, etc.)
- Error handling philosophy (hard fail vs graceful degradation)
- Performance expectations and success metrics
Objective: Reorganize the existing Level 2 monolith into a monorepo structure BEFORE introducing Level 3 complexity. This prevents debugging nightmares during the transition.
**Tasks:**
- Create Monorepo Structure
  - `/apps/api` (move existing `src/*` here)
  - `/apps/worker` (will be populated in Milestone 4)
  - `/packages/database` (extract db.ts, schema, migrations)
  - `/packages/types` (extract all TypeScript interfaces)
  - `/packages/lib` (extract shared utilities, errors, auth)
- Extract Shared Database Layer
  - Move `src/lib/db.ts` → `packages/database/src/index.ts`
  - Move database initialization logic
  - Update imports to use package references
  - Ensure connection pooling works from both API and Worker
- Extract Shared Types
  - Move `src/types/index.ts` → `packages/types/src/index.ts`
  - Extract all interfaces: Event, Booking, User, etc.
  - Create Zod validation schemas for API payloads
  - Ensure types compile in both apps
- Extract Shared Utilities
  - Move `src/lib/errors.ts` → `packages/lib/src/errors.ts`
  - Move `src/lib/errorHandler.ts` → `packages/lib/src/errorHandler.ts`
  - Move `src/lib/auth.ts` → `packages/lib/src/auth.ts`
  - Move `src/lib/env.ts` → `packages/lib/src/env.ts`
- Update API Application
  - Move all route handlers to `apps/api/src/routes/`
  - Move all services to `apps/api/src/services/`
  - Move middleware to `apps/api/src/middleware/`
  - Update all import paths to use `@ticket-hive/database`, `@ticket-hive/types`, etc.
- Configure Build System
  - Add `turbo.json` for build pipeline orchestration
  - Update root `package.json` with workspaces configuration
  - Add individual `package.json` files for each package/app
  - Ensure TypeScript paths resolve correctly
- Update Docker Configuration
  - Modify `Dockerfile` for monorepo multi-stage builds
  - Update `docker-compose.yml` to mount correct volumes
  - Ensure hot-reload works for development
- Verification
  - Run existing Level 2 load tests: `npm run test:load`
  - Verify exactly 100 bookings, zero overbookings
  - Confirm all existing functionality works
  - Do not proceed to Milestone 1 until this passes
Expected Output:
- ✅ Existing Level 2 code runs in the new monorepo structure
- ✅ All tests pass without modification
- ✅ Imports resolve correctly across packages
- ✅ Docker Compose starts all services successfully
- ✅ Zero functional changes to Level 2 logic
Files Modified/Created:
- `/apps/api/src/*` (moved from root `src/`)
- `/packages/database/package.json` (new)
- `/packages/types/package.json` (new)
- `/packages/lib/package.json` (new)
- `/apps/api/package.json` (new)
- `/apps/worker/package.json` (new, empty for now)
- Root `package.json` (add workspaces)
- `turbo.json` (new, build orchestration)
- `Dockerfile` (update for monorepo)
Validation:
# After restructure
docker compose up -d
npm run test:load
# Should show same results as Level 2:
# - 100 bookings created
# - 0 overbookings
# - 1-2% timeout rate (expected for Level 2)

Objective: Add Redis service and BullMQ dependencies to enable queue-based processing. Builds ON TOP of the monorepo structure from Milestone 0.
**Tasks:**
- Docker Compose Updates
  - Add Redis service to `compose.yaml`
  - Configure Redis port (6379)
  - Add Redis healthcheck
  - Add Redis dependency to the API service
- Package Dependencies
  - In the root directory: `npm install bullmq ioredis` and `npm install -D @types/ioredis`
- Environment Configuration
  - Add Redis configuration to `packages/lib/src/env.ts`:

  ```ts
  REDIS_HOST: z.string().default("localhost"),
  REDIS_PORT: z.number().default(6379),
  REDIS_PASSWORD: z.string().optional(),
  // Retry strategy configuration
  WORKER_MAX_RETRIES: z.number().default(3),
  WORKER_RETRY_DELAY_MS: z.number().default(100),
  WORKER_RETRY_MAX_DELAY_MS: z.number().default(1000),
  WORKER_CONCURRENCY: z.number().default(5),
  REDIS_QUEUE_MAX_DEPTH: z.number().default(1000),
  // Circuit breaker
  CIRCUIT_BREAKER_TIMEOUT: z.number().default(3000),
  CIRCUIT_BREAKER_ERROR_THRESHOLD: z.number().default(50),
  CIRCUIT_BREAKER_RESET_TIMEOUT: z.number().default(30000),
  ```

  - Update `.env.example` with Redis defaults
- Redis Connection Setup (see the connection-factory sketch after this list)
  - Create `packages/lib/src/redis.ts` with a connection factory
  - Implement connection retry logic (3 attempts)
  - Export a shared Redis instance for BullMQ
  - Import it from the shared lib package, not directly from `src`
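A minimal sketch of that connection factory, assuming ioredis and the env schema above; the retry policy and export shape are illustrative rather than prescriptive.

```ts
// packages/lib/src/redis.ts (sketch)
import { Redis } from 'ioredis';
import { env } from './env';

let client: Redis | undefined;

export function getRedis(): Redis {
  if (!client) {
    client = new Redis({
      host: env.REDIS_HOST,
      port: env.REDIS_PORT,
      password: env.REDIS_PASSWORD,
      // BullMQ needs blocking commands to never be retried per-request
      maxRetriesPerRequest: null,
      // Retry the connection itself up to 3 times, then give up (hard fail)
      retryStrategy: (attempt) => (attempt > 3 ? null : Math.min(attempt * 200, 1000)),
    });
    client.on('error', (err) => console.error('Redis error:', err.message));
  }
  return client;
}
```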
Expected Output:
- ✅ Redis container starts with Docker Compose
- ✅ Application connects to Redis successfully
- ✅ Healthcheck passes: `docker compose exec redis redis-cli ping`
- ✅ No breaking changes to existing Level 2 API logic
- ✅ All imports resolve via packages
Files Modified/Created:
- `compose.yaml` (add Redis service)
- Root `package.json` (add bullmq, ioredis, zod)
- `packages/lib/src/env.ts` (add Redis and retry env vars)
- `packages/lib/src/redis.ts` (new file, shared Redis client)
Validation:
docker compose up -d
# Redis should be healthy
docker compose logs redis
# Should show: Ready to accept connections

Objective: Add optimistic concurrency control by introducing a version column to events.
**Tasks:**
- Schema Changes (see the migration sketch after this list)
  - Add `version INT DEFAULT 0 NOT NULL` to the `events` table
  - Create a migration script in `packages/database/scripts/migrate-level3.ts`
  - Backfill existing events: `UPDATE events SET version = 0`
  - Add a unique constraint for safety: `UNIQUE(id, version)`
- Update Database Initialization
  - Modify `initializeDatabase()` in `packages/database/src/db.ts`
  - Add the version column to the CREATE TABLE statement
  - Update timestamp: `updated_at TIMESTAMP DEFAULT NOW()` for optimistic locking
- Type Definitions
  - Update the Event interface in `packages/types/src/index.ts`
  - Add a `version: number` field to the Event type
  - Update all related type guards
- Event Service Updates
  - Modify `apps/api/src/services/eventService.ts`
  - Add `getEventWithVersion()` for worker consumption
  - Update `getEventById()` to return the version
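A minimal sketch of the migration script, assuming the document's `sql` tagged-template client (e.g. postgres.js) is exported from the database package; the exact statements and idempotency guards are illustrative.

```ts
// packages/database/scripts/migrate-level3.ts (sketch)
import { sql } from '../src/db'; // assumed export path

async function migrate() {
  // Add the version column if it does not exist yet (safe to re-run)
  await sql`ALTER TABLE events ADD COLUMN IF NOT EXISTS version INT DEFAULT 0 NOT NULL`;

  // Backfill existing rows so every event starts at version 0
  await sql`UPDATE events SET version = 0 WHERE version IS NULL`;

  // Optional safety net (will fail if the constraint already exists; guard as needed)
  await sql`ALTER TABLE events ADD CONSTRAINT events_id_version_unique UNIQUE (id, version)`;

  console.log('Level 3 migration applied');
}

migrate()
  .then(() => process.exit(0))
  .catch((err) => {
    console.error('Migration failed:', err);
    process.exit(1);
  });
```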
Expected Output:
- ✅ All events have version = 0 after migration
- ✅ New events get version = 0 automatically
- ✅ Type safety maintained throughout the codebase
- ✅ No impact on Level 2 functionality yet
Files Modified/Created:
- `packages/database/src/db.ts` (add version column)
- `packages/types/src/index.ts` (add version to types)
- `packages/database/scripts/migrate-level3.ts` (new migration script)
- `apps/api/src/services/eventService.ts` (add version support)
Validation:
-- After migration
SELECT id, name, version FROM events LIMIT 5;
-- Should show version = 0 for all events

Objective: Define and implement the booking job data structure with strict Zod validation for API-Worker contract enforcement.
**Tasks:**
- Job Data Zod Schema (Shared Contract)
  - Create `packages/types/src/bookingJob.ts`:

  ```ts
  import { z } from 'zod';

  export const BookingJobSchema = z.object({
    userId: z.string().uuid(),
    eventId: z.string().uuid(),
    timestamp: z.number().int().positive(),
    // Add idempotency key (for Level 4 compatibility)
    idempotencyKey: z.string().uuid().optional()
  });

  export type BookingJobData = z.infer<typeof BookingJobSchema>;
  ```

- Queue Configuration
  - Create `packages/lib/src/queues.ts`
  - Define queue names: `bookingQueue`, `notificationQueue`
  - Configure BullMQ with the Redis connection (using the shared Redis client)
  - Set default job options:

  ```ts
  {
    attempts: 3,
    backoff: { type: 'exponential', delay: 100 },
    timeout: 30000,                   // 30 second job timeout
    removeOnComplete: { age: 3600 },  // Keep for 1 hour
    removeOnFail: { age: 24 * 3600 }  // Keep for 24 hours
  }
  ```

- Job Producer Logic (API)
  - Create `apps/api/src/services/queueService.ts`
  - Implement a `createBookingJob()` function
  - Validate the payload with the Zod schema BEFORE queueing
  - Return the job ID immediately

  ```ts
  export async function createBookingJob(data: BookingJobData): Promise<string> {
    // Validate against shared schema
    const validatedData = BookingJobSchema.parse(data);

    const job = await bookingQueue.add('process-booking', validatedData, {
      jobId: `booking-${validatedData.idempotencyKey || uuid()}`,
    });

    return job.id;
  }
  ```

- Job Consumer Setup (Worker)
  - Create `apps/worker/src/processors/bookingProcessor.ts`
  - Register the queue processor with BullMQ
  - Set concurrency: 5 workers (configurable via env: `WORKER_CONCURRENCY=5`)
  - Validate job data with the Zod schema on consumption

  ```ts
  export const bookingProcessor = async (job: Job<BookingJobData>) => {
    // Validate at consume time (defense in depth)
    const data = BookingJobSchema.parse(job.data);
    // Process booking...
  };
  ```
Expected Output:
- ✅ Can add jobs to the queue: `await bookingQueue.add('booking', data)`
- ✅ Jobs appear in Redis: `bull:booking:...`
- ✅ Worker can receive and log jobs
- ✅ Job lifecycle events tracked (waiting, active, completed, failed)
- ✅ Invalid job data rejected at API and Worker boundaries
- ✅ Type safety enforced across the service boundary
Files Modified/Created:
- `packages/types/src/bookingJob.ts` (NEW - shared schema)
- `packages/lib/src/queues.ts` (queue definitions with validation)
- `apps/api/src/services/queueService.ts` (job producers with validation)
- `apps/worker/src/processors/bookingProcessor.ts` (job consumers with validation)
- `apps/worker/src/index.ts` (worker entry point)
- `apps/api/src/lib/dashboard.ts` (BullMQ dashboard)
Validation:
# Check Redis for queued jobs
docker compose exec redis redis-cli KEYS "bull:*"
# Should show: bull:booking:id
# Test validation
# API: Try to send invalid data → should reject before queueing
# Worker: Try to process a corrupted job → should fail validation

Objective: Create a separate worker service that processes booking jobs independently.
**Tasks:**
- Worker Service Creation (see the entry-point sketch after this list)
  - Create `apps/worker/src/index.ts` (worker entry point)
  - Import and start all queue processors
  - Add graceful shutdown handling (SIGTERM, SIGINT)
  - Add a worker health endpoint (`/health`)
  - Set concurrency from the environment: `env.WORKER_CONCURRENCY`
- Docker Service Setup
  - Add a `worker` service to `compose.yaml`
  - Set the startup command: `node apps/worker/dist/index.js` (production) or `node --watch --experimental-transform-types apps/worker/src/index.ts` (dev)
  - Share the same environment variables as the API
  - Add volume mounts for hot-reload in development
- Booking Processor (Skeleton)
  - Create `apps/worker/src/processors/bookingProcessor.ts` with a stub implementation
  - Extract `userId`, `eventId` from the job data (already validated by Zod)
  - Log job receipt and monitor queue depth
  - Add a placeholder for the optimistic locking logic (Milestone 8)
  - Handle graceful failure: no database calls yet
  - IMPORTANT: Do NOT implement the actual booking logic here yet
- Database Connection Management
  - The worker needs direct PostgreSQL access
  - Import the shared database client from `packages/database`
  - Keep the connection pool separate from the API's (different env vars if needed)
Expected Output:
- ✅ Worker container starts and connects to Redis
- ✅ Worker polls the queue for jobs
- ✅ Worker logs show job receipt (no processing yet)
- ✅ Worker can be scaled: `docker compose up -d --scale worker=3`
- ✅ Graceful shutdown works (completes the current job before exiting)
- ✅ No optimistic locking implementation yet (that comes in Milestone 8)
Files Modified/Created:
- `apps/worker/src/index.ts` (worker entry)
- `apps/worker/src/processors/bookingProcessor.ts` (skeleton)
- `compose.yaml` (add worker service)
- `apps/api/src/middleware/worker-health.ts` (health monitoring)
Validation:
# Start worker
docker compose up -d worker
# Check worker logs
docker compose logs -f worker
# Should see: "Worker listening for booking jobs..."
# Jobs should be logged as received but not processed
# Test scaling
docker compose up -d --scale worker=3
docker compose logs worker
# Should see 3 worker instances processing

Objective: Implement booking logic in workers using optimistic locking with version numbers. This MUST come before API migration.
CRITICAL ORDERING NOTE: This milestone must be completed BEFORE Milestone 6 (API migration) to prevent deploying a system where the API creates jobs that workers cannot process.
**Tasks:**
- Complete Worker Processing Logic
  - Update `apps/worker/src/processors/bookingProcessor.ts` with the full implementation
  - Use the optimistic locking pattern (no `FOR UPDATE`):

  ```ts
  async function processBooking(job: Job<BookingJobData>) {
    const { userId, eventId } = job.data;

    // Read event WITHOUT locking
    const events = await sql`
      SELECT * FROM events WHERE id = ${eventId}
    `;
    const event = events[0];

    if (!event) {
      throw new AppError(ErrorCode.EVENT_NOT_FOUND);
    }

    // Optimistic update: version must match
    const currentVersion = event.version;
    const result = await sql`
      UPDATE events
      SET available_tickets = available_tickets - 1,
          version = version + 1,
          updated_at = NOW()
      WHERE id = ${eventId}
        AND version = ${currentVersion}
        AND available_tickets > 0
      RETURNING id, version, available_tickets
    `;

    // Check if update succeeded
    if (result.count === 0) {
      // Either version changed (conflict) or sold out
      throw new AppError(ErrorCode.EVENT_SOLD_OUT_OR_CONFLICT);
    }

    // Create booking record
    const bookingResult = await sql`
      INSERT INTO bookings (user_id, event_id, status)
      VALUES (${userId}, ${eventId}, 'CONFIRMED')
      RETURNING id
    `;

    return {
      success: true,
      bookingId: bookingResult[0].id,
      eventId,
      remainingTickets: result[0].available_tickets
    };
  }
  ```

- Retry Strategy with Configurable Backoff

  ```ts
  // In packages/lib/src/queues.ts
  export const bookingQueue = new Queue('booking', {
    connection: redis,
    defaultJobOptions: {
      attempts: env.WORKER_MAX_RETRIES,
      backoff: {
        type: 'exponential',
        delay: env.WORKER_RETRY_DELAY_MS
      },
      timeout: 30000,
      removeOnComplete: { age: 3600 },
      removeOnFail: { age: 24 * 3600 }
    }
  });
  ```

  - Add jitter to prevent a thundering herd (see the custom backoff sketch after this list):

  ```ts
  const jitter = Math.random() * 100; // 0-100ms random
  const delay = Math.min(
    previousDelay * 2 + jitter,
    env.WORKER_RETRY_MAX_DELAY_MS
  );
  ```

- Handle Version Conflicts
  - Retry on version conflict (BullMQ handles this automatically)
  - Log the conflict rate for monitoring
  - No custom retry logic needed (the BullMQ retry strategy applies)
- Race Condition Testing
  - Run 1000 concurrent requests
  - Monitor for "version conflict" errors in failed jobs (expected)
  - Verify version conflicts result in a retry, not a failure
  - Verify exactly 100 bookings are created
  - Verify available_tickets = 0 (not negative)
  - Check version numbers: the final version should be 100
- Performance Comparison
  - Measure throughput vs Level 2
  - Verify worker processing time (~200-500ms avg)
  - Monitor the retry rate (should be <5% under 1000 concurrent requests)
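If the built-in exponential backoff needs jitter, BullMQ lets a worker register a custom backoff strategy that jobs opt into with `backoff: { type: 'custom' }`. A hedged sketch follows; check the exact option shape against the installed BullMQ version, and note that `getRedis`/`env` are the assumed shared exports from earlier milestones.

```ts
import { Worker } from 'bullmq';
import { getRedis, env } from '@ticket-hive/lib';
import { bookingProcessor } from './processors/bookingProcessor';

const worker = new Worker('booking', bookingProcessor, {
  connection: getRedis(),
  concurrency: env.WORKER_CONCURRENCY,
  settings: {
    // Called when a job configured with backoff { type: 'custom' } fails
    backoffStrategy: (attemptsMade: number) => {
      const base = env.WORKER_RETRY_DELAY_MS * 2 ** (attemptsMade - 1); // 100ms, 200ms, 400ms...
      const jitter = Math.random() * 100;                               // 0-100ms random
      return Math.min(base + jitter, env.WORKER_RETRY_MAX_DELAY_MS);
    },
  },
});
```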
Expected Output:
- ✅ No `FOR UPDATE` queries in the worker codebase
- ✅ Version checking prevents overbooking
- ✅ Retry logic handles conflicts gracefully
- ✅ Worker successfully processes booking jobs end-to-end
- ✅ Data integrity: 100 bookings, 0 available tickets
- ✅ API still uses Level 2 synchronous transactions (not migrated yet)
Files Modified/Created:
- `apps/worker/src/processors/bookingProcessor.ts` (complete implementation)
- `packages/lib/src/queues.ts` (retry configuration with env vars)
- `packages/database/src/events.ts` (version update queries)
- `scripts/benchmark-level2-vs-level3.ts` (performance comparison)
Validation:
# Test worker processing directly
curl -X POST http://localhost:3000/api/v1/bookings \
-H "Authorization: Bearer $TOKEN" \
-d '{"eventId": "..."}'
# Should still return 201 (still using Level 2)
# Check worker logs
docker compose logs -f worker
# Should show jobs being processed (if any exist)
# Run load test to verify locking
npm run test:load
# Should show exactly 100 bookings
# available_tickets = 0
# Version conflicts < 5%
# (Worker will process Level 2 jobs if queue exists)

Objective: Provide real-time status updates to clients via Server-Sent Events, using BullMQ QueueEvents for reliable horizontal scaling.
**Tasks:**
- SSE Endpoint Setup

  ```ts
  // GET /api/v1/bookings/status/:jobId
  app.get('/api/v1/bookings/status/:jobId', (req, res) => {
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    });
  });
  ```

- BullMQ QueueEvents Integration
  - Create `apps/api/src/services/notificationService.ts`
  - Use BullMQ's `QueueEvents` (NOT raw Redis Pub/Sub):

  ```ts
  import { QueueEvents } from 'bullmq';

  const queueEvents = new QueueEvents('booking');

  queueEvents.on('completed', ({ jobId, returnvalue }) => {
    // Broadcast to all API instances
    // Only the instance with the open SSE connection sends to the client
  });

  queueEvents.on('failed', ({ jobId, failedReason }) => {
    // Handle failure events
  });
  ```

  - QueueEvents ensures ALL API instances receive job updates
  - Each API instance checks whether it holds the SSE connection for that jobId
  - Eliminates manual Redis Pub/Sub complexity
- Connection Management

  ```ts
  // Track active SSE connections per API instance
  const activeConnections = new Map<string, ServerResponse>();

  // On client connect
  activeConnections.set(jobId, res);

  // On client disconnect
  req.on('close', () => {
    activeConnections.delete(jobId);
  });
  ```

- Event Flow
  - Worker → QueueEvents (Redis) → all API instances → check whether the instance has a connection for the jobId → forward to the connected client via SSE
- Client Example
  - Create `examples/sse-client.html`
  - Demonstrate EventSource API usage
  - Show connection, reconnection, and status updates
  - Include auth header support
Expected Output:
- ✅ Client can connect to the SSE endpoint
- ✅ Real-time status updates delivered
- ✅ Automatic reconnection on disconnect (EventSource built-in)
- ✅ Multiple clients can listen to the same job
- ✅ Works with multiple API instances (QueueEvents broadcasts to all)
- ✅ Client receives missed events if it joins late (see Milestone 7)
Files Modified/Created:
- `apps/api/src/routes/booking-status.ts` (SSE implementation)
- `apps/api/src/services/notificationService.ts` (QueueEvents logic)
- `examples/sse-client.html` (client example)
- BullMQ dashboard mounted at `/admin/queues`
Validation:
const eventSource = new EventSource('/api/v1/bookings/status/{jobId}');
eventSource.addEventListener('queued', (event) => {
console.log('Status:', JSON.parse(event.data));
});
eventSource.addEventListener('confirmed', (event) => {
console.log('Booking confirmed:', JSON.parse(event.data));
eventSource.close();
});

Objective: Handle the race condition where the worker finishes before the client connects to SSE, ensuring clients always receive the final status.
**Tasks:**
- Add Circuit Breaker for Redis

  ```ts
  // packages/lib/src/redis.ts
  import CircuitBreaker from 'opossum';

  const redisCircuitBreaker = new CircuitBreaker(
    async (operation: () => Promise<any>) => operation(),
    {
      timeout: env.CIRCUIT_BREAKER_TIMEOUT,
      errorThresholdPercentage: env.CIRCUIT_BREAKER_ERROR_THRESHOLD,
      resetTimeout: env.CIRCUIT_BREAKER_RESET_TIMEOUT,
      rollingCountTimeout: 10000,
      rollingCountBuckets: 10,
    }
  );

  export async function getRedisConnection() {
    return redisCircuitBreaker.fire(() => {
      // Redis connection attempt
    });
  }
  ```

- Check State Before Subscribing

  ```ts
  // In GET /api/v1/bookings/status/:jobId

  // 1. Immediately check current job state
  const job = await bookingQueue.getJob(jobId);

  if (!job) {
    res.write(`event: error\ndata: {"message": "Job not found"}\n\n`);
    return res.end();
  }

  // 2. If already completed, send result immediately
  if (job.returnvalue) {
    const result = job.returnvalue;
    if (result.success) {
      res.write(`event: confirmed\ndata: ${JSON.stringify(result)}\n\n`);
    } else {
      res.write(`event: failed\ndata: ${JSON.stringify(result)}\n\n`);
    }
    return res.end();
  }

  // 3. If failed, send failure reason
  if (job.failedReason) {
    res.write(`event: failed\ndata: ${JSON.stringify({ error: job.failedReason })}\n\n`);
    return res.end();
  }

  // 4. Only subscribe if job is still active (waiting/processing)
  res.write(`event: queued/processing\ndata: ${JSON.stringify({ status: job.status })}\n\n`);

  // Now subscribe to QueueEvents for updates
  const queueEvents = new QueueEvents('booking');

  const onCompleted = ({ jobId: completedId, returnvalue }) => {
    if (completedId === jobId) {
      res.write(`event: confirmed\ndata: ${JSON.stringify(returnvalue)}\n\n`);
      res.end();
      cleanup();
    }
  };

  const onFailed = ({ jobId: failedId, failedReason }) => {
    if (failedId === jobId) {
      res.write(`event: failed\ndata: ${JSON.stringify({ error: failedReason })}\n\n`);
      res.end();
      cleanup();
    }
  };

  queueEvents.on('completed', onCompleted);
  queueEvents.on('failed', onFailed);

  // Cleanup on disconnect
  const cleanup = () => {
    queueEvents.off('completed', onCompleted);
    queueEvents.off('failed', onFailed);
    activeConnections.delete(jobId);
  };

  req.on('close', cleanup);
  ```

- Hard Fail on Redis Unavailability

  ```ts
  // In queue service
  if (redisCircuitBreaker.opened) {
    throw new AppError(
      ErrorCode.REDIS_UNAVAILABLE,
      'Queue temporarily unavailable'
    );
  }
  ```

- Scenarios Handled
  - Fast Worker: worker finished at t=10ms, client connects at t=50ms → job state check returns completed → client receives the result immediately
  - Normal Case: worker still processing → client subscribes → receives updates via QueueEvents
  - Late Join: client retries after a disconnect → state check catches them up
  - Job Failed: state check or event notification sends the failure reason
  - Redis Down: circuit breaker returns 503 immediately, no hanging
- Event Types
  - `event: queued` - Job received, waiting for a worker
  - `event: processing` - Worker picked up the job (job.started)
  - `event: confirmed` - Booking successful (with bookingId)
  - `event: failed` - Booking failed (reason: sold out, error)
  - `event: error` - System error (job not found, etc.)
- Testing the Race Condition

  ```ts
  // Test case: Worker completes in <100ms
  it('should return confirmed immediately if worker finished early', async () => {
    // Create job that completes instantly
    const job = await bookingQueue.add('instant-booking', data);
    await worker.processJob(job); // Simulate immediate completion

    // Client connects later
    await sleep(200); // 200ms delay

    const response = await request(app)
      .get(`/api/v1/bookings/status/${job.id}`)
      .set('Accept', 'text/event-stream');

    // Should immediately return confirmed, not hang
    expect(response.text).toContain('event: confirmed');
  });
  ```
Expected Output:
- ✅ Client receives status even if the worker finished before the connection
- ✅ No hanging connections waiting for missed events
- ✅ Works reliably under load with fast workers
- ✅ Scales horizontally (QueueEvents ensures all API instances receive updates)
- ✅ Circuit breaker protects against Redis failures
- ✅ Returns 503 immediately if Redis is unavailable (hard fail)
Files Modified/Created:
- `apps/api/src/routes/booking-status.ts` (enhanced with state check)
- `apps/api/src/services/notificationService.ts` (QueueEvents handlers)
- `packages/lib/src/redis.ts` (add circuit breaker)
- `tests/integration/sse-race-condition.test.ts` (new test)
Validation:
# Manual test
1. Create booking request → get jobId
2. Check worker logs to see it completed quickly
3. Wait 2-3 seconds
4. Connect to SSE endpoint
5. Should immediately receive 'confirmed' event
# (No hanging, no waiting)
# Test Redis failure
docker compose stop redis
curl -X POST http://localhost:3000/api/v1/bookings \
-H "Authorization: Bearer $TOKEN" \
-d '{"eventId": "..."}'
# Expected: 503 Service Unavailable, not hang
docker compose start redis

Objective: Replace FOR UPDATE with optimistic locking using version numbers.
**Tasks:**
- Worker Processing Logic (in the Worker Processor)
  - Use the same `processBooking()` optimistic-locking implementation shown in Milestone 5 above: read the event without locking, update `available_tickets` and `version` only where the version still matches and tickets remain, throw `EVENT_SOLD_OUT_OR_CONFLICT` when the update affects zero rows (let the BullMQ retry logic handle it), then insert the CONFIRMED booking record
- Retry Strategy in BullMQ

  ```ts
  // In packages/lib/src/queues.ts
  export const bookingQueue = new Queue('booking', {
    connection: redis,
    defaultJobOptions: {
      attempts: 3,
      backoff: {
        type: 'exponential',
        delay: 100 // Start with 100ms, then 200ms, then 400ms
      }
    }
  });
  ```

- Race Condition Testing
  - Run 1000 concurrent requests
  - Monitor for "version conflict" errors in failed jobs (expected)
  - Verify version conflicts result in a retry, not a failure
  - Verify exactly 100 bookings are created
  - Verify available_tickets = 0 (not negative)
  - Check version numbers: the final version should be 100
- Performance Comparison
  - Measure throughput vs Level 2
  - Verify <100ms API response time
  - Check worker processing time (~200-500ms avg)
  - Monitor the retry rate (should be <5% under 1000 concurrent requests)
Expected Output:
- ✅ No `FOR UPDATE` queries in the codebase
- ✅ Version checking prevents overbooking
- ✅ Retry logic handles conflicts gracefully
- ✅ Throughput 5-10x higher than Level 2
- ✅ Zero timeout errors
- ✅ Data integrity: 100 bookings, 0 available tickets
Files Modified/Created:
- `apps/worker/src/processors/bookingProcessor.ts` (optimistic locking logic)
- `packages/lib/src/queues.ts` (retry configuration)
- `packages/database/src/events.ts` (version update queries)
- `scripts/benchmark-level2-vs-level3.ts` (performance comparison)
Validation:
npm run test:load
# Should show:
# - Zero timeouts (0%)
# - <100ms API response time
# - 100 bookings exactly
# - available_tickets = 0
# - Final version = 100

Objective: Complete end-to-end testing, validate no graceful degradation, and optimize performance.
**Tasks:**
- Load Test Updates for Async Behavior
  - Modify `tests/load-test.ts` for async behavior
  - Flow: POST /book → get jobId → connect to SSE → wait for completion
  - Measure API response time (should be <100ms)
  - Measure total booking time (API + worker + SSE)
  - Test with 1000, 5000, and 10000 concurrent requests
  - Measure rate limiting effectiveness (see the limiter sketch after this list)
  - Monitor circuit breaker state changes
- Error Handling Matrix (NO GRACEFUL DEGRADATION)

  | Scenario | Worker Behavior | API Response | Client SSE Event |
  |----------|----------------|--------------|------------------|
  | Valid booking | Success | 202 Accepted | confirmed |
  | Event sold out | Fail (no retry) | 202 Accepted | failed (409) |
  | Invalid eventId | Fail | 202 Accepted | failed (404) |
  | Version conflict | Retry (max env.WORKER_MAX_RETRIES) | 202 Accepted | confirmed |
  | Worker crash | Job re-queued | 202 Accepted | processing → confirmed |
  | Rate limit exceeded | N/A | 429 Too Many Requests | N/A |
  | Queue depth exceeded | N/A | 503 Queue Full | N/A |
  | Redis down | Cannot queue job | 503 Service Unavailable (circuit open) | N/A |

  IMPORTANT - No Graceful Degradation:
  - If Redis is down, the API returns 503 immediately via the circuit breaker
  - No fallback to Level 2 synchronous transactions
  - Reason: prevents database overload during Redis failures
  - Client responsibility: retry with exponential backoff
  - Monitor Redis health separately
- Performance Benchmarks
  - Level 2 baseline: 200-300 req/s, 800-1500ms latency, 1-2% timeouts
  - Level 3 target: 2000-5000 req/s, <100ms latency, 0% timeouts
  - Worker processing: 200-500ms avg per job
  - SSE delivery: near-instant after worker completion
  - Rate limiting: 10 req/min per user enforced
  - Queue depth: <50 avg under 10K requests
  - Monitor Redis memory usage and queue depth
  - Monitor the version conflict rate (<5% target)
  - Monitor circuit breaker state changes
- Configuration Tuning
  - Adjust `WORKER_CONCURRENCY` based on CPU cores
  - Tune `WORKER_MAX_RETRIES` if the conflict rate is >5%
  - Adjust `REDIS_QUEUE_MAX_DEPTH` based on load tests
  - Set `CIRCUIT_BREAKER_ERROR_THRESHOLD` appropriately
  - Document production configurations
- Documentation Updates
  - Update `README.md` with the Level 3 architecture
  - Document the monorepo structure (/apps, /packages)
  - Explain the SSE state-check pattern
  - Document the "no graceful degradation" decision
  - Document the rate limiting and circuit breaker rationale
  - Create a troubleshooting guide
  - Add a deployment guide for multiple workers
- Production Readiness
  - Add structured logging (Pino)
  - Add metrics collection (queue depth, processing time, conflict rate, circuit breaker state)
  - Set up alerts for high queue depth (>1000)
  - Set up alerts for a high conflict rate (>10%)
  - Set up alerts for the circuit breaker opening
  - Load testing with a production-like configuration
  - Security audit (rate limiting, circuit breaker)
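Rate limiting itself is built out in the production plan; for context, here is a minimal fixed-window limiter sketch using the shared Redis client. The 10 req/min figure comes from the benchmarks above, while the key format, identity source, and middleware shape are assumptions for illustration.

```ts
import type { Request, Response, NextFunction } from 'express';
import { getRedis } from '@ticket-hive/lib'; // assumed shared Redis factory

const WINDOW_SECONDS = 60;
const MAX_REQUESTS = 10; // 10 req/min per user, as targeted in the benchmarks

export async function rateLimit(req: Request, res: Response, next: NextFunction) {
  const redis = getRedis();
  const userId = req.header('x-user-id') ?? req.ip ?? 'anonymous'; // placeholder identity source
  const key = `ratelimit:${userId}:${Math.floor(Date.now() / 1000 / WINDOW_SECONDS)}`;

  // Count this request and make sure the window key expires on its own
  const count = await redis.incr(key);
  if (count === 1) {
    await redis.expire(key, WINDOW_SECONDS);
  }

  if (count > MAX_REQUESTS) {
    return res.status(429).json({ error: 'Too many requests, retry later' });
  }
  next();
}
```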
Expected Output:
- ✅ Zero race conditions detected
- ✅ Throughput 10x improvement over Level 2
- ✅ Zero timeout errors
- ✅ SSE reliably delivers status updates
- ✅ Load tests pass consistently at 10K requests
- ✅ Redis queue depth stays manageable
- ✅ Rate limiting prevents abuse
- ✅ Circuit breaker protects against Redis failures
- ✅ Complete documentation, including the no-degradation decision
Files Modified/Created:
- `tests/load-test.ts` (update for async + SSE + metrics)
- `tests/stress-test-10k.ts` (new, 10K request test)
- `tests/rate-limit.test.ts` (new, rate limiting validation)
- `docs/level3-performance.md` (performance results)
- `docs/troubleshooting.md` (debugging guide)
- `docs/no-degradation-decision.md` (explains hard fail rationale)
- `docs/configuration-guide.md` (environment variable tuning)
- `README.md` (reorganized for monorepo)
Validation:
# Final validation
npm run test:load
# Expected output:
# LOAD TEST RESULTS - Level 3 (Queue + Optimistic Locking)
# Total Requests: 10000
# Successful Bookings: 100 (1%)
# Sold Out Rejections: 9800-9900 (98-99%)
# Rate Limited: 0 (0%)
# Timeout Errors: 0 ✅
# API Avg Response: 45ms ✅
# Worker Avg Processing: 350ms ✅
# Race Conditions: 0 ✅
# Retries (version conflict): < 5% of successful bookings
# Circuit Breaker: Closed (healthy)
# Check queue depth during test
docker compose exec redis redis-cli LLEN "bull:booking:waiting"
# Should remain low (< 50) even under 10K requests

Hard Fail Validation:
# Test Redis failure scenario
docker compose stop redis
curl -X POST http://localhost:3000/api/v1/bookings \
-H "Authorization: Bearer $TOKEN" \
-d '{"eventId": "..."}'
# Expected: 503 Service Unavailable immediately
# NOT: synchronous processing fallback or hanging
# Reason: Circuit breaker opens, protects database
docker compose start redis
# Circuit breaker should close after reset timeout

| Metric | Level 2 | Level 3 Target | Validation Method |
|---|---|---|---|
| Throughput (req/s) | 200-300 | 5,000-10,000 | Load test (10K req) |
| API Response Time | 800-1500ms | <100ms | Response timing |
| Timeout Rate | 1-2% | 0% | Error counting |
| Race Conditions | 0 | 0 | Database verification |
| Worker Processing | N/A | 200-500ms avg | Job logs |
| Data Integrity | ✅ | ✅ | Booking count vs tickets |
| Scalability | Vertical | Horizontal | Multiple workers |
| Queue Depth | N/A | <50 avg | BullMQ dashboard |
| Job Failure Rate | N/A | <0.1% | BullMQ metrics |
| Version Conflicts | N/A | <5% | Retry count |
| SSE Reliability | N/A | 100% | State check tests |
- BullMQ: Actively maintained, TypeScript support, better performance
- Bull: Legacy, fewer features, slower development
- Decision: Use BullMQ for modern architecture
- BullMQ Requirement: BullMQ is built specifically on ioredis and requires it for connection handling (see docs)
- node-redis: Officially recommended by Redis for new projects, but NOT compatible with BullMQ
- Decision: Use ioredis because BullMQ mandates it - no choice if we want BullMQ
- Separation: API and Worker as separate apps
- Shared code: Database, types, utilities in packages
- Independent scaling: Deploy API and Worker separately
- Clear boundaries: Enforces clean architecture
- Decision: Reorganized from monolith to monorepo (Milestone 0)
- SSE: HTTP-based, simpler implementation, auto-reconnection
- WebSockets: Bidirectional (not needed), more complex
- Decision: SSE fits unidirectional server→client updates, scales better
- Optimistic: Higher throughput, scales better, simpler
- Distributed Locks: Redlock complexity, lower performance
- Decision: Optimistic locking sufficient for booking tickets
- Runtime Validation: Enforces contract between API and Worker
- Type Safety: Single source of truth for job data structure
- Prevents Silent Failures: Invalid data caught at boundaries
- Decision: Zod schemas in `packages/types` validate both producer and consumer
- Max Attempts: 3 tries for version conflicts
- Backoff: Exponential (100ms → 200ms → 400ms)
- Rationale: Balance between success rate and latency
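Worked out: with attempts = 3, only two backoff waits actually occur (roughly 100ms before the second attempt and 200ms before the third), so a job that hits a version conflict on every attempt fails permanently after only about 300ms of accumulated backoff on top of its processing time.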
- Decision: If Redis unavailable, return 503. No fallback to Level 2.
- Rationale:
- Level 2 synchronous transactions would overload database if Redis fails under load
- Fallback logic adds complexity and new failure modes
- Monitoring Redis health separately is more reliable
- Forces infrastructure reliability for Redis
- Client Handling: Return 503 → client retries with exponential backoff
- Alternative: Use circuit breaker to return 503 immediately, not hang
/apps
/api
/src/routes # HTTP endpoints
/src/services # Queue producers, business logic
/src/middleware # Auth, rate limiting
/src/lib # App-specific utilities
dist/ # Compiled output
package.json
tsconfig.json
/worker
/src/processors # Queue consumers
/src/lib # Worker-specific utilities
dist/
package.json
tsconfig.json
/packages
/database
/src # PostgreSQL client, migrations
package.json # Exports db client, schemas
/types
/src # Zod schemas, TypeScript interfaces
package.json # Exports all schemas and types
/lib
/src # Redis client, queues, errors, auth
package.json # Shared utilities used by both apps
# Root
turbo.json # Build pipeline definition
package.json # Workspace root
docker-compose.yml
Dockerfile
| Risk | Probability | Impact | Mitigation |
|---|---|---|---|
| Redis becomes bottleneck | Medium | High | Add Redis clustering, increase workers |
| Worker crashes lose jobs | Low | Medium | BullMQ persistence, automatic retry |
| SSE connections overload | Low | Medium | Connection pooling, TTL on jobs |
| Version conflicts too high | Low | Medium | Tune retry strategy, measure conflict rate |
| Increased complexity | High | Medium | Comprehensive testing, documentation |
| Monorepo build issues | Low | Low | Use Turborepo, clear package boundaries |
| BullMQ dashboard security | Medium | High | Add auth middleware to /admin/queues |
| "Fast Worker" race condition | High | High | State check pattern (Milestone 7) |
| Type mismatches API↔Worker | Medium | High | Zod schema validation (Milestone 3) |
| Redis failure causes downtime | Low | High | Hard fail (503) + monitoring + alerts |
Each milestone must include:
- README update with new architecture diagrams
- Code comments explaining Level 3 patterns (use template below)
- API Documentation for new endpoints and SSE usage
- Migration guide for Level 2 → Level 3
- Testing guide with expected outputs
- Performance results comparison Level 2 vs Level 3
Example Comment Template:
/**
* Level 3 Implementation: Async Queue-Based Processing with SSE
*
* Architecture: /apps/api, /apps/worker, /packages/*
*
* How it works:
* 1. API receives POST /book, validates with Zod → returns 202 + jobId
* 2. BullMQ stores job in Redis, dashboard tracks status
* 3. Worker pulls job, validates with Zod, uses optimistic locking
* 4. Version conflict → retry (max 3, exponential backoff)
* 5. Success/failure published via QueueEvents → SSE notifies client
* 6. Fast worker handled: state check before subscribe
*
* Trade-offs:
* ✅ Pros: <100ms API, 10K+ req/s, horizontal scaling, zero timeouts
* ⚠️ Cons: Eventual consistency, added complexity, Redis dependency
*
* Why monorepo: Clean separation, independent scaling, shared packages
* Why Zod schemas: Runtime validation prevents API-Worker contract breaches
* Why QueueEvents: Built-in horizontal scaling, replaces raw Redis Pub/Sub
* Why no degradation: Hard fail safer than database overload fallback
* Why BullMQ dashboard: Real-time queue monitoring essential at scale
*
* Service Boundary: Zod schema is the contract. Always validate.
*/- Milestone 0 complete: Monorepo structure (/apps, /packages) verified
- All packages build independently: `turbo run build`
- Docker Compose starts all services: API, Worker, Redis, PostgreSQL
- Hot-reload works for development
- POST /api/v1/bookings returns 202 Accepted with jobId (<100ms)
- GET /api/v1/bookings/status/:jobId supports SSE with proper headers
- SSE handles "Fast Worker" race condition (check state before subscribe)
- Worker processes jobs with optimistic locking (no FOR UPDATE)
- Version conflicts retry automatically (max 3, exponential backoff)
- QueueEvents used for horizontal scaling (not raw Redis Pub/Sub)
- BullMQ dashboard functional at /admin/queues with auth
- Load test achieves zero overbookings in 10,000+ concurrent requests
- Exactly 100 bookings for 100 ticket event
- available_tickets never negative
- Final version equals number of bookings
- Zod schemas in packages/types validate all job data
- API validates before queueing (producer validation)
- Worker validates before processing (consumer validation)
- Invalid data rejected at boundaries with clear errors
- Zero timeout errors at 10,000 concurrent requests
- API response time <100ms (p95, p99)
- Worker processes 5+ jobs concurrently per instance
- Queue depth remains <50 under load
- Worker processing time 200-500ms average
- Worker scales horizontally (tested with 5+ instances)
- Jobs persist across worker restarts
- Graceful shutdown completes current jobs
- No graceful degradation: Returns 503 if Redis unavailable
- README.md updated with monorepo structure
- API documentation includes SSE usage examples
- Migration guide: Level 2 → Level 3
- Troubleshooting guide covers common issues
- Performance benchmarks documented (Level 2 vs Level 3)
- docs/no-degradation-decision.md explains hard fail rationale
- Unit tests for queue producers and consumers
- Integration test: API → Queue → Worker → SSE
- Load test validates 10K concurrent requests
- Test for "Fast Worker" race condition
- Test Redis failure returns 503 (not fallback)
- Test version conflict triggers retry
Before Starting:
- Review this plan thoroughly
- Create a development branch: `git checkout -b level-3-implementation`
- Ensure Level 2 load tests pass on main: `npm run test:load`
- Backup the database or snapshot the current state
Phase 1 - Foundation:
- Milestone 0: Restructure to monorepo
- Verify existing tests still pass
- Commit: "Milestone 0: Monorepo restructure complete"
Phase 2 - Infrastructure:
- Milestone 1: Redis & BullMQ setup
- Verify Redis connection and health
- Commit: "Milestone 1: Redis infrastructure"
Phase 3 - Schema:
- Milestone 2: Add version column
- Run migration, verify data
- Commit: "Milestone 2: Event versioning"
Phase 4 - Queue:
- Milestone 3: Create Zod schemas and queues
- Test job creation and validation
- Commit: "Milestone 3: Queue architecture with Zod contracts"
Phase 5 - Worker:
- Milestone 4: Create worker service
- Test worker processes jobs
- Commit: "Milestone 4: Worker service"
Phase 6 - API Migration:
- Milestone 5: Migrate booking endpoint to async
- Test 202 response and job creation
- Commit: "Milestone 5: Async booking endpoint"
Phase 7 - SSE:
- Milestone 6: SSE with QueueEvents
- Test QueueEvents broadcast
- Commit: "Milestone 6: SSE implementation"
Phase 8 - Race Condition:
- Milestone 7: Add state check pattern
- Test "Fast Worker" scenario
- Commit: "Milestone 7: Robust SSE race condition fix"
Phase 9 - Locking:
- Milestone 8: Implement optimistic locking
- Remove FOR UPDATE queries
- Test version conflicts trigger retry
- Commit: "Milestone 8: Optimistic locking"
Phase 10 - Validation:
- Milestone 9: Full integration testing
- Run 10K load test
- Document performance results
- Test Redis failure returns 503
- Commit: "Milestone 9: Integration & performance validation"
Final Steps:
- Complete all documentation
- Code review (check for any remaining FOR UPDATE)
- Security audit (BullMQ dashboard auth)
- Merge to main with PR description linking to this plan
- Tag release: v3.0.0
- Skipping Milestone 0: Don't add Redis to monolithic structure. Restructure first.
- Removing FOR UPDATE too early: Keep it until optimistic locking is fully tested (Milestone 5 FIRST)
- Wrong milestone order: Implement optimistic locking (Milestone 5) BEFORE API migration (Milestone 6)
- Raw Redis Pub/Sub: Use BullMQ QueueEvents for horizontal scaling
- Missing Zod validation: Always validate at both producer and consumer
- No SSE state check: Without it, "Fast Worker" race condition causes lost updates
- Implementing graceful degradation: Hard fail is safer and simpler for Level 3
- Forgetting auth on BullMQ dashboard: Exposes sensitive queue data
- Not testing with multiple workers: Single worker hides concurrency bugs
- Ignoring version conflict rate: Should be <5%, tune retry if higher
- No circuit breaker: Redis failures cause cascading timeouts
- No rate limiting: Abuse vectors and queue overflow risks
- Dashboard in API: Security risk and violates separation of concerns
- Hardcoded retry config: Production tuning requires code changes
- Incomplete cleanup: SSE connections leak memory without disconnect handlers
Objective: Create a separate dashboard service for monitoring queues, decoupled from API service. For production, dashboard should be disabled or heavily secured.
**Tasks:**
- Create Dashboard Service (see the mounting sketch after this list)
  - Create `apps/dashboard/src/index.ts` (dashboard entry point)
  - Mount the BullMQ dashboard at the root path `/`
  - Add authentication middleware (require admin role)
  - Use the shared Redis connection from `packages/lib`
- Docker Service Setup

  ```yaml
  # Add to compose.yaml
  dashboard:
    build:
      context: .
      target: development
    ports:
      - "3001:3001"      # Separate port, not exposed publicly
    environment:
      PORT: 3001
      REDIS_HOST: redis  # Same Redis config as API/Worker
    depends_on:
      - redis
    profiles:
      - monitoring       # Only start when explicitly requested
  ```

- Security Considerations
  - Dashboard ONLY starts with the `--profile monitoring` flag: `docker compose --profile monitoring up dashboard`
  - In production: remove the dashboard from compose entirely OR add VPN-only access
  - Alternative: use external monitoring (DataDog, New Relic) instead of the dashboard
  - Exposing queue data publicly is a security risk
- Authentication Required

  ```ts
  // apps/dashboard/src/middleware/require-admin.ts
  app.use('/admin', verifyJWT, requireAdmin);
  ```
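A minimal sketch of the dashboard entry point, assuming the UI is served by the @bull-board packages (the plan only says "BullMQ dashboard", so the package choice and the auth helper names are assumptions):

```ts
// apps/dashboard/src/index.ts (sketch)
import express from 'express';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { bookingQueue } from '@ticket-hive/lib'; // shared queue from Milestone 3
// import { verifyJWT, requireAdmin } from './middleware/require-admin'; // hypothetical auth helpers

const app = express();
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/');

createBullBoard({
  queues: [new BullMQAdapter(bookingQueue)],
  serverAdapter,
});

// app.use('/', verifyJWT, requireAdmin); // gate everything behind admin auth
app.use('/', serverAdapter.getRouter());

app.listen(Number(process.env.PORT ?? 3001), () => {
  console.log('BullMQ dashboard listening on port 3001');
});
```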
Expected Output:
- ✅ Dashboard accessible at `http://localhost:3001` (when started)
- ✅ Shows queue depth, job status, processing times
- ✅ Requires admin authentication
- ✅ Does NOT start by default (opt-in with profile)
- ✅ API service does NOT mount the dashboard
Files Modified/Created:
- `apps/dashboard/src/index.ts` (dashboard entry)
- `apps/dashboard/package.json` (new)
- `compose.yaml` (add dashboard service with profile)
- `apps/dashboard/src/middleware/require-admin.ts` (auth)
Validation:
# Start dashboard explicitly
docker compose --profile monitoring up -d dashboard
# Dashboard should be running
curl http://localhost:3001
# Should show BullMQ dashboard (requires auth)
# Verify API does NOT have dashboard
curl http://localhost:3000/admin/queues
# Should return 404 Not Found
# Stop monitoring services
docker compose --profile monitoring down

Production Recommendation:
# In production compose override, remove dashboard entirely
# Or restrict to internal network only
services:
  dashboard:
    profiles:
      - never        # Never start in production
    # Or use internal network:
    networks:
      - internal
    ports: []        # No external ports

If you get stuck:
- Check `docs/SPECS.md` - the canonical requirements source
- Review `AGENTS.md` - project context and patterns
- Check milestone-specific validation steps
- Run tests after each milestone (don't skip verification)
- Document any deviations from this plan
Key Files Reference:
- Queue config: `packages/lib/src/queues.ts`
- Zod schemas: `packages/types/src/`
- SSE logic: `apps/api/src/routes/booking-status.ts`
- Worker processor: `apps/worker/src/processors/bookingProcessor.ts`
- No degradation doc: `docs/no-degradation-decision.md`
Plan last updated: 2025-11-26 Version: 3.0 (incorporates all recommended changes)