@@ -596,4 +596,133 @@ describe("PrismaQueue", () => {
596596 void queue . stop ( ) ;
597597 } ) ;
598598 } ) ;
599+
600+ describe ( "stop behavior" , ( ) => {
601+ let queue : PrismaQueue < EmailJobPayload , EmailJobResult > ;
602+ beforeAll ( ( ) => {
603+ queue = createEmailQueue ( { pollInterval : 100 , jobInterval : 10 } ) ;
604+ } ) ;
605+ beforeEach ( async ( ) => {
606+ await prisma . queueJob . deleteMany ( ) ;
607+ } ) ;
608+ afterEach ( async ( ) => {
609+ await queue . stop ( ) ;
610+ } ) ;
611+
612+ it ( "should wait for in-flight jobs to complete before returning" , async ( ) => {
613+ let jobStarted = false ;
614+ let jobFinished = false ;
615+
616+ queue . worker = vi . fn ( async ( _job ) => {
617+ jobStarted = true ;
618+ await waitFor ( 500 ) ; // Long-running job
619+ jobFinished = true ;
620+ return { code : "200" } ;
621+ } ) ;
622+
623+ await queue . enqueue ( { email :
"[email protected] " } ) ; 624+ void queue . start ( ) ;
625+
626+ // Wait for job to start
627+ await waitFor ( 150 ) ;
628+ expect ( jobStarted ) . toBe ( true ) ;
629+ expect ( jobFinished ) . toBe ( false ) ;
630+
631+ // Stop should wait for the job to complete
632+ const stopPromise = queue . stop ( ) ;
633+
634+ // Job should still be running
635+ expect ( jobFinished ) . toBe ( false ) ;
636+
637+ // Wait for stop to complete
638+ await stopPromise ;
639+
640+ // Job should now be finished
641+ expect ( jobFinished ) . toBe ( true ) ;
642+ expect ( queue . worker ) . toHaveBeenCalledTimes ( 1 ) ;
643+ } ) ;
644+
645+ it ( "should handle stopping with multiple concurrent jobs" , async ( ) => {
646+ const concurrentQueue = createEmailQueue ( { pollInterval : 100 , jobInterval : 10 , maxConcurrency : 3 } ) ;
647+ const jobsStarted : number [ ] = [ ] ;
648+ const jobsFinished : number [ ] = [ ] ;
649+
650+ concurrentQueue . worker = vi . fn ( async ( job : EmailJob ) => {
651+ const jobNum = job . payload . email . includes ( "job1" ) ? 1 : job . payload . email . includes ( "job2" ) ? 2 : 3 ;
652+ jobsStarted . push ( jobNum ) ;
653+ await waitFor ( 300 ) ;
654+ jobsFinished . push ( jobNum ) ;
655+ return { code : "200" } ;
656+ } ) ;
657+
658+ // Enqueue 3 jobs
659+ await Promise . all ( [
660+ concurrentQueue . enqueue ( { email :
"[email protected] " } ) , 661+ concurrentQueue . enqueue ( { email :
"[email protected] " } ) , 662+ concurrentQueue . enqueue ( { email :
"[email protected] " } ) , 663+ ] ) ;
664+
665+ void concurrentQueue . start ( ) ;
666+
667+ // Wait for jobs to start
668+ await waitFor ( 150 ) ;
669+ expect ( jobsStarted . length ) . toBeGreaterThan ( 0 ) ;
670+
671+ // Stop should wait for all jobs
672+ await concurrentQueue . stop ( ) ;
673+
674+ // All started jobs should be finished
675+ expect ( jobsFinished . length ) . toBe ( jobsStarted . length ) ;
676+ expect ( jobsStarted . length ) . toBeGreaterThanOrEqual ( 1 ) ;
677+ } ) ;
678+
679+ it ( "should respect custom timeout option" , async ( ) => {
680+ const slowQueue = createEmailQueue ( { pollInterval : 100 , jobInterval : 10 } ) ;
681+
682+ slowQueue . worker = vi . fn ( async ( _job ) => {
683+ await waitFor ( 5000 ) ; // 5 second job
684+ return { code : "200" } ;
685+ } ) ;
686+
687+ await slowQueue . enqueue ( { email :
"[email protected] " } ) ; 688+ void slowQueue . start ( ) ;
689+
690+ await waitFor ( 150 ) ; // Let job start
691+
692+ // Use a short timeout (1 second)
693+ const startTime = Date . now ( ) ;
694+ await slowQueue . stop ( { timeout : 1000 } ) ;
695+ const stopDuration = Date . now ( ) - startTime ;
696+
697+ // Should timeout around 1 second (give some margin)
698+ expect ( stopDuration ) . toBeLessThan ( 1500 ) ;
699+ expect ( stopDuration ) . toBeGreaterThan ( 900 ) ;
700+ } , 10000 ) ;
701+
702+ it ( "should wait longer with increased timeout" , async ( ) => {
703+ const slowQueue = createEmailQueue ( { pollInterval : 100 , jobInterval : 10 } ) ;
704+ let jobCompleted = false ;
705+
706+ slowQueue . worker = vi . fn ( async ( _job ) => {
707+ await waitFor ( 1500 ) ; // 1.5 second job
708+ jobCompleted = true ;
709+ return { code : "200" } ;
710+ } ) ;
711+
712+ await slowQueue . enqueue ( { email :
"[email protected] " } ) ; 713+ void slowQueue . start ( ) ;
714+
715+ await waitFor ( 150 ) ; // Let job start
716+
717+ // Use a longer timeout that should allow job to complete
718+ await slowQueue . stop ( { timeout : 5000 } ) ;
719+
720+ // Job should have completed
721+ expect ( jobCompleted ) . toBe ( true ) ;
722+ } , 10000 ) ;
723+
724+ afterAll ( async ( ) => {
725+ await queue . stop ( ) ;
726+ } ) ;
727+ } ) ;
599728} ) ;
0 commit comments