@@ -44,6 +44,9 @@ import {
4444} from "./constants.server" ;
4545import { setInterval } from "node:timers/promises" ;
4646import { tryCatch } from "@trigger.dev/core/utils" ;
47+ import { Worker , type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker" ;
48+ import z from "zod" ;
49+ import { Logger } from "@trigger.dev/core/logger" ;
4750
4851const KEY_PREFIX = "marqs:" ;
4952
@@ -74,6 +77,24 @@ export type MarQSOptions = {
7477 subscriber ?: MessageQueueSubscriber ;
7578 sharedWorkerQueueConsumerIntervalMs ?: number ;
7679 sharedWorkerQueueMaxMessageCount ?: number ;
80+ eagerDequeuingEnabled ?: boolean ;
81+ workerOptions : {
82+ pollIntervalMs ?: number ;
83+ immediatePollIntervalMs ?: number ;
84+ shutdownTimeoutMs ?: number ;
85+ concurrency ?: WorkerConcurrencyOptions ;
86+ enabled ?: boolean ;
87+ } ;
88+ } ;
89+
90+ const workerCatalog = {
91+ processQueueForWorkerQueue : {
92+ schema : z . object ( {
93+ queueKey : z . string ( ) ,
94+ parentQueueKey : z . string ( ) ,
95+ } ) ,
96+ visibilityTimeoutMs : 30_000 ,
97+ } ,
7798} ;
7899
79100/**
@@ -83,6 +104,7 @@ export class MarQS {
83104 private redis : Redis ;
84105 public keys : MarQSKeyProducer ;
85106 #rebalanceWorkers: Array < AsyncWorker > = [ ] ;
107+ private worker : Worker < typeof workerCatalog > ;
86108
87109 constructor ( private readonly options : MarQSOptions ) {
88110 this . redis = options . redis ;
@@ -91,6 +113,29 @@ export class MarQS {
91113
92114 this . #startRebalanceWorkers( ) ;
93115 this . #registerCommands( ) ;
116+
117+ this . worker = new Worker ( {
118+ name : "marqs-worker" ,
119+ redisOptions : {
120+ ...options . redis . options ,
121+ keyPrefix : `${ options . redis . options . keyPrefix } :worker` ,
122+ } ,
123+ catalog : workerCatalog ,
124+ concurrency : options . workerOptions ?. concurrency ,
125+ pollIntervalMs : options . workerOptions ?. pollIntervalMs ?? 1000 ,
126+ immediatePollIntervalMs : options . workerOptions ?. immediatePollIntervalMs ?? 100 ,
127+ shutdownTimeoutMs : options . workerOptions ?. shutdownTimeoutMs ?? 10_000 ,
128+ logger : new Logger ( "MarQSWorker" , "info" ) ,
129+ jobs : {
130+ processQueueForWorkerQueue : async ( job ) => {
131+ await this . #processQueueForWorkerQueue( job . payload . queueKey , job . payload . parentQueueKey ) ;
132+ } ,
133+ } ,
134+ } ) ;
135+
136+ if ( options . workerOptions ?. enabled ) {
137+ this . worker . start ( ) ;
138+ }
94139 }
95140
96141 get name ( ) {
@@ -280,6 +325,21 @@ export class MarQS {
280325 span . setAttribute ( "reserve_recursive_queue" , reserve . recursiveQueue ) ;
281326 }
282327
328+ if ( env . type !== "DEVELOPMENT" && this . options . eagerDequeuingEnabled ) {
329+ // This will move the message to the worker queue so it can be dequeued
330+ await this . worker . enqueueOnce ( {
331+ id : messageQueue , // dedupe by environment, queue, and concurrency key
332+ job : "processQueueForWorkerQueue" ,
333+ payload : {
334+ queueKey : messageQueue ,
335+ parentQueueKey : parentQueue ,
336+ } ,
337+ // Add a small delay to dedupe messages so at most one of these will processed,
338+ // every 500ms per queue, concurrency key, and environment
339+ availableAt : new Date ( Date . now ( ) + 500 ) , // 500ms from now
340+ } ) ;
341+ }
342+
283343 const result = await this . #callEnqueueMessage( messagePayload , reserve ) ;
284344
285345 if ( result ) {
@@ -870,6 +930,64 @@ export class MarQS {
870930 ) ;
871931 }
872932
933+ async #processQueueForWorkerQueue( queueKey : string , parentQueueKey : string ) {
934+ return this . #trace( "processQueueForWorkerQueue" , async ( span ) => {
935+ span . setAttributes ( {
936+ [ SemanticAttributes . QUEUE ] : queueKey ,
937+ [ SemanticAttributes . PARENT_QUEUE ] : parentQueueKey ,
938+ } ) ;
939+
940+ const maxCount = this . options . sharedWorkerQueueMaxMessageCount ?? 10 ;
941+
942+ const dequeuedMessages = await this . #callDequeueMessages( {
943+ messageQueue : queueKey ,
944+ parentQueue : parentQueueKey ,
945+ maxCount,
946+ } ) ;
947+
948+ if ( ! dequeuedMessages || dequeuedMessages . length === 0 ) {
949+ return ;
950+ }
951+
952+ await this . #trace(
953+ "addToWorkerQueue" ,
954+ async ( addToWorkerQueueSpan ) => {
955+ const workerQueueKey = this . keys . sharedWorkerQueueKey ( ) ;
956+
957+ addToWorkerQueueSpan . setAttributes ( {
958+ message_count : dequeuedMessages . length ,
959+ [ SemanticAttributes . PARENT_QUEUE ] : workerQueueKey ,
960+ } ) ;
961+
962+ await this . redis . rpush (
963+ workerQueueKey ,
964+ ...dequeuedMessages . map ( ( message ) => message . messageId )
965+ ) ;
966+ } ,
967+ {
968+ kind : SpanKind . INTERNAL ,
969+ attributes : {
970+ [ SEMATTRS_MESSAGING_OPERATION ] : "receive" ,
971+ [ SEMATTRS_MESSAGING_SYSTEM ] : "marqs" ,
972+ } ,
973+ }
974+ ) ;
975+
976+ // If we dequeued the max count, we need to enqueue another job to dequeue the next batch
977+ if ( dequeuedMessages . length === maxCount ) {
978+ await this . worker . enqueueOnce ( {
979+ id : queueKey ,
980+ job : "processQueueForWorkerQueue" ,
981+ payload : {
982+ queueKey,
983+ parentQueueKey,
984+ } ,
985+ availableAt : new Date ( Date . now ( ) + 500 ) , // 500ms from now
986+ } ) ;
987+ }
988+ } ) ;
989+ }
990+
873991 public async acknowledgeMessage ( messageId : string , reason : string = "unknown" ) {
874992 return this . #trace(
875993 "acknowledgeMessage" ,
@@ -901,6 +1019,20 @@ export class MarQS {
9011019 messageId,
9021020 } ) ;
9031021
1022+ const sharedQueueKey = this . keys . sharedQueueKey ( ) ;
1023+
1024+ if ( this . options . eagerDequeuingEnabled && message . parentQueue === sharedQueueKey ) {
1025+ await this . worker . enqueueOnce ( {
1026+ id : message . queue ,
1027+ job : "processQueueForWorkerQueue" ,
1028+ payload : {
1029+ queueKey : message . queue ,
1030+ parentQueueKey : message . parentQueue ,
1031+ } ,
1032+ availableAt : new Date ( Date . now ( ) + 500 ) , // 500ms from now
1033+ } ) ;
1034+ }
1035+
9041036 await this . options . subscriber ?. messageAcked ( message ) ;
9051037 } ,
9061038 {
@@ -2482,5 +2614,17 @@ function getMarQSClient() {
24822614 subscriber : concurrencyTracker ,
24832615 sharedWorkerQueueConsumerIntervalMs : env . MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS ,
24842616 sharedWorkerQueueMaxMessageCount : env . MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT ,
2617+ eagerDequeuingEnabled : env . MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED === "1" ,
2618+ workerOptions : {
2619+ enabled : env . MARQS_WORKER_ENABLED === "1" ,
2620+ pollIntervalMs : env . MARQS_WORKER_POLL_INTERVAL_MS ,
2621+ immediatePollIntervalMs : env . MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS ,
2622+ shutdownTimeoutMs : env . MARQS_WORKER_SHUTDOWN_TIMEOUT_MS ,
2623+ concurrency : {
2624+ workers : env . MARQS_WORKER_CONCURRENCY_LIMIT ,
2625+ tasksPerWorker : env . MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER ,
2626+ limit : env . MARQS_WORKER_CONCURRENCY_LIMIT ,
2627+ } ,
2628+ } ,
24852629 } ) ;
24862630}
0 commit comments