88 trace ,
99 Tracer ,
1010} from "@opentelemetry/api" ;
11+ import { type RedisOptions } from "@internal/redis" ;
1112import {
1213 SEMATTRS_MESSAGE_ID ,
1314 SEMATTRS_MESSAGING_SYSTEM ,
@@ -44,6 +45,9 @@ import {
4445} from "./constants.server" ;
4546import { setInterval } from "node:timers/promises" ;
4647import { tryCatch } from "@trigger.dev/core/utils" ;
48+ import { Worker , type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker" ;
49+ import z from "zod" ;
50+ import { Logger } from "@trigger.dev/core/logger" ;
4751
4852const KEY_PREFIX = "marqs:" ;
4953
@@ -74,6 +78,25 @@ export type MarQSOptions = {
7478 subscriber ?: MessageQueueSubscriber ;
7579 sharedWorkerQueueConsumerIntervalMs ?: number ;
7680 sharedWorkerQueueMaxMessageCount ?: number ;
81+ eagerDequeuingEnabled ?: boolean ;
82+ workerOptions : {
83+ pollIntervalMs ?: number ;
84+ immediatePollIntervalMs ?: number ;
85+ shutdownTimeoutMs ?: number ;
86+ concurrency ?: WorkerConcurrencyOptions ;
87+ enabled ?: boolean ;
88+ redisOptions : RedisOptions ;
89+ } ;
90+ } ;
91+
92+ const workerCatalog = {
93+ processQueueForWorkerQueue : {
94+ schema : z . object ( {
95+ queueKey : z . string ( ) ,
96+ parentQueueKey : z . string ( ) ,
97+ } ) ,
98+ visibilityTimeoutMs : 30_000 ,
99+ } ,
77100} ;
78101
79102/**
@@ -83,6 +106,7 @@ export class MarQS {
83106 private redis : Redis ;
84107 public keys : MarQSKeyProducer ;
85108 #rebalanceWorkers: Array < AsyncWorker > = [ ] ;
109+ private worker : Worker < typeof workerCatalog > ;
86110
87111 constructor ( private readonly options : MarQSOptions ) {
88112 this . redis = options . redis ;
@@ -91,6 +115,26 @@ export class MarQS {
91115
92116 this . #startRebalanceWorkers( ) ;
93117 this . #registerCommands( ) ;
118+
119+ this . worker = new Worker ( {
120+ name : "marqs-worker" ,
121+ redisOptions : options . workerOptions . redisOptions ,
122+ catalog : workerCatalog ,
123+ concurrency : options . workerOptions ?. concurrency ,
124+ pollIntervalMs : options . workerOptions ?. pollIntervalMs ?? 1000 ,
125+ immediatePollIntervalMs : options . workerOptions ?. immediatePollIntervalMs ?? 100 ,
126+ shutdownTimeoutMs : options . workerOptions ?. shutdownTimeoutMs ?? 10_000 ,
127+ logger : new Logger ( "MarQSWorker" , "info" ) ,
128+ jobs : {
129+ processQueueForWorkerQueue : async ( job ) => {
130+ await this . #processQueueForWorkerQueue( job . payload . queueKey , job . payload . parentQueueKey ) ;
131+ } ,
132+ } ,
133+ } ) ;
134+
135+ if ( options . workerOptions ?. enabled ) {
136+ this . worker . start ( ) ;
137+ }
94138 }
95139
96140 get name ( ) {
@@ -280,6 +324,21 @@ export class MarQS {
280324 span . setAttribute ( "reserve_recursive_queue" , reserve . recursiveQueue ) ;
281325 }
282326
327+ if ( env . type !== "DEVELOPMENT" && this . options . eagerDequeuingEnabled ) {
328+ // This will move the message to the worker queue so it can be dequeued
329+ await this . worker . enqueueOnce ( {
330+ id : messageQueue , // dedupe by environment, queue, and concurrency key
331+ job : "processQueueForWorkerQueue" ,
332+ payload : {
333+ queueKey : messageQueue ,
334+ parentQueueKey : parentQueue ,
335+ } ,
336+ // Add a small delay to dedupe messages so at most one of these will processed,
337+ // every 500ms per queue, concurrency key, and environment
338+ availableAt : new Date ( Date . now ( ) + 500 ) , // 500ms from now
339+ } ) ;
340+ }
341+
283342 const result = await this . #callEnqueueMessage( messagePayload , reserve ) ;
284343
285344 if ( result ) {
@@ -870,6 +929,64 @@ export class MarQS {
870929 ) ;
871930 }
872931
932+ async #processQueueForWorkerQueue( queueKey : string , parentQueueKey : string ) {
933+ return this . #trace( "processQueueForWorkerQueue" , async ( span ) => {
934+ span . setAttributes ( {
935+ [ SemanticAttributes . QUEUE ] : queueKey ,
936+ [ SemanticAttributes . PARENT_QUEUE ] : parentQueueKey ,
937+ } ) ;
938+
939+ const maxCount = this . options . sharedWorkerQueueMaxMessageCount ?? 10 ;
940+
941+ const dequeuedMessages = await this . #callDequeueMessages( {
942+ messageQueue : queueKey ,
943+ parentQueue : parentQueueKey ,
944+ maxCount,
945+ } ) ;
946+
947+ if ( ! dequeuedMessages || dequeuedMessages . length === 0 ) {
948+ return ;
949+ }
950+
951+ await this . #trace(
952+ "addToWorkerQueue" ,
953+ async ( addToWorkerQueueSpan ) => {
954+ const workerQueueKey = this . keys . sharedWorkerQueueKey ( ) ;
955+
956+ addToWorkerQueueSpan . setAttributes ( {
957+ message_count : dequeuedMessages . length ,
958+ [ SemanticAttributes . PARENT_QUEUE ] : workerQueueKey ,
959+ } ) ;
960+
961+ await this . redis . rpush (
962+ workerQueueKey ,
963+ ...dequeuedMessages . map ( ( message ) => message . messageId )
964+ ) ;
965+ } ,
966+ {
967+ kind : SpanKind . INTERNAL ,
968+ attributes : {
969+ [ SEMATTRS_MESSAGING_OPERATION ] : "receive" ,
970+ [ SEMATTRS_MESSAGING_SYSTEM ] : "marqs" ,
971+ } ,
972+ }
973+ ) ;
974+
975+ // If we dequeued the max count, we need to enqueue another job to dequeue the next batch
976+ if ( dequeuedMessages . length === maxCount ) {
977+ await this . worker . enqueueOnce ( {
978+ id : queueKey ,
979+ job : "processQueueForWorkerQueue" ,
980+ payload : {
981+ queueKey,
982+ parentQueueKey,
983+ } ,
984+ availableAt : new Date ( Date . now ( ) + 500 ) , // 500ms from now
985+ } ) ;
986+ }
987+ } ) ;
988+ }
989+
873990 public async acknowledgeMessage ( messageId : string , reason : string = "unknown" ) {
874991 return this . #trace(
875992 "acknowledgeMessage" ,
@@ -901,6 +1018,20 @@ export class MarQS {
9011018 messageId,
9021019 } ) ;
9031020
1021+ const sharedQueueKey = this . keys . sharedQueueKey ( ) ;
1022+
1023+ if ( this . options . eagerDequeuingEnabled && message . parentQueue === sharedQueueKey ) {
1024+ await this . worker . enqueueOnce ( {
1025+ id : message . queue ,
1026+ job : "processQueueForWorkerQueue" ,
1027+ payload : {
1028+ queueKey : message . queue ,
1029+ parentQueueKey : message . parentQueue ,
1030+ } ,
1031+ availableAt : new Date ( Date . now ( ) + 500 ) , // 500ms from now
1032+ } ) ;
1033+ }
1034+
9041035 await this . options . subscriber ?. messageAcked ( message ) ;
9051036 } ,
9061037 {
@@ -2482,5 +2613,26 @@ function getMarQSClient() {
24822613 subscriber : concurrencyTracker ,
24832614 sharedWorkerQueueConsumerIntervalMs : env . MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS ,
24842615 sharedWorkerQueueMaxMessageCount : env . MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT ,
2616+ eagerDequeuingEnabled : env . MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED === "1" ,
2617+ workerOptions : {
2618+ enabled : env . MARQS_WORKER_ENABLED === "1" ,
2619+ pollIntervalMs : env . MARQS_WORKER_POLL_INTERVAL_MS ,
2620+ immediatePollIntervalMs : env . MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS ,
2621+ shutdownTimeoutMs : env . MARQS_WORKER_SHUTDOWN_TIMEOUT_MS ,
2622+ concurrency : {
2623+ workers : env . MARQS_WORKER_COUNT ,
2624+ tasksPerWorker : env . MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER ,
2625+ limit : env . MARQS_WORKER_CONCURRENCY_LIMIT ,
2626+ } ,
2627+ redisOptions : {
2628+ keyPrefix : KEY_PREFIX ,
2629+ port : env . REDIS_PORT ?? undefined ,
2630+ host : env . REDIS_HOST ?? undefined ,
2631+ username : env . REDIS_USERNAME ?? undefined ,
2632+ password : env . REDIS_PASSWORD ?? undefined ,
2633+ enableAutoPipelining : true ,
2634+ ...( env . REDIS_TLS_DISABLED === "true" ? { } : { tls : { } } ) ,
2635+ } ,
2636+ } ,
24852637 } ) ;
24862638}
0 commit comments