@@ -27,6 +27,7 @@ export type QueueItem<TMessageCatalog extends MessageCatalogSchema> = {
2727 visibilityTimeoutMs : number ;
2828 attempt : number ;
2929 timestamp : Date ;
30+ deduplicationKey ?: string ;
3031} ;
3132
3233export type AnyQueueItem = {
@@ -36,6 +37,7 @@ export type AnyQueueItem = {
3637 visibilityTimeoutMs : number ;
3738 attempt : number ;
3839 timestamp : Date ;
40+ deduplicationKey ?: string ;
3941} ;
4042
4143export class SimpleQueue < TMessageCatalog extends MessageCatalogSchema > {
@@ -98,11 +100,13 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
98100 } ) : Promise < void > {
99101 try {
100102 const score = availableAt ? availableAt . getTime ( ) : Date . now ( ) ;
103+ const deduplicationKey = nanoid ( ) ;
101104 const serializedItem = JSON . stringify ( {
102105 job,
103106 item,
104107 visibilityTimeoutMs,
105108 attempt,
109+ deduplicationKey,
106110 } ) ;
107111
108112 const result = await this . redis . enqueueItem (
@@ -136,7 +140,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
136140 return [ ] ;
137141 }
138142
139- const dequeuedItems = [ ] ;
143+ const dequeuedItems : Array < QueueItem < TMessageCatalog > > = [ ] ;
140144
141145 for ( const [ id , serializedItem , score ] of results ) {
142146 const parsedItem = JSON . parse ( serializedItem ) as any ;
@@ -186,6 +190,7 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
186190 visibilityTimeoutMs,
187191 attempt : parsedItem . attempt ?? 0 ,
188192 timestamp,
193+ deduplicationKey : parsedItem . deduplicationKey ,
189194 } ) ;
190195 }
191196
@@ -200,14 +205,22 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
200205 }
201206 }
202207
203- async ack ( id : string ) : Promise < void > {
208+ async ack ( id : string , deduplicationKey ?: string ) : Promise < void > {
204209 try {
205- await this . redis . ackItem ( `queue` , `items` , id ) ;
210+ const result = await this . redis . ackItem ( `queue` , `items` , id , deduplicationKey ?? "" ) ;
211+ if ( result === 0 ) {
212+ this . logger . error ( `SimpleQueue ${ this . name } .ack(): ack operation returned 0` , {
213+ queue : this . name ,
214+ id,
215+ deduplicationKey,
216+ } ) ;
217+ }
206218 } catch ( e ) {
207219 this . logger . error ( `SimpleQueue ${ this . name } .ack(): error acknowledging item` , {
208220 queue : this . name ,
209221 error : e ,
210222 id,
223+ deduplicationKey,
211224 } ) ;
212225 throw e ;
213226 }
@@ -367,15 +380,32 @@ export class SimpleQueue<TMessageCatalog extends MessageCatalogSchema> {
367380 this . redis . defineCommand ( "ackItem" , {
368381 numberOfKeys : 2 ,
369382 lua : `
370- local queue = KEYS[1]
371- local items = KEYS[2]
383+ local queueKey = KEYS[1]
384+ local itemsKey = KEYS[2]
372385 local id = ARGV[1]
386+ local deduplicationKey = ARGV[2]
373387
374- redis.call('ZREM', queue, id)
375- redis.call('HDEL', items, id)
388+ -- Get the item from the hash
389+ local item = redis.call('HGET', itemsKey, id)
390+ if not item then
391+ return -1
392+ end
376393
394+ -- Only check deduplicationKey if a non-empty one was passed in
395+ if deduplicationKey and deduplicationKey ~= "" then
396+ local success, parsed = pcall(cjson.decode, item)
397+ if success then
398+ if parsed.deduplicationKey and parsed.deduplicationKey ~= deduplicationKey then
399+ return 0
400+ end
401+ end
402+ end
403+
404+ -- Remove from sorted set and hash
405+ redis.call('ZREM', queueKey, id)
406+ redis.call('HDEL', itemsKey, id)
377407 return 1
378- ` ,
408+ ` ,
379409 } ) ;
380410
381411 this . redis . defineCommand ( "moveToDeadLetterQueue" , {
@@ -468,6 +498,7 @@ declare module "@internal/redis" {
468498 queue : string ,
469499 items : string ,
470500 id : string ,
501+ deduplicationKey : string ,
471502 callback ?: Callback < number >
472503 ) : Result < number , Context > ;
473504
0 commit comments