11import { Callback , createRedisClient , Redis , Result , type RedisOptions } from "@internal/redis" ;
2- import { Tracer } from "@internal/tracing" ;
2+ import { startSpan , Tracer } from "@internal/tracing" ;
33import { Logger } from "@trigger.dev/core/logger" ;
4- import { setInterval } from "node:timers/promises" ;
54import { z } from "zod" ;
5+ import { setInterval } from "node:timers/promises" ;
6+ import { flattenAttributes } from "@trigger.dev/core/v3" ;
67
78export type ReleaseConcurrencyQueueRetryOptions = {
89 maxRetries ?: number ;
@@ -81,6 +82,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
8182
8283 if ( ! options . disableConsumers ) {
8384 this . #startConsumers( ) ;
85+ this . #startMetricsProducer( ) ;
8486 }
8587 }
8688
@@ -397,6 +399,30 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
397399 }
398400 }
399401
402+ async #startMetricsProducer( ) {
403+ try {
404+ // Produce metrics every 60 seconds, using a tracer span
405+ for await ( const _ of setInterval ( 60_000 ) ) {
406+ const metrics = await this . getQueueMetrics ( ) ;
407+ this . logger . info ( "Queue metrics:" , { metrics } ) ;
408+
409+ await startSpan (
410+ this . options . tracer ,
411+ "ReleaseConcurrencyTokenBucketQueue.metrics" ,
412+ async ( span ) => { } ,
413+ {
414+ attributes : {
415+ ...flattenAttributes ( metrics , "queues" ) ,
416+ forceRecording : true ,
417+ } ,
418+ }
419+ ) ;
420+ }
421+ } catch ( error ) {
422+ this . logger . error ( "Error starting metrics producer:" , { error } ) ;
423+ }
424+ }
425+
400426 #calculateBackoffScore( item : QueueItemMetadata ) : string {
401427 const delay = Math . min (
402428 this . backoff . maxDelay ,
@@ -405,6 +431,137 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
405431 return String ( Date . now ( ) + delay ) ;
406432 }
407433
434+ async getQueueMetrics ( ) : Promise <
435+ Array < { releaseQueue : string ; currentTokens : number ; queueLength : number } >
436+ > {
437+ const streamRedis = this . redis . duplicate ( ) ;
438+ const queuePattern = `${ this . keyPrefix } *:queue` ;
439+ const stream = streamRedis . scanStream ( {
440+ match : queuePattern ,
441+ type : "zset" ,
442+ count : 100 ,
443+ } ) ;
444+
445+ let resolvePromise : (
446+ value : Array < { releaseQueue : string ; currentTokens : number ; queueLength : number } >
447+ ) => void ;
448+ let rejectPromise : ( reason ?: any ) => void ;
449+
450+ const promise = new Promise <
451+ Array < { releaseQueue : string ; currentTokens : number ; queueLength : number } >
452+ > ( ( resolve , reject ) => {
453+ resolvePromise = resolve ;
454+ rejectPromise = reject ;
455+ } ) ;
456+
457+ const metrics : Map <
458+ string ,
459+ { releaseQueue : string ; currentTokens : number ; queueLength : number }
460+ > = new Map ( ) ;
461+
462+ async function getMetricsForKeys ( queueKeys : string [ ] ) {
463+ if ( queueKeys . length === 0 ) {
464+ return [ ] ;
465+ }
466+
467+ const pipeline = streamRedis . pipeline ( ) ;
468+
469+ queueKeys . forEach ( ( queueKey ) => {
470+ const releaseQueue = queueKey
471+ . replace ( ":queue" , "" )
472+ . replace ( streamRedis . options . keyPrefix ?? "" , "" ) ;
473+ const bucketKey = `${ releaseQueue } :bucket` ;
474+
475+ pipeline . get ( bucketKey ) ;
476+ pipeline . zcard ( `${ releaseQueue } :queue` ) ;
477+ } ) ;
478+
479+ const result = await pipeline . exec ( ) ;
480+
481+ if ( ! result ) {
482+ return [ ] ;
483+ }
484+
485+ const results = result . map ( ( [ resultError , queueLengthOrCurrentTokens ] ) => {
486+ if ( resultError ) {
487+ return null ;
488+ }
489+
490+ return queueLengthOrCurrentTokens ? Number ( queueLengthOrCurrentTokens ) : 0 ;
491+ } ) ;
492+
493+ // Now zip the results with the queue keys
494+ const zippedResults = queueKeys . map ( ( queueKey , index ) => {
495+ const releaseQueue = queueKey
496+ . replace ( ":queue" , "" )
497+ . replace ( streamRedis . options . keyPrefix ?? "" , "" ) ;
498+
499+ // Current tokens are at indexes 0, 2, 4, 6, etc.
500+ // Queue length are at indexes 1, 3, 5, 7, etc.
501+
502+ const currentTokens = results [ index * 2 ] ;
503+ const queueLength = results [ index * 2 + 1 ] ;
504+
505+ if ( typeof currentTokens !== "number" || typeof queueLength !== "number" ) {
506+ return null ;
507+ }
508+
509+ return {
510+ releaseQueue,
511+ currentTokens : currentTokens ,
512+ queueLength : queueLength ,
513+ } ;
514+ } ) ;
515+
516+ return zippedResults . filter ( ( result ) => result !== null ) ;
517+ }
518+
519+ stream . on ( "end" , ( ) => {
520+ streamRedis . quit ( ) ;
521+ resolvePromise ( Array . from ( metrics . values ( ) ) ) ;
522+ } ) ;
523+
524+ stream . on ( "error" , ( error ) => {
525+ this . logger . error ( "Error getting queue metrics:" , { error } ) ;
526+
527+ stream . pause ( ) ;
528+ streamRedis . quit ( ) ;
529+ rejectPromise ( error ) ;
530+ } ) ;
531+
532+ stream . on ( "data" , async ( keys ) => {
533+ stream . pause ( ) ;
534+
535+ const uniqueKeys = Array . from ( new Set < string > ( keys ) ) ;
536+
537+ if ( uniqueKeys . length === 0 ) {
538+ stream . resume ( ) ;
539+ return ;
540+ }
541+
542+ const unresolvedKeys = uniqueKeys . filter ( ( key ) => ! metrics . has ( key ) ) ;
543+
544+ if ( unresolvedKeys . length === 0 ) {
545+ stream . resume ( ) ;
546+ return ;
547+ }
548+
549+ this . logger . debug ( "Fetching queue metrics for keys" , { keys : uniqueKeys } ) ;
550+
551+ await getMetricsForKeys ( unresolvedKeys ) . then ( ( results ) => {
552+ results . forEach ( ( result ) => {
553+ if ( result ) {
554+ metrics . set ( result . releaseQueue , result ) ;
555+ }
556+ } ) ;
557+
558+ stream . resume ( ) ;
559+ } ) ;
560+ } ) ;
561+
562+ return promise ;
563+ }
564+
408565 #registerCommands( ) {
409566 this . redis . defineCommand ( "consumeToken" , {
410567 numberOfKeys : 4 ,
@@ -424,7 +581,7 @@ local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens)
424581
425582-- If we have enough tokens, then consume them
426583if currentTokens >= 1 then
427- newCurrentTokens = currentTokens - 1
584+ local newCurrentTokens = currentTokens - 1
428585
429586 redis.call("SET", bucketKey, newCurrentTokens)
430587 redis.call("ZREM", queueKey, releaserId)
0 commit comments