@@ -15,7 +15,7 @@ export type ReleaseConcurrencyQueueRetryOptions = {
1515
1616export type ReleaseConcurrencyQueueOptions < T > = {
1717 redis : RedisOptions ;
18- executor : ( releaseQueue : T , runId : string ) => Promise < void > ;
18+ executor : ( releaseQueue : T , releaserId : string ) => Promise < void > ;
1919 keys : {
2020 fromDescriptor : ( releaseQueue : T ) => string ;
2121 toDescriptor : ( releaseQueue : string ) => T ;
@@ -89,7 +89,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
8989 * If there is no token available, then we'll add the operation to a queue
9090 * and wait until the token is available.
9191 */
92- public async attemptToRelease ( releaseQueueDescriptor : T , runId : string ) {
92+ public async attemptToRelease ( releaseQueueDescriptor : T , releaserId : string ) {
9393 const maxTokens = await this . #callMaxTokens( releaseQueueDescriptor ) ;
9494
9595 if ( maxTokens === 0 ) {
@@ -104,13 +104,13 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
104104 this . #queueKey( releaseQueue ) ,
105105 this . #metadataKey( releaseQueue ) ,
106106 releaseQueue ,
107- runId ,
107+ releaserId ,
108108 String ( maxTokens ) ,
109109 String ( Date . now ( ) )
110110 ) ;
111111
112112 if ( ! ! result ) {
113- await this . #callExecutor( releaseQueueDescriptor , runId , {
113+ await this . #callExecutor( releaseQueueDescriptor , releaserId , {
114114 retryCount : 0 ,
115115 lastAttempt : Date . now ( ) ,
116116 } ) ;
@@ -161,28 +161,28 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
161161 }
162162
163163 await Promise . all (
164- result . map ( ( [ queue , runId , metadata ] ) => {
164+ result . map ( ( [ queue , releaserId , metadata ] ) => {
165165 const itemMetadata = QueueItemMetadata . parse ( JSON . parse ( metadata ) ) ;
166166 const releaseQueueDescriptor = this . keys . toDescriptor ( queue ) ;
167- return this . #callExecutor( releaseQueueDescriptor , runId , itemMetadata ) ;
167+ return this . #callExecutor( releaseQueueDescriptor , releaserId , itemMetadata ) ;
168168 } )
169169 ) ;
170170
171171 return true ;
172172 }
173173
174- async #callExecutor( releaseQueueDescriptor : T , runId : string , metadata : QueueItemMetadata ) {
174+ async #callExecutor( releaseQueueDescriptor : T , releaserId : string , metadata : QueueItemMetadata ) {
175175 try {
176- this . logger . info ( "Executing run:" , { releaseQueueDescriptor, runId } ) ;
176+ this . logger . info ( "Executing run:" , { releaseQueueDescriptor, releaserId } ) ;
177177
178- await this . options . executor ( releaseQueueDescriptor , runId ) ;
178+ await this . options . executor ( releaseQueueDescriptor , releaserId ) ;
179179 } catch ( error ) {
180180 this . logger . error ( "Error executing run:" , { error } ) ;
181181
182182 if ( metadata . retryCount >= this . maxRetries ) {
183183 this . logger . error ( "Max retries reached:" , {
184184 releaseQueueDescriptor,
185- runId ,
185+ releaserId ,
186186 retryCount : metadata . retryCount ,
187187 } ) ;
188188
@@ -194,10 +194,10 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
194194 this . #queueKey( releaseQueue ) ,
195195 this . #metadataKey( releaseQueue ) ,
196196 releaseQueue ,
197- runId
197+ releaserId
198198 ) ;
199199
200- this . logger . info ( "Returned token:" , { releaseQueueDescriptor, runId } ) ;
200+ this . logger . info ( "Returned token:" , { releaseQueueDescriptor, releaserId } ) ;
201201
202202 return ;
203203 }
@@ -216,7 +216,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
216216 this . #queueKey( releaseQueue ) ,
217217 this . #metadataKey( releaseQueue ) ,
218218 releaseQueue ,
219- runId ,
219+ releaserId ,
220220 JSON . stringify ( updatedMetadata ) ,
221221 this . #calculateBackoffScore( updatedMetadata )
222222 ) ;
@@ -282,7 +282,7 @@ local queueKey = KEYS[3]
282282local metadataKey = KEYS[4]
283283
284284local releaseQueue = ARGV[1]
285- local runId = ARGV[2]
285+ local releaserId = ARGV[2]
286286local maxTokens = tonumber(ARGV[3])
287287local score = ARGV[4]
288288
@@ -292,10 +292,10 @@ local currentTokens = tonumber(redis.call("GET", bucketKey) or maxTokens)
292292-- If we have enough tokens, then consume them
293293if currentTokens >= 1 then
294294 redis.call("SET", bucketKey, currentTokens - 1)
295- redis.call("ZREM", queueKey, runId )
295+ redis.call("ZREM", queueKey, releaserId )
296296
297297 -- Clean up metadata when successfully consuming
298- redis.call("HDEL", metadataKey, runId )
298+ redis.call("HDEL", metadataKey, releaserId )
299299
300300 -- Get queue length after removing the item
301301 local queueLength = redis.call("ZCARD", queueKey)
@@ -311,14 +311,14 @@ if currentTokens >= 1 then
311311end
312312
313313-- If we don't have enough tokens, then we need to add the operation to the queue
314- redis.call("ZADD", queueKey, score, runId )
314+ redis.call("ZADD", queueKey, score, releaserId )
315315
316316-- Initialize or update metadata
317317local metadata = cjson.encode({
318318 retryCount = 0,
319319 lastAttempt = tonumber(score)
320320})
321- redis.call("HSET", metadataKey, runId , metadata)
321+ redis.call("HSET", metadataKey, releaserId , metadata)
322322
323323-- Remove from the master queue
324324redis.call("ZREM", masterQueuesKey, releaseQueue)
@@ -400,14 +400,14 @@ redis.call("SET", bucketKey, currentTokens - itemsToProcess)
400400
401401-- Remove the items from the queue and add to results
402402for i = 1, itemsToProcess do
403- local runId = items[i]
404- redis.call("ZREM", queueKey, runId )
403+ local releaserId = items[i]
404+ redis.call("ZREM", queueKey, releaserId )
405405
406406 -- Get metadata before removing it
407- local metadata = redis.call("HGET", metadataKey, runId )
408- redis.call("HDEL", metadataKey, runId )
407+ local metadata = redis.call("HGET", metadataKey, releaserId )
408+ redis.call("HDEL", metadataKey, releaserId )
409409
410- table.insert(results, { queueName, runId , metadata })
410+ table.insert(results, { queueName, releaserId , metadata })
411411end
412412
413413-- Get remaining queue length
@@ -434,7 +434,7 @@ local queueKey = KEYS[3]
434434local metadataKey = KEYS[4]
435435
436436local releaseQueue = ARGV[1]
437- local runId = ARGV[2]
437+ local releaserId = ARGV[2]
438438local metadata = ARGV[3]
439439local score = ARGV[4]
440440
@@ -444,10 +444,10 @@ local remainingTokens = currentTokens + 1
444444redis.call("SET", bucketKey, remainingTokens)
445445
446446-- Add the item back to the queue
447- redis.call("ZADD", queueKey, score, runId )
447+ redis.call("ZADD", queueKey, score, releaserId )
448448
449449-- Add the metadata back to the item
450- redis.call("HSET", metadataKey, runId , metadata)
450+ redis.call("HSET", metadataKey, releaserId , metadata)
451451
452452-- Update the master queue
453453local queueLength = redis.call("ZCARD", queueKey)
@@ -470,15 +470,15 @@ local queueKey = KEYS[3]
470470local metadataKey = KEYS[4]
471471
472472local releaseQueue = ARGV[1]
473- local runId = ARGV[2]
473+ local releaserId = ARGV[2]
474474
475475-- Return the token to the bucket
476476local currentTokens = tonumber(redis.call("GET", bucketKey))
477477local remainingTokens = currentTokens + 1
478478redis.call("SET", bucketKey, remainingTokens)
479479
480480-- Clean up metadata
481- redis.call("HDEL", metadataKey, runId )
481+ redis.call("HDEL", metadataKey, releaserId )
482482
483483-- Update the master queue based on remaining queue length
484484local queueLength = redis.call("ZCARD", queueKey)
@@ -502,7 +502,7 @@ declare module "@internal/redis" {
502502 queueKey : string ,
503503 metadataKey : string ,
504504 releaseQueue : string ,
505- runId : string ,
505+ releaserId : string ,
506506 maxTokens : string ,
507507 score : string ,
508508 callback ?: Callback < string >
@@ -532,7 +532,7 @@ declare module "@internal/redis" {
532532 queueKey : string ,
533533 metadataKey : string ,
534534 releaseQueue : string ,
535- runId : string ,
535+ releaserId : string ,
536536 metadata : string ,
537537 score : string ,
538538 callback ?: Callback < void >
@@ -544,7 +544,7 @@ declare module "@internal/redis" {
544544 queueKey : string ,
545545 metadataKey : string ,
546546 releaseQueue : string ,
547- runId : string ,
547+ releaserId : string ,
548548 callback ?: Callback < void >
549549 ) : Result < void , Context > ;
550550 }
0 commit comments