@@ -19,7 +19,6 @@ use opentelemetry::{
1919 metrics:: { Counter , Histogram , UpDownCounter } ,
2020} ;
2121use rand:: { Rng , RngCore , distributions:: Uniform } ;
22- use rand_chacha:: ChaChaRng ;
2322use serde:: de:: DeserializeOwned ;
2423use sqlx:: {
2524 Acquire , Either ,
@@ -195,8 +194,6 @@ struct ScheduleDefinition {
195194}
196195
197196pub struct QueueWorker {
198- rng : ChaChaRng ,
199- clock : Box < dyn Clock + Send > ,
200197 listener : PgListener ,
201198 registration : Worker ,
202199 am_i_leader : bool ,
@@ -278,8 +275,6 @@ impl QueueWorker {
278275 let cancellation_guard = cancellation_token. clone ( ) . drop_guard ( ) ;
279276
280277 Ok ( Self {
281- rng,
282- clock,
283278 listener,
284279 registration,
285280 am_i_leader : false ,
@@ -397,6 +392,9 @@ impl QueueWorker {
397392 async fn shutdown ( & mut self ) -> Result < ( ) , QueueRunnerError > {
398393 tracing:: info!( "Shutting down worker" ) ;
399394
395+ let clock = self . state . clock ( ) ;
396+ let mut rng = self . state . rng ( ) ;
397+
400398 // Start a transaction on the existing PgListener connection
401399 let txn = self
402400 . listener
@@ -421,13 +419,13 @@ impl QueueWorker {
421419
422420 // Wait for all the jobs to finish
423421 self . tracker
424- . process_jobs ( & mut self . rng , & self . clock , & mut repo, true )
422+ . process_jobs ( & mut rng, clock, & mut repo, true )
425423 . await ?;
426424
427425 // Tell the other workers we're shutting down
428426 // This also releases the leader election lease
429427 repo. queue_worker ( )
430- . shutdown ( & self . clock , & self . registration )
428+ . shutdown ( clock, & self . registration )
431429 . await ?;
432430
433431 repo. into_inner ( )
@@ -440,12 +438,12 @@ impl QueueWorker {
440438
441439 #[ tracing:: instrument( name = "worker.wait_until_wakeup" , skip_all) ]
442440 async fn wait_until_wakeup ( & mut self ) -> Result < ( ) , QueueRunnerError > {
441+ let mut rng = self . state . rng ( ) ;
442+
443443 // This is to make sure we wake up every second to do the maintenance tasks
444444 // We add a little bit of random jitter to the duration, so that we don't get
445445 // fully synced workers waking up at the same time after each notification
446- let sleep_duration = self
447- . rng
448- . sample ( Uniform :: new ( MIN_SLEEP_DURATION , MAX_SLEEP_DURATION ) ) ;
446+ let sleep_duration = rng. sample ( Uniform :: new ( MIN_SLEEP_DURATION , MAX_SLEEP_DURATION ) ) ;
449447 let wakeup_sleep = tokio:: time:: sleep ( sleep_duration) ;
450448
451449 tokio:: select! {
@@ -490,7 +488,9 @@ impl QueueWorker {
490488 ) ]
491489 async fn tick ( & mut self ) -> Result < ( ) , QueueRunnerError > {
492490 tracing:: debug!( "Tick" ) ;
493- let now = self . clock . now ( ) ;
491+ let clock = self . state . clock ( ) ;
492+ let mut rng = self . state . rng ( ) ;
493+ let now = clock. now ( ) ;
494494
495495 // Start a transaction on the existing PgListener connection
496496 let txn = self
@@ -505,25 +505,25 @@ impl QueueWorker {
505505 if now - self . last_heartbeat >= chrono:: Duration :: minutes ( 1 ) {
506506 tracing:: info!( "Sending heartbeat" ) ;
507507 repo. queue_worker ( )
508- . heartbeat ( & self . clock , & self . registration )
508+ . heartbeat ( clock, & self . registration )
509509 . await ?;
510510 self . last_heartbeat = now;
511511 }
512512
513513 // Remove any dead worker leader leases
514514 repo. queue_worker ( )
515- . remove_leader_lease_if_expired ( & self . clock )
515+ . remove_leader_lease_if_expired ( clock)
516516 . await ?;
517517
518518 // Try to become (or stay) the leader
519519 let leader = repo
520520 . queue_worker ( )
521- . try_get_leader_lease ( & self . clock , & self . registration )
521+ . try_get_leader_lease ( clock, & self . registration )
522522 . await ?;
523523
524524 // Process any job task which finished
525525 self . tracker
526- . process_jobs ( & mut self . rng , & self . clock , & mut repo, false )
526+ . process_jobs ( & mut rng, clock, & mut repo, false )
527527 . await ?;
528528
529529 // Compute how many jobs we should fetch at most
@@ -538,7 +538,7 @@ impl QueueWorker {
538538 let queues = self . tracker . queues ( ) ;
539539 let jobs = repo
540540 . queue_job ( )
541- . reserve ( & self . clock , & self . registration , & queues, max_jobs_to_fetch)
541+ . reserve ( clock, & self . registration , & queues, max_jobs_to_fetch)
542542 . await ?;
543543
544544 for Job {
@@ -592,6 +592,9 @@ impl QueueWorker {
592592 return Err ( QueueRunnerError :: NotLeader ) ;
593593 }
594594
595+ let clock = self . state . clock ( ) ;
596+ let mut rng = self . state . rng ( ) ;
597+
595598 // Start a transaction on the existing PgListener connection
596599 let txn = self
597600 . listener
@@ -633,7 +636,7 @@ impl QueueWorker {
633636 // Look at the state of schedules in the database
634637 let schedules_status = repo. queue_schedule ( ) . list ( ) . await ?;
635638
636- let now = self . clock . now ( ) ;
639+ let now = clock. now ( ) ;
637640 for schedule in & self . schedules {
638641 // Find the schedule status from the database
639642 let Some ( schedule_status) = schedules_status
@@ -670,8 +673,8 @@ impl QueueWorker {
670673
671674 repo. queue_job ( )
672675 . schedule_later (
673- & mut self . rng ,
674- & self . clock ,
676+ & mut rng,
677+ clock,
675678 schedule. queue_name ,
676679 schedule. payload . clone ( ) ,
677680 serde_json:: json!( { } ) ,
@@ -684,16 +687,13 @@ impl QueueWorker {
684687 // We also check if the worker is dead, and if so, we shutdown all the dead
685688 // workers that haven't checked in the last two minutes
686689 repo. queue_worker ( )
687- . shutdown_dead_workers ( & self . clock , Duration :: minutes ( 2 ) )
690+ . shutdown_dead_workers ( clock, Duration :: minutes ( 2 ) )
688691 . await ?;
689692
690693 // TODO: mark tasks those workers had as lost
691694
692695 // Mark all the scheduled jobs as available
693- let scheduled = repo
694- . queue_job ( )
695- . schedule_available_jobs ( & self . clock )
696- . await ?;
696+ let scheduled = repo. queue_job ( ) . schedule_available_jobs ( clock) . await ?;
697697 match scheduled {
698698 0 => { }
699699 1 => tracing:: info!( "One scheduled job marked as available" ) ,
0 commit comments