@@ -4,6 +4,7 @@ import { expect } from "vitest";
44import { z } from "zod" ;
55import { SimpleQueue } from "./queue.js" ;
66import { Logger } from "@trigger.dev/core/logger" ;
7+ import { createRedisClient } from "@internal/redis" ;
78
89describe ( "SimpleQueue" , ( ) => {
910 redisTest ( "enqueue/dequeue" , { timeout : 20_000 } , async ( { redisContainer } ) => {
@@ -328,6 +329,7 @@ describe("SimpleQueue", () => {
328329
329330 // Redrive item from DLQ
330331 await queue . redriveFromDeadLetterQueue ( "1" ) ;
332+ await new Promise ( ( resolve ) => setTimeout ( resolve , 200 ) ) ;
331333 expect ( await queue . size ( ) ) . toBe ( 1 ) ;
332334 expect ( await queue . sizeOfDeadLetterQueue ( ) ) . toBe ( 0 ) ;
333335
@@ -357,4 +359,64 @@ describe("SimpleQueue", () => {
357359 await queue . close ( ) ;
358360 }
359361 } ) ;
362+
363+ redisTest ( "cleanup orphaned queue entries" , { timeout : 20_000 } , async ( { redisContainer } ) => {
364+ const queue = new SimpleQueue ( {
365+ name : "test-orphaned" ,
366+ schema : {
367+ test : z . object ( {
368+ value : z . number ( ) ,
369+ } ) ,
370+ } ,
371+ redisOptions : {
372+ host : redisContainer . getHost ( ) ,
373+ port : redisContainer . getPort ( ) ,
374+ password : redisContainer . getPassword ( ) ,
375+ } ,
376+ logger : new Logger ( "test" , "log" ) ,
377+ } ) ;
378+
379+ try {
380+ // First, add a normal item
381+ await queue . enqueue ( { id : "1" , job : "test" , item : { value : 1 } , visibilityTimeoutMs : 2000 } ) ;
382+
383+ const redisClient = createRedisClient ( {
384+ host : redisContainer . getHost ( ) ,
385+ port : redisContainer . getPort ( ) ,
386+ password : redisContainer . getPassword ( ) ,
387+ } ) ;
388+
389+ // Manually add an orphaned item to the queue (without corresponding hash entry)
390+ await redisClient . zadd ( `{queue:test-orphaned:}queue` , Date . now ( ) , "orphaned-id" ) ;
391+
392+ // Verify both items are in the queue
393+ expect ( await queue . size ( ) ) . toBe ( 2 ) ;
394+
395+ // Dequeue should process both items, but only return the valid one
396+ // and clean up the orphaned entry
397+ const dequeued = await queue . dequeue ( 2 ) ;
398+
399+ // Should only get the valid item
400+ expect ( dequeued ) . toHaveLength ( 1 ) ;
401+ expect ( dequeued [ 0 ] ) . toEqual (
402+ expect . objectContaining ( {
403+ id : "1" ,
404+ job : "test" ,
405+ item : { value : 1 } ,
406+ visibilityTimeoutMs : 2000 ,
407+ attempt : 0 ,
408+ timestamp : expect . any ( Date ) ,
409+ } )
410+ ) ;
411+
412+ // The orphaned item should have been removed
413+ expect ( await queue . size ( { includeFuture : true } ) ) . toBe ( 1 ) ;
414+
415+ // Verify the orphaned ID is no longer in the queue
416+ const orphanedScore = await redisClient . zscore ( `{queue:test-orphaned:}queue` , "orphaned-id" ) ;
417+ expect ( orphanedScore ) . toBeNull ( ) ;
418+ } finally {
419+ await queue . close ( ) ;
420+ }
421+ } ) ;
360422} ) ;
0 commit comments