@@ -43,7 +43,7 @@ import {
4343 MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET ,
4444 MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS ,
4545} from "./constants.server" ;
46- import { setInterval } from "node:timers/promises" ;
46+ import { setInterval as setIntervalAsync } from "node:timers/promises" ;
4747import { tryCatch } from "@trigger.dev/core/utils" ;
4848import { Worker , type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker" ;
4949import z from "zod" ;
@@ -78,6 +78,8 @@ export type MarQSOptions = {
7878 subscriber ?: MessageQueueSubscriber ;
7979 sharedWorkerQueueConsumerIntervalMs ?: number ;
8080 sharedWorkerQueueMaxMessageCount ?: number ;
81+ sharedWorkerQueueCooloffPeriodMs ?: number ;
82+ sharedWorkerQueueCooloffCountThreshold ?: number ;
8183 eagerDequeuingEnabled ?: boolean ;
8284 workerOptions : {
8385 pollIntervalMs ?: number ;
@@ -107,6 +109,9 @@ export class MarQS {
107109 public keys : MarQSKeyProducer ;
108110 #rebalanceWorkers: Array < AsyncWorker > = [ ] ;
109111 private worker : Worker < typeof workerCatalog > ;
112+ private queueDequeueCooloffPeriod : Map < string , number > = new Map ( ) ;
113+ private queueDequeueCooloffCounts : Map < string , number > = new Map ( ) ;
114+ private clearCooloffPeriodInterval : NodeJS . Timeout ;
110115
111116 constructor ( private readonly options : MarQSOptions ) {
112117 this . redis = options . redis ;
@@ -116,6 +121,12 @@ export class MarQS {
116121 this . #startRebalanceWorkers( ) ;
117122 this . #registerCommands( ) ;
118123
124+ // This will prevent these cooloff maps from growing indefinitely
125+ this . clearCooloffPeriodInterval = setInterval ( ( ) => {
126+ this . queueDequeueCooloffCounts . clear ( ) ;
127+ this . queueDequeueCooloffPeriod . clear ( ) ;
128+ } , 60_000 * 10 ) ; // 10 minutes
129+
119130 this . worker = new Worker ( {
120131 name : "marqs-worker" ,
121132 redisOptions : options . workerOptions . redisOptions ,
@@ -737,7 +748,7 @@ export class MarQS {
737748 let processedCount = 0 ;
738749
739750 try {
740- for await ( const _ of setInterval (
751+ for await ( const _ of setIntervalAsync (
741752 this . options . sharedWorkerQueueConsumerIntervalMs ?? 500 ,
742753 null ,
743754 {
@@ -821,6 +832,7 @@ export class MarQS {
821832 let attemptedEnvs = 0 ;
822833 let attemptedQueues = 0 ;
823834 let messageCount = 0 ;
835+ let coolOffPeriodCount = 0 ;
824836
825837 // Try each queue in order, attempt to dequeue a message from each queue, keep going until we've tried all the queues
826838 for ( const env of envQueues ) {
@@ -829,6 +841,20 @@ export class MarQS {
829841 for ( const messageQueue of env . queues ) {
830842 attemptedQueues ++ ;
831843
844+ const cooloffPeriod = this . queueDequeueCooloffPeriod . get ( messageQueue ) ;
845+
846+ // If the queue is in a cooloff period, skip attempting to dequeue from it
847+ if ( cooloffPeriod ) {
848+ // If the cooloff period is still active, skip attempting to dequeue from it
849+ if ( cooloffPeriod > Date . now ( ) ) {
850+ coolOffPeriodCount ++ ;
851+ continue ;
852+ } else {
853+ // If the cooloff period is over, delete the cooloff period and attempt to dequeue from the queue
854+ this . queueDequeueCooloffPeriod . delete ( messageQueue ) ;
855+ }
856+ }
857+
832858 await this . #trace(
833859 "attemptDequeue" ,
834860 async ( attemptDequeueSpan ) => {
@@ -862,10 +888,32 @@ export class MarQS {
862888 ) ;
863889
864890 if ( ! messages || messages . length === 0 ) {
891+ const cooloffCount = this . queueDequeueCooloffCounts . get ( messageQueue ) ?? 0 ;
892+
893+ const cooloffCountThreshold = Math . max (
894+ 10 ,
895+ this . options . sharedWorkerQueueCooloffCountThreshold ?? 10
896+ ) ; // minimum of 10
897+
898+ if ( cooloffCount >= cooloffCountThreshold ) {
899+ // If no messages were dequeued, set a cooloff period for the queue
900+ // This is to prevent the queue from being dequeued too frequently
901+ // and to give other queues a chance to dequeue messages more frequently
902+ this . queueDequeueCooloffPeriod . set (
903+ messageQueue ,
904+ Date . now ( ) + ( this . options . sharedWorkerQueueCooloffPeriodMs ?? 10_000 ) // defaults to 10 seconds
905+ ) ;
906+ this . queueDequeueCooloffCounts . delete ( messageQueue ) ;
907+ } else {
908+ this . queueDequeueCooloffCounts . set ( messageQueue , cooloffCount + 1 ) ;
909+ }
910+
865911 attemptDequeueSpan . setAttribute ( "message_count" , 0 ) ;
866912 return null ; // Try next queue if no message was dequeued
867913 }
868914
915+ this . queueDequeueCooloffCounts . delete ( messageQueue ) ;
916+
869917 messageCount += messages . length ;
870918
871919 attemptDequeueSpan . setAttribute ( "message_count" , messages . length ) ;
@@ -916,6 +964,7 @@ export class MarQS {
916964 span . setAttribute ( "attempted_queues" , attemptedQueues ) ;
917965 span . setAttribute ( "attempted_envs" , attemptedEnvs ) ;
918966 span . setAttribute ( "message_count" , messageCount ) ;
967+ span . setAttribute ( "cooloff_period_count" , coolOffPeriodCount ) ;
919968
920969 return ;
921970 } ,
0 commit comments