@@ -30,6 +30,7 @@ import {
3030 type Result ,
3131} from "@internal/redis" ;
3232import { MessageNotFoundError } from "./errors.js" ;
33+ import { tryCatch } from "@trigger.dev/core" ;
3334
3435const SemanticAttributes = {
3536 QUEUE : "runqueue.queue" ,
@@ -51,7 +52,6 @@ export type RunQueueOptions = {
5152 verbose ?: boolean ;
5253 logger ?: Logger ;
5354 retryOptions ?: RetryOptions ;
54- maxDequeueLoopAttempts ?: number ;
5555} ;
5656
5757type DequeuedMessage = {
@@ -78,7 +78,6 @@ export class RunQueue {
7878 private redis : Redis ;
7979 public keys : RunQueueKeyProducer ;
8080 private queueSelectionStrategy : RunQueueSelectionStrategy ;
81- private maxDequeueLoopAttempts : number ;
8281
8382 constructor ( private readonly options : RunQueueOptions ) {
8483 this . retryOptions = options . retryOptions ?? defaultRetrySettings ;
@@ -94,7 +93,6 @@ export class RunQueue {
9493
9594 this . keys = options . keys ;
9695 this . queueSelectionStrategy = options . queueSelectionStrategy ;
97- this . maxDequeueLoopAttempts = options . maxDequeueLoopAttempts ?? 10 ;
9896
9997 this . subscriber = createRedisClient ( options . redis , {
10098 onError : ( error ) => {
@@ -396,55 +394,45 @@ export class RunQueue {
396394
397395 let attemptedEnvs = 0 ;
398396 let attemptedQueues = 0 ;
399- let dequeueLoopAttempts = 0 ;
400397
401398 const messages : DequeuedMessage [ ] = [ ] ;
402399
403- // Each env starts with its list of candidate queues
404- const tenantQueues : Record < string , string [ ] > = { } ;
405-
406- // Initialize tenantQueues with the queues for each env
407400 for ( const env of envQueues ) {
408- tenantQueues [ env . envId ] = [ ...env . queues ] ; // Create a copy of the queues array
409- }
410-
411- // Continue until we've hit max count or all tenants have empty queue lists
412- while (
413- messages . length < maxCount &&
414- Object . values ( tenantQueues ) . some ( ( queues ) => queues . length > 0 ) &&
415- dequeueLoopAttempts < this . maxDequeueLoopAttempts
416- ) {
417- dequeueLoopAttempts ++ ;
401+ attemptedEnvs ++ ;
418402
419- for ( const env of envQueues ) {
420- attemptedEnvs ++ ;
421-
422- // Skip if this tenant has no more queues
423- if ( tenantQueues [ env . envId ] . length === 0 ) {
424- continue ;
425- }
426-
427- // Pop the next queue (using round-robin order)
428- const queue = tenantQueues [ env . envId ] . shift ( ) ! ;
403+ for ( const queue of env . queues ) {
429404 attemptedQueues ++ ;
430405
431406 // Attempt to dequeue from this queue
432- const message = await this . #callDequeueMessage( {
433- messageQueue : queue ,
434- } ) ;
407+ const [ error , message ] = await tryCatch (
408+ this . #callDequeueMessage( {
409+ messageQueue : queue ,
410+ } )
411+ ) ;
412+
413+ if ( error ) {
414+ this . logger . error (
415+ `[dequeueMessageInSharedQueue][${ this . name } ] Failed to dequeue from queue ${ queue } ` ,
416+ {
417+ error,
418+ }
419+ ) ;
420+ }
435421
436422 if ( message ) {
437423 messages . push ( message ) ;
438- // Re-add this queue at the end, since it might have more messages
439- tenantQueues [ env . envId ] . push ( queue ) ;
440424 }
441- // If message is null, do not re-add the queue in this cycle
442425
443- // If we've reached maxCount, break out of the loop
426+ // If we've reached maxCount, we don't want to look at this env anymore
444427 if ( messages . length >= maxCount ) {
445428 break ;
446429 }
447430 }
431+
432+ // If we've reached maxCount, we're completely done
433+ if ( messages . length >= maxCount ) {
434+ break ;
435+ }
448436 }
449437
450438 span . setAttributes ( {
0 commit comments