@@ -28,6 +28,7 @@ type JobHandler<Catalog extends WorkerCatalog, K extends keyof Catalog> = (param
2828 payload : z . infer < Catalog [ K ] [ "schema" ] > ;
2929 visibilityTimeoutMs : number ;
3030 attempt : number ;
31+ deduplicationKey ?: string ;
3132} ) => Promise < void > ;
3233
3334export type WorkerConcurrencyOptions = {
@@ -345,7 +346,7 @@ class Worker<TCatalog extends WorkerCatalog> {
345346 * Processes a single item.
346347 */
347348 private async processItem (
348- { id, job, item, visibilityTimeoutMs, attempt, timestamp } : AnyQueueItem ,
349+ { id, job, item, visibilityTimeoutMs, attempt, timestamp, deduplicationKey } : AnyQueueItem ,
349350 batchSize : number ,
350351 workerId : string
351352 ) : Promise < void > {
@@ -362,7 +363,7 @@ class Worker<TCatalog extends WorkerCatalog> {
362363 async ( ) => {
363364 await this . withHistogram (
364365 this . metrics . jobDuration ,
365- handler ( { id, payload : item , visibilityTimeoutMs, attempt } ) ,
366+ handler ( { id, payload : item , visibilityTimeoutMs, attempt, deduplicationKey } ) ,
366367 {
367368 worker_id : workerId ,
368369 batch_size : batchSize ,
@@ -372,7 +373,7 @@ class Worker<TCatalog extends WorkerCatalog> {
372373 ) ;
373374
374375 // On success, acknowledge the item.
375- await this . queue . ack ( id ) ;
376+ await this . queue . ack ( id , deduplicationKey ) ;
376377 } ,
377378 {
378379 kind : SpanKind . CONSUMER ,
0 commit comments