@@ -3,12 +3,19 @@ import { Worker, WorkerOptions } from 'bullmq';
 import { environmentConfig } from '../config/environment';
 import { SubmissionModel } from '../models/submission.model';
 import { SecureDockerExecutor } from '../executors/secureDockerExecutor';
+import { getPerformanceMonitor } from '../services/performanceMonitor';
 import { logger } from '../utils/logger';
 
 const config = environmentConfig.getConfig();
 
-// Initialize the secure executor
+// Enhanced error type for job processing
+interface ProcessingError extends Error {
+  shouldRetry?: boolean;
+}
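+// Note: BullMQ does not act on this flag by itself; retries are governed by
+// the job's attempts/backoff options, and it is assumed that a queue-level
+// 'failed' handler reads shouldRetry. BullMQ's built-in UnrecoverableError
+// is the native way to skip retries.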
+
+// Initialize the secure executor and performance monitor
 const executor = new SecureDockerExecutor();
+const performanceMonitor = getPerformanceMonitor(executor);
 
 // Enhanced worker configuration for high throughput
 const workerConfig: WorkerOptions = {
@@ -27,15 +34,21 @@ const workerConfig: WorkerOptions = {
     family: 4,
     connectTimeout: 10000,
     commandTimeout: 5000,
+    // Explicit DB selection and TCP keep-alive for a more stable connection
+    db: 0,
+    keepAlive: 30000,
   },
-  concurrency: 10, // Process up to 10 jobs concurrently per worker
+  concurrency: parseInt(process.env.WORKER_CONCURRENCY || '20', 10), // Increased from 10
   removeOnComplete: { count: 100 },
   removeOnFail: { count: 50 },
+  maxStalledCount: 1, // Reduce stalled job retries
+  stalledInterval: 30000, // Check for stalled jobs every 30s
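+  // (both values match BullMQ's documented defaults, pinned here explicitly)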
 };
 
 // Processing function that can be reused
 const processSubmission = async (submissionId: string) => {
   logger.info(`Processing submission: ${submissionId}`);
+  const startTime = Date.now();
 
   try {
     const submission = await SubmissionModel.findById(submissionId);
@@ -64,6 +77,10 @@ const processSubmission = async (submissionId: string) => {
       memoryLimit: config.executor.memoryLimitMb,
     });
 
+    // Record execution metrics for performance monitoring
+    const executionTime = Date.now() - startTime;
+    performanceMonitor.recordExecution(result.success, executionTime);
+
     // Update submission with results
     submission.status = result.success ? 'completed' : 'failed';
     if (!submission.result) {
@@ -100,8 +117,28 @@ const processSubmission = async (submissionId: string) => {
     );
     return { submissionId, success: result.success };
   } catch (error) {
+    const executionTime = Date.now() - startTime;
+    performanceMonitor.recordExecution(false, executionTime);
+
     logger.error(`Failed to process submission ${submissionId}:`, error);
 
+    // Enhanced error handling for different error types
+    let errorMessage = 'Unknown error';
+    let shouldRetry = true;
+
+    if (error instanceof Error) {
+      errorMessage = error.message;
+
+      // Don't retry certain types of errors
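+      // (matched by substring; these indicate infrastructure or limit
+      // failures that a retry would most likely hit again)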
+      if (
+        error.message.includes('Concurrent execution limit reached') ||
+        error.message.includes('Docker') ||
+        error.message.includes('timeout')
+      ) {
+        shouldRetry = false;
+      }
+    }
+
     try {
       const submission = await SubmissionModel.findById(submissionId);
       if (submission) {
@@ -110,16 +147,26 @@ const processSubmission = async (submissionId: string) => {
           submission.result = {
             success: false,
             output: '',
-            error: error instanceof Error ? error.message : 'Unknown error',
-            executionTime: 0,
+            error: errorMessage,
+            executionTime,
             memoryUsed: 0,
             testCasesPassed: 0,
             totalTestCases: 0,
           };
         } else {
-          submission.result.error =
-            error instanceof Error ? error.message : 'Unknown error';
+          submission.result.error = errorMessage;
+          submission.result.executionTime = executionTime;
         }
+
+        if (!submission.timing) {
+          submission.timing = {
+            submittedAt: new Date(),
+            completedAt: new Date(),
+          };
+        } else {
+          submission.timing.completedAt = new Date();
+        }
+
         await submission.save();
       }
     } catch (updateError) {
@@ -129,7 +176,10 @@ const processSubmission = async (submissionId: string) => {
       );
     }
 
-    throw error;
+    // Throw with retry flag for job queue handling
+    const enhancedError = new Error(errorMessage) as ProcessingError;
+    enhancedError.shouldRetry = shouldRetry;
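+    // (note: wrapping in a new Error drops the original stack trace; pass
+    // { cause: error } to the Error constructor if the full trace matters)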
+    throw enhancedError;
   }
 };
 
@@ -164,7 +214,13 @@ const batchWorker = new Worker(
     const { submissionIds } = job.data;
     logger.info(`Processing batch of ${submissionIds.length} submissions`);
 
-    const results = [];
+    // Define proper return type for results
+    const results: Array<{
+      submissionId: string;
+      success: boolean;
+      error?: string;
+    }> = [];
+
     for (const submissionId of submissionIds) {
       try {
         const result = await processSubmission(submissionId);
@@ -185,9 +241,40 @@ const batchWorker = new Worker(
   }
 );
 
-// Error handlers
-submissionWorker.on('failed', (job, err) => {
+// Error handlers with circuit breaker logic
+let consecutiveFailures = 0;
+const MAX_CONSECUTIVE_FAILURES = 10;
+let circuitBreakerOpen = false;
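+// (a simple in-process breaker: these counters live in this worker process
+// only and are not shared across replicas)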
+
+submissionWorker.on('failed', async (job, err) => {
+  consecutiveFailures++;
   logger.error(`Submission job ${job?.id} failed:`, err);
+
+  // Open circuit breaker if too many failures
+  if (consecutiveFailures >= MAX_CONSECUTIVE_FAILURES && !circuitBreakerOpen) {
+    circuitBreakerOpen = true;
+    logger.error(
+      `Circuit breaker opened due to ${consecutiveFailures} consecutive failures`
+    );
+
+    // Pause worker for 30 seconds
+    await submissionWorker.pause();
+    setTimeout(async () => {
+      try {
+        await submissionWorker.resume();
+        consecutiveFailures = 0;
+        circuitBreakerOpen = false;
+        logger.info('Circuit breaker closed, resuming operations');
+      } catch (error) {
+        logger.error('Failed to resume worker:', error);
+      }
+    }, 30000);
+  }
+});
+
+submissionWorker.on('completed', (job) => {
+  consecutiveFailures = 0; // Reset failure count on success
+  logger.info(`Submission job ${job.id} completed successfully`);
 });
 
 priorityWorker.on('failed', (job, err) => {
@@ -199,10 +286,36 @@ batchWorker.on('failed', (job, err) => {
 });
 
 // Success handlers
-submissionWorker.on('completed', (job) => {
-  logger.info(`Submission job ${job.id} completed successfully`);
+priorityWorker.on('completed', (job) => {
+  logger.info(`Priority job ${job.id} completed successfully`);
 });
 
+batchWorker.on('completed', (job) => {
+  logger.info(`Batch job ${job.id} completed successfully`);
+});
+
+// Health monitoring and load management
+setInterval(async () => {
+  try {
+    const health = await performanceMonitor.getHealthStatus();
+
+    if (health.status === 'critical') {
+      logger.warn('System under critical load - monitoring worker performance');
+
+      // Log current metrics for debugging
+      const metrics = performanceMonitor.getMetrics();
+      logger.warn('Current metrics:', {
+        totalExecutions: metrics.totalExecutions,
+        successRate: metrics.successRate,
+        currentLoad: metrics.currentLoad,
+        containerCount: metrics.containerCount,
+      });
+    }
+  } catch (error) {
+    logger.error('Health check failed in worker:', error);
+  }
+}, 60000); // Check every minute
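+// (setInterval keeps the Node.js event loop alive; call .unref() on the
+// returned timer if the worker process should be able to exit on its own)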
+
 logger.info('👷 Enhanced workers running with high concurrency support...');
 logger.info(
   `Workers configuration: concurrency=${workerConfig.concurrency}, removeOnComplete=${workerConfig.removeOnComplete}`