diff --git a/packages/redis-worker/src/queue.ts b/packages/redis-worker/src/queue.ts index 697b82ff5c..975c484dc2 100644 --- a/packages/redis-worker/src/queue.ts +++ b/packages/redis-worker/src/queue.ts @@ -27,6 +27,7 @@ export type QueueItem = { visibilityTimeoutMs: number; attempt: number; timestamp: Date; + deduplicationKey?: string; }; export type AnyQueueItem = { @@ -36,6 +37,7 @@ export type AnyQueueItem = { visibilityTimeoutMs: number; attempt: number; timestamp: Date; + deduplicationKey?: string; }; export class SimpleQueue { @@ -98,11 +100,13 @@ export class SimpleQueue { }): Promise { try { const score = availableAt ? availableAt.getTime() : Date.now(); + const deduplicationKey = nanoid(); const serializedItem = JSON.stringify({ job, item, visibilityTimeoutMs, attempt, + deduplicationKey, }); const result = await this.redis.enqueueItem( @@ -136,7 +140,7 @@ export class SimpleQueue { return []; } - const dequeuedItems = []; + const dequeuedItems: Array> = []; for (const [id, serializedItem, score] of results) { const parsedItem = JSON.parse(serializedItem) as any; @@ -186,6 +190,7 @@ export class SimpleQueue { visibilityTimeoutMs, attempt: parsedItem.attempt ?? 0, timestamp, + deduplicationKey: parsedItem.deduplicationKey, }); } @@ -200,14 +205,26 @@ export class SimpleQueue { } } - async ack(id: string): Promise { + async ack(id: string, deduplicationKey?: string): Promise { try { - await this.redis.ackItem(`queue`, `items`, id); + const result = await this.redis.ackItem(`queue`, `items`, id, deduplicationKey ?? ""); + if (result !== 1) { + this.logger.debug( + `SimpleQueue ${this.name}.ack(): ack operation returned ${result}. This means it was not removed from the queue.`, + { + queue: this.name, + id, + deduplicationKey, + result, + } + ); + } } catch (e) { this.logger.error(`SimpleQueue ${this.name}.ack(): error acknowledging item`, { queue: this.name, error: e, id, + deduplicationKey, }); throw e; } @@ -367,15 +384,32 @@ export class SimpleQueue { this.redis.defineCommand("ackItem", { numberOfKeys: 2, lua: ` - local queue = KEYS[1] - local items = KEYS[2] + local queueKey = KEYS[1] + local itemsKey = KEYS[2] local id = ARGV[1] + local deduplicationKey = ARGV[2] - redis.call('ZREM', queue, id) - redis.call('HDEL', items, id) + -- Get the item from the hash + local item = redis.call('HGET', itemsKey, id) + if not item then + return -1 + end + -- Only check deduplicationKey if a non-empty one was passed in + if deduplicationKey and deduplicationKey ~= "" then + local success, parsed = pcall(cjson.decode, item) + if success then + if parsed.deduplicationKey and parsed.deduplicationKey ~= deduplicationKey then + return 0 + end + end + end + + -- Remove from sorted set and hash + redis.call('ZREM', queueKey, id) + redis.call('HDEL', itemsKey, id) return 1 - `, + `, }); this.redis.defineCommand("moveToDeadLetterQueue", { @@ -468,6 +502,7 @@ declare module "@internal/redis" { queue: string, items: string, id: string, + deduplicationKey: string, callback?: Callback ): Result; diff --git a/packages/redis-worker/src/worker.test.ts b/packages/redis-worker/src/worker.test.ts index 1768f39107..8b604be8ae 100644 --- a/packages/redis-worker/src/worker.test.ts +++ b/packages/redis-worker/src/worker.test.ts @@ -250,4 +250,302 @@ describe("Worker", () => { await redisClient.quit(); } ); + + redisTest( + "Should process a job with the same ID only once when rescheduled", + { timeout: 30_000 }, + async ({ redisContainer }) => { + const processedPayloads: string[] = []; + + const worker = new Worker({ + name: "test-worker", + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + catalog: { + testJob: { + schema: z.object({ value: z.string() }), + visibilityTimeoutMs: 5000, + retry: { maxAttempts: 3 }, + }, + }, + jobs: { + testJob: async ({ payload }) => { + await new Promise((resolve) => setTimeout(resolve, 30)); // Simulate work + processedPayloads.push(payload.value); + }, + }, + concurrency: { + workers: 1, + tasksPerWorker: 1, + }, + pollIntervalMs: 10, // Ensure quick polling to detect the scheduled item + logger: new Logger("test", "log"), + }).start(); + + // Unique ID to use for both enqueues + const testJobId = "duplicate-job-id"; + + // Enqueue the first item immediately + await worker.enqueue({ + id: testJobId, + job: "testJob", + payload: { value: "first-attempt" }, + availableAt: new Date(Date.now() + 50), + }); + + // Enqueue another item with the same ID but scheduled 50ms in the future + await worker.enqueue({ + id: testJobId, + job: "testJob", + payload: { value: "second-attempt" }, + availableAt: new Date(Date.now() + 50), + }); + + // Wait enough time for both jobs to be processed if they were going to be + await new Promise((resolve) => setTimeout(resolve, 300)); + + // Verify that only one job was processed (the second one should have replaced the first) + expect(processedPayloads.length).toBe(1); + + // Verify that the second job's payload was the one processed + expect(processedPayloads[0]).toBe("second-attempt"); + + await worker.stop(); + } + ); + + redisTest( + "Should process second job with same ID when enqueued during first job execution with future availableAt", + { timeout: 30_000 }, + async ({ redisContainer }) => { + const processedPayloads: string[] = []; + const jobStarted: string[] = []; + let firstJobCompleted = false; + + const worker = new Worker({ + name: "test-worker", + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + catalog: { + testJob: { + schema: z.object({ value: z.string() }), + visibilityTimeoutMs: 5000, + retry: { maxAttempts: 3 }, + }, + }, + jobs: { + testJob: async ({ payload }) => { + // Record when the job starts processing + jobStarted.push(payload.value); + + if (payload.value === "first-attempt") { + // First job takes a long time to process + await new Promise((resolve) => setTimeout(resolve, 1_000)); + firstJobCompleted = true; + } + + // Record when the job completes + processedPayloads.push(payload.value); + }, + }, + concurrency: { + workers: 1, + tasksPerWorker: 1, + }, + pollIntervalMs: 10, + logger: new Logger("test", "log"), + }).start(); + + const testJobId = "long-running-job-id"; + + // Queue the first job + await worker.enqueue({ + id: testJobId, + job: "testJob", + payload: { value: "first-attempt" }, + }); + + // Verify initial queue size + const size1 = await worker.queue.size({ includeFuture: true }); + expect(size1).toBe(1); + + // Wait until we know the first job has started processing + while (jobStarted.length === 0) { + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + // Now that first job is running, queue second job with same ID + // Set availableAt to be 1.5 seconds in the future (after first job completes) + await worker.enqueue({ + id: testJobId, + job: "testJob", + payload: { value: "second-attempt" }, + availableAt: new Date(Date.now() + 1500), + }); + + // Verify queue size after second enqueue + const size2 = await worker.queue.size({ includeFuture: true }); + const size2Present = await worker.queue.size({ includeFuture: false }); + expect(size2).toBe(1); // Should still be 1 as it's the same ID + + // Wait for the first job to complete + while (!firstJobCompleted) { + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + // Check queue size right after first job completes + const size3 = await worker.queue.size({ includeFuture: true }); + const size3Present = await worker.queue.size({ includeFuture: false }); + + // Wait long enough for the second job to become available and potentially run + await new Promise((resolve) => setTimeout(resolve, 2000)); + + // Final queue size + const size4 = await worker.queue.size({ includeFuture: true }); + const size4Present = await worker.queue.size({ includeFuture: false }); + + // First job should have run + expect(processedPayloads).toContain("first-attempt"); + + // These assertions should fail - demonstrating the bug + // The second job should run after its availableAt time, but doesn't because + // the ack from the first job removed it from Redis entirely + expect(jobStarted).toContain("second-attempt"); + expect(processedPayloads).toContain("second-attempt"); + expect(processedPayloads.length).toBe(2); + + await worker.stop(); + } + ); + + redisTest( + "Should properly remove future-scheduled job after completion", + { timeout: 30_000 }, + async ({ redisContainer }) => { + const processedPayloads: string[] = []; + + const worker = new Worker({ + name: "test-worker", + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + catalog: { + testJob: { + schema: z.object({ value: z.string() }), + visibilityTimeoutMs: 5000, + retry: { maxAttempts: 3 }, + }, + }, + jobs: { + testJob: async ({ payload }) => { + processedPayloads.push(payload.value); + }, + }, + concurrency: { + workers: 1, + tasksPerWorker: 1, + }, + pollIntervalMs: 10, + logger: new Logger("test", "debug"), // Use debug to see all logs + }).start(); + + // Schedule a job 500ms in the future + await worker.enqueue({ + id: "future-job", + job: "testJob", + payload: { value: "test" }, + availableAt: new Date(Date.now() + 500), + }); + + // Verify it's in the future queue + const initialSize = await worker.queue.size(); + const initialSizeWithFuture = await worker.queue.size({ includeFuture: true }); + expect(initialSize).toBe(0); + expect(initialSizeWithFuture).toBe(1); + + // Wait for job to be processed + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify job was processed + expect(processedPayloads).toContain("test"); + + // Verify queue is completely empty + const finalSize = await worker.queue.size(); + const finalSizeWithFuture = await worker.queue.size({ includeFuture: true }); + expect(finalSize).toBe(0); + expect(finalSizeWithFuture).toBe(0); + + await worker.stop(); + } + ); + + redisTest( + "Should properly remove immediate job after completion", + { timeout: 30_000 }, + async ({ redisContainer }) => { + const processedPayloads: string[] = []; + + const worker = new Worker({ + name: "test-worker", + redisOptions: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + password: redisContainer.getPassword(), + }, + catalog: { + testJob: { + schema: z.object({ value: z.string() }), + visibilityTimeoutMs: 5000, + retry: { maxAttempts: 3 }, + }, + }, + jobs: { + testJob: async ({ payload }) => { + processedPayloads.push(payload.value); + }, + }, + concurrency: { + workers: 1, + tasksPerWorker: 1, + }, + pollIntervalMs: 10, + logger: new Logger("test", "debug"), // Use debug to see all logs + }).start(); + + // Enqueue a job to run immediately + await worker.enqueue({ + id: "immediate-job", + job: "testJob", + payload: { value: "test" }, + }); + + // Verify it's in the present queue + const initialSize = await worker.queue.size(); + const initialSizeWithFuture = await worker.queue.size({ includeFuture: true }); + expect(initialSize).toBe(1); + expect(initialSizeWithFuture).toBe(1); + + // Wait for job to be processed + await new Promise((resolve) => setTimeout(resolve, 1000)); + + // Verify job was processed + expect(processedPayloads).toContain("test"); + + // Verify queue is completely empty + const finalSize = await worker.queue.size(); + const finalSizeWithFuture = await worker.queue.size({ includeFuture: true }); + expect(finalSize).toBe(0); + expect(finalSizeWithFuture).toBe(0); + + await worker.stop(); + } + ); }); diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts index f78d493fae..13e4fd85f1 100644 --- a/packages/redis-worker/src/worker.ts +++ b/packages/redis-worker/src/worker.ts @@ -28,6 +28,7 @@ type JobHandler = (param payload: z.infer; visibilityTimeoutMs: number; attempt: number; + deduplicationKey?: string; }) => Promise; export type WorkerConcurrencyOptions = { @@ -345,7 +346,7 @@ class Worker { * Processes a single item. */ private async processItem( - { id, job, item, visibilityTimeoutMs, attempt, timestamp }: AnyQueueItem, + { id, job, item, visibilityTimeoutMs, attempt, timestamp, deduplicationKey }: AnyQueueItem, batchSize: number, workerId: string ): Promise { @@ -362,7 +363,7 @@ class Worker { async () => { await this.withHistogram( this.metrics.jobDuration, - handler({ id, payload: item, visibilityTimeoutMs, attempt }), + handler({ id, payload: item, visibilityTimeoutMs, attempt, deduplicationKey }), { worker_id: workerId, batch_size: batchSize, @@ -372,7 +373,7 @@ class Worker { ); // On success, acknowledge the item. - await this.queue.ack(id); + await this.queue.ack(id, deduplicationKey); }, { kind: SpanKind.CONSUMER,