@@ -206,7 +206,7 @@ class Worker<TCatalog extends WorkerCatalog> {
 
     // Launch a number of "worker loops" on the main thread.
     for (let i = 0; i < workers; i++) {
-      this.workerLoops.push(this.runWorkerLoop(`worker-${nanoid(12)}`, tasksPerWorker));
+      this.workerLoops.push(this.runWorkerLoop(`worker-${nanoid(12)}`, tasksPerWorker, i, workers));
     }
 
     this.setupShutdownHandlers();
@@ -390,14 +390,43 @@ class Worker<TCatalog extends WorkerCatalog> {
    * The main loop that each worker runs. It repeatedly polls for items,
    * processes them, and then waits before the next iteration.
    */
-  private async runWorkerLoop(workerId: string, taskCount: number): Promise<void> {
+  private async runWorkerLoop(
+    workerId: string,
+    taskCount: number,
+    workerIndex: number,
+    totalWorkers: number
+  ): Promise<void> {
     const pollIntervalMs = this.options.pollIntervalMs ?? 1000;
     const immediatePollIntervalMs = this.options.immediatePollIntervalMs ?? 100;
 
+    // Calculate the delay between starting each worker loop so that they don't all start at the same time.
+    const delayBetweenWorkers = this.options.pollIntervalMs ?? 1000;
+    const delay = delayBetweenWorkers * (totalWorkers - workerIndex);
+    await Worker.delay(delay);
+
+    this.logger.info("Starting worker loop", {
+      workerIndex,
+      totalWorkers,
+      delay,
+      workerId,
+      taskCount,
+      pollIntervalMs,
+      immediatePollIntervalMs,
+      concurrencyOptions: this.concurrency,
+    });
+
     while (!this.isShuttingDown) {
       // Check overall load. If at capacity, wait a bit before trying to dequeue more.
       if (this.limiter.activeCount + this.limiter.pendingCount >= this.concurrency.limit) {
+        this.logger.debug("Worker at capacity, waiting", {
+          workerId,
+          concurrencyOptions: this.concurrency,
+          activeCount: this.limiter.activeCount,
+          pendingCount: this.limiter.pendingCount,
+        });
+
         await Worker.delay(pollIntervalMs);
+
         continue;
       }
 
@@ -412,10 +441,25 @@ class Worker<TCatalog extends WorkerCatalog> {
       );
 
       if (items.length === 0) {
+        this.logger.debug("No items to dequeue", {
+          workerId,
+          concurrencyOptions: this.concurrency,
+          activeCount: this.limiter.activeCount,
+          pendingCount: this.limiter.pendingCount,
+        });
+
         await Worker.delay(pollIntervalMs);
         continue;
       }
 
+      this.logger.info("Dequeued items", {
+        workerId,
+        itemCount: items.length,
+        concurrencyOptions: this.concurrency,
+        activeCount: this.limiter.activeCount,
+        pendingCount: this.limiter.pendingCount,
+      });
+
       // Schedule each item using the limiter.
       for (const item of items) {
         this.limiter(() => this.processItem(item as AnyQueueItem, items.length, workerId)).catch(
@@ -433,6 +477,8 @@ class Worker<TCatalog extends WorkerCatalog> {
       // Wait briefly before immediately polling again since we processed items
       await Worker.delay(immediatePollIntervalMs);
     }
+
+    this.logger.info("Worker loop finished", { workerId });
   }
 
   /**
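
For reference, a minimal sketch of the stagger arithmetic added above, assuming the default pollIntervalMs of 1000 ms and four worker loops; the simulateStartDelays helper is illustrative only and not part of the codebase.

// Illustrative sketch: reproduces the delay formula from the diff above.
// With delayBetweenWorkers = 1000 and totalWorkers = 4, worker 0 waits 4000 ms,
// worker 1 waits 3000 ms, worker 2 waits 2000 ms, and worker 3 waits 1000 ms,
// so no two loops start polling at the same instant.
function simulateStartDelays(totalWorkers: number, delayBetweenWorkers = 1000): number[] {
  const delays: number[] = [];
  for (let workerIndex = 0; workerIndex < totalWorkers; workerIndex++) {
    delays.push(delayBetweenWorkers * (totalWorkers - workerIndex));
  }
  return delays;
}

console.log(simulateStartDelays(4)); // [4000, 3000, 2000, 1000]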