@@ -713,6 +713,73 @@ impl QueueWorker {
713713
714714 Ok ( ( ) )
715715 }
716+
717+ /// Process all the pending jobs in the queue.
718+ /// This should only be called in tests!
719+ ///
720+ /// # Errors
721+ ///
722+ /// This function can fail if the database connection fails.
723+ pub async fn process_all_jobs_in_tests ( & mut self ) -> Result < ( ) , QueueRunnerError > {
724+ // I swear, I'm the leader!
725+ self . am_i_leader = true ;
726+
727+ // First, perform the leader duties. This will make sure that we schedule
728+ // recurring jobs.
729+ self . perform_leader_duties ( ) . await ?;
730+
731+ let clock = self . state . clock ( ) ;
732+ let mut rng = self . state . rng ( ) ;
733+
734+ // Grab the connection from the PgListener
735+ let txn = self
736+ . listener
737+ . begin ( )
738+ . await
739+ . map_err ( QueueRunnerError :: StartTransaction ) ?;
740+ let mut repo = PgRepository :: from_conn ( txn) ;
741+
742+ // Spawn all the jobs in the database
743+ let queues = self . tracker . queues ( ) ;
744+ let jobs = repo
745+ . queue_job ( )
746+ // I really hope that we don't spawn more than 10k jobs in tests
747+ . reserve ( clock, & self . registration , & queues, 10_000 )
748+ . await ?;
749+
750+ for Job {
751+ id,
752+ queue_name,
753+ payload,
754+ metadata,
755+ attempt,
756+ } in jobs
757+ {
758+ let cancellation_token = self . cancellation_token . child_token ( ) ;
759+ let start = Instant :: now ( ) ;
760+ let context = JobContext {
761+ id,
762+ metadata,
763+ queue_name,
764+ attempt,
765+ start,
766+ cancellation_token,
767+ } ;
768+
769+ self . tracker . spawn_job ( self . state . clone ( ) , context, payload) ;
770+ }
771+
772+ self . tracker
773+ . process_jobs ( & mut rng, clock, & mut repo, true )
774+ . await ?;
775+
776+ repo. into_inner ( )
777+ . commit ( )
778+ . await
779+ . map_err ( QueueRunnerError :: CommitTransaction ) ?;
780+
781+ Ok ( ( ) )
782+ }
716783}
717784
718785/// Tracks running jobs
0 commit comments