@@ -166,6 +166,9 @@ impl<'c> QueueWorkerRepository for PgQueueWorkerRepository<'c> {
166166 clock : & dyn Clock ,
167167 threshold : Duration ,
168168 ) -> Result < ( ) , Self :: Error > {
169+ // Here the threshold is usually set to a few minutes, so we don't need to use
170+ // the database time, as we can assume worker clocks have less than a minute
171+ // skew between each other, else other things would break
169172 let now = clock. now ( ) ;
170173 sqlx:: query!(
171174 r#"
@@ -194,15 +197,15 @@ impl<'c> QueueWorkerRepository for PgQueueWorkerRepository<'c> {
194197 ) ]
195198 async fn remove_leader_lease_if_expired (
196199 & mut self ,
197- clock : & dyn Clock ,
200+ _clock : & dyn Clock ,
198201 ) -> Result < ( ) , Self :: Error > {
199- let now = clock. now ( ) ;
202+ // `expires_at` is a rare exception where we use the database time, as this
203+ // would be very sensitive to clock skew between workers
200204 sqlx:: query!(
201205 r#"
202206 DELETE FROM queue_leader
203- WHERE expires_at < $1
207+ WHERE expires_at < NOW()
204208 "# ,
205- now,
206209 )
207210 . traced ( )
208211 . execute ( & mut * self . conn )
@@ -226,22 +229,23 @@ impl<'c> QueueWorkerRepository for PgQueueWorkerRepository<'c> {
226229 worker : & Worker ,
227230 ) -> Result < bool , Self :: Error > {
228231 let now = clock. now ( ) ;
229- let ttl = Duration :: seconds ( 5 ) ;
230232 // The queue_leader table is meant to only have a single row, which conflicts on
231233 // the `active` column
232234
233235 // If there is a conflict, we update the `expires_at` column ONLY IF the current
234236 // leader is ourselves.
237+
238+ // `expires_at` is a rare exception where we use the database time, as this
239+ // would be very sensitive to clock skew between workers
235240 let res = sqlx:: query!(
236241 r#"
237242 INSERT INTO queue_leader (elected_at, expires_at, queue_worker_id)
238- VALUES ($1, $2 , $3 )
243+ VALUES ($1, NOW() + INTERVAL '5 seconds' , $2 )
239244 ON CONFLICT (active)
240245 DO UPDATE SET expires_at = EXCLUDED.expires_at
241- WHERE queue_leader.queue_worker_id = $3
246+ WHERE queue_leader.queue_worker_id = $2
242247 "# ,
243248 now,
244- now + ttl,
245249 Uuid :: from( worker. id)
246250 )
247251 . traced ( )
0 commit comments