From a0aeb2d4efeb3c9c345619ff6c0b0e22909a7f17 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Mon, 7 Oct 2024 12:07:09 +0200 Subject: [PATCH 1/6] New job queue: worker registration and leader election --- ...0f3f086e4e62c6de9d6864a6a11a2470ebe62.json | 15 ++ ...0dc74505b22c681322bd99b62c2a540c6cd35.json | 15 ++ ...5b531b9873f4139eadcbf1450e726b9a27379.json | 15 ++ ...d356a4ed86fd33400066e422545ffc55f9aa9.json | 16 ++ ...327d03b29fe413d57cce21c67b6d539f59e7d.json | 15 ++ ...84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json | 14 ++ .../20241004075132_queue_worker.sql | 37 +++ crates/storage-pg/src/lib.rs | 1 + crates/storage-pg/src/queue/mod.rs | 8 + crates/storage-pg/src/queue/worker.rs | 237 ++++++++++++++++++ crates/storage-pg/src/repository.rs | 7 + crates/storage/src/lib.rs | 1 + crates/storage/src/queue/mod.rs | 10 + crates/storage/src/queue/worker.rs | 130 ++++++++++ crates/storage/src/repository.rs | 17 ++ crates/tasks/src/lib.rs | 24 +- crates/tasks/src/new_queue.rs | 81 ++++++ 17 files changed, 639 insertions(+), 4 deletions(-) create mode 100644 crates/storage-pg/.sqlx/query-12c4577701416a9dc23708c46700f3f086e4e62c6de9d6864a6a11a2470ebe62.json create mode 100644 crates/storage-pg/.sqlx/query-5f2199865fae3a969bb37429dd70dc74505b22c681322bd99b62c2a540c6cd35.json create mode 100644 crates/storage-pg/.sqlx/query-6bd38759f569fcf972924d12f565b531b9873f4139eadcbf1450e726b9a27379.json create mode 100644 crates/storage-pg/.sqlx/query-8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9.json create mode 100644 crates/storage-pg/.sqlx/query-966ca0f7eebd2896c007b2fd6e9327d03b29fe413d57cce21c67b6d539f59e7d.json create mode 100644 crates/storage-pg/.sqlx/query-ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json create mode 100644 crates/storage-pg/migrations/20241004075132_queue_worker.sql create mode 100644 crates/storage-pg/src/queue/mod.rs create mode 100644 crates/storage-pg/src/queue/worker.rs create mode 100644 crates/storage/src/queue/mod.rs create mode 100644 crates/storage/src/queue/worker.rs create mode 100644 crates/tasks/src/new_queue.rs diff --git a/crates/storage-pg/.sqlx/query-12c4577701416a9dc23708c46700f3f086e4e62c6de9d6864a6a11a2470ebe62.json b/crates/storage-pg/.sqlx/query-12c4577701416a9dc23708c46700f3f086e4e62c6de9d6864a6a11a2470ebe62.json new file mode 100644 index 000000000..dce1983fe --- /dev/null +++ b/crates/storage-pg/.sqlx/query-12c4577701416a9dc23708c46700f3f086e4e62c6de9d6864a6a11a2470ebe62.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO queue_workers (queue_worker_id, registered_at, last_seen_at)\n VALUES ($1, $2, $2)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "12c4577701416a9dc23708c46700f3f086e4e62c6de9d6864a6a11a2470ebe62" +} diff --git a/crates/storage-pg/.sqlx/query-5f2199865fae3a969bb37429dd70dc74505b22c681322bd99b62c2a540c6cd35.json b/crates/storage-pg/.sqlx/query-5f2199865fae3a969bb37429dd70dc74505b22c681322bd99b62c2a540c6cd35.json new file mode 100644 index 000000000..364a1c6b6 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-5f2199865fae3a969bb37429dd70dc74505b22c681322bd99b62c2a540c6cd35.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE queue_workers\n SET shutdown_at = $2\n WHERE queue_worker_id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": 
"5f2199865fae3a969bb37429dd70dc74505b22c681322bd99b62c2a540c6cd35" +} diff --git a/crates/storage-pg/.sqlx/query-6bd38759f569fcf972924d12f565b531b9873f4139eadcbf1450e726b9a27379.json b/crates/storage-pg/.sqlx/query-6bd38759f569fcf972924d12f565b531b9873f4139eadcbf1450e726b9a27379.json new file mode 100644 index 000000000..4898fc432 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-6bd38759f569fcf972924d12f565b531b9873f4139eadcbf1450e726b9a27379.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE queue_workers\n SET shutdown_at = $1\n WHERE shutdown_at IS NULL\n AND last_seen_at < $2\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "6bd38759f569fcf972924d12f565b531b9873f4139eadcbf1450e726b9a27379" +} diff --git a/crates/storage-pg/.sqlx/query-8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9.json b/crates/storage-pg/.sqlx/query-8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9.json new file mode 100644 index 000000000..9195a9d4d --- /dev/null +++ b/crates/storage-pg/.sqlx/query-8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO queue_leader (elected_at, expires_at, queue_worker_id)\n VALUES ($1, $2, $3)\n ON CONFLICT (active)\n DO UPDATE SET expires_at = EXCLUDED.expires_at\n WHERE queue_leader.queue_worker_id = $3\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz", + "Timestamptz", + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9" +} diff --git a/crates/storage-pg/.sqlx/query-966ca0f7eebd2896c007b2fd6e9327d03b29fe413d57cce21c67b6d539f59e7d.json b/crates/storage-pg/.sqlx/query-966ca0f7eebd2896c007b2fd6e9327d03b29fe413d57cce21c67b6d539f59e7d.json new file mode 100644 index 000000000..3e1fb3580 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-966ca0f7eebd2896c007b2fd6e9327d03b29fe413d57cce21c67b6d539f59e7d.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE queue_workers\n SET last_seen_at = $2\n WHERE queue_worker_id = $1 AND shutdown_at IS NULL\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "966ca0f7eebd2896c007b2fd6e9327d03b29fe413d57cce21c67b6d539f59e7d" +} diff --git a/crates/storage-pg/.sqlx/query-ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json b/crates/storage-pg/.sqlx/query-ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json new file mode 100644 index 000000000..af6213a8a --- /dev/null +++ b/crates/storage-pg/.sqlx/query-ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM queue_leader\n WHERE expires_at < $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5" +} diff --git a/crates/storage-pg/migrations/20241004075132_queue_worker.sql b/crates/storage-pg/migrations/20241004075132_queue_worker.sql new file mode 100644 index 000000000..07b49d22d --- /dev/null +++ b/crates/storage-pg/migrations/20241004075132_queue_worker.sql @@ -0,0 +1,37 @@ +-- Copyright 2024 New Vector Ltd. 
+-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. + +-- This table stores informations about worker, mostly to track their health +CREATE TABLE queue_workers ( + queue_worker_id UUID NOT NULL PRIMARY KEY, + + -- When the worker was registered + registered_at TIMESTAMP WITH TIME ZONE NOT NULL, + + -- When the worker was last seen + last_seen_at TIMESTAMP WITH TIME ZONE NOT NULL, + + -- When the worker was shut down + shutdown_at TIMESTAMP WITH TIME ZONE +); + +-- This single-row table stores the leader of the queue +-- The leader is responsible for running maintenance tasks +CREATE UNLOGGED TABLE queue_leader ( + -- This makes the row unique + active BOOLEAN NOT NULL DEFAULT TRUE UNIQUE, + + -- When the leader was elected + elected_at TIMESTAMP WITH TIME ZONE NOT NULL, + + -- Until when the lease is valid + expires_at TIMESTAMP WITH TIME ZONE NOT NULL, + + -- The worker ID of the leader + queue_worker_id UUID NOT NULL REFERENCES queue_workers (queue_worker_id), + + -- This, combined with the unique constraint, makes sure we only ever have a single row + CONSTRAINT queue_leader_active CHECK (active IS TRUE) +); diff --git a/crates/storage-pg/src/lib.rs b/crates/storage-pg/src/lib.rs index aa7fadd55..e16303278 100644 --- a/crates/storage-pg/src/lib.rs +++ b/crates/storage-pg/src/lib.rs @@ -166,6 +166,7 @@ pub mod app_session; pub mod compat; pub mod job; pub mod oauth2; +pub mod queue; pub mod upstream_oauth2; pub mod user; diff --git a/crates/storage-pg/src/queue/mod.rs b/crates/storage-pg/src/queue/mod.rs new file mode 100644 index 000000000..b6ba8295e --- /dev/null +++ b/crates/storage-pg/src/queue/mod.rs @@ -0,0 +1,8 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! A module containing the PostgreSQL implementation of the job queue + +pub mod worker; diff --git a/crates/storage-pg/src/queue/worker.rs b/crates/storage-pg/src/queue/worker.rs new file mode 100644 index 000000000..2aaacc64b --- /dev/null +++ b/crates/storage-pg/src/queue/worker.rs @@ -0,0 +1,237 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! A module containing the PostgreSQL implementation of the +//! [`QueueWorkerRepository`]. + +use async_trait::async_trait; +use chrono::Duration; +use mas_storage::{ + queue::{QueueWorkerRepository, Worker}, + Clock, +}; +use rand::RngCore; +use sqlx::PgConnection; +use ulid::Ulid; +use uuid::Uuid; + +use crate::{DatabaseError, ExecuteExt}; + +/// An implementation of [`QueueWorkerRepository`] for a PostgreSQL connection. +pub struct PgQueueWorkerRepository<'c> { + conn: &'c mut PgConnection, +} + +impl<'c> PgQueueWorkerRepository<'c> { + /// Create a new [`PgQueueWorkerRepository`] from an active PostgreSQL + /// connection. 
+ #[must_use] + pub fn new(conn: &'c mut PgConnection) -> Self { + Self { conn } + } +} + +#[async_trait] +impl QueueWorkerRepository for PgQueueWorkerRepository<'_> { + type Error = DatabaseError; + + #[tracing::instrument( + name = "db.queue_worker.register", + skip_all, + fields( + worker.id, + db.query.text, + ), + err, + )] + async fn register( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + ) -> Result { + let now = clock.now(); + let worker_id = Ulid::from_datetime_with_source(now.into(), rng); + tracing::Span::current().record("worker.id", tracing::field::display(worker_id)); + + sqlx::query!( + r#" + INSERT INTO queue_workers (queue_worker_id, registered_at, last_seen_at) + VALUES ($1, $2, $2) + "#, + Uuid::from(worker_id), + now, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(Worker { id: worker_id }) + } + + #[tracing::instrument( + name = "db.queue_worker.heartbeat", + skip_all, + fields( + %worker.id, + db.query.text, + ), + err, + )] + async fn heartbeat( + &mut self, + clock: &dyn Clock, + worker: Worker, + ) -> Result { + let now = clock.now(); + let res = sqlx::query!( + r#" + UPDATE queue_workers + SET last_seen_at = $2 + WHERE queue_worker_id = $1 AND shutdown_at IS NULL + "#, + Uuid::from(worker.id), + now, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + // If no row was updated, the worker was shutdown so we return an error + DatabaseError::ensure_affected_rows(&res, 1)?; + + Ok(worker) + } + + #[tracing::instrument( + name = "db.queue_worker.shutdown", + skip_all, + fields( + %worker.id, + db.query.text, + ), + err, + )] + async fn shutdown(&mut self, clock: &dyn Clock, worker: Worker) -> Result<(), Self::Error> { + let now = clock.now(); + let res = sqlx::query!( + r#" + UPDATE queue_workers + SET shutdown_at = $2 + WHERE queue_worker_id = $1 + "#, + Uuid::from(worker.id), + now, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + DatabaseError::ensure_affected_rows(&res, 1)?; + + Ok(()) + } + + #[tracing::instrument( + name = "db.queue_worker.shutdown_dead_workers", + skip_all, + fields( + db.query.text, + ), + err, + )] + async fn shutdown_dead_workers( + &mut self, + clock: &dyn Clock, + threshold: Duration, + ) -> Result<(), Self::Error> { + let now = clock.now(); + sqlx::query!( + r#" + UPDATE queue_workers + SET shutdown_at = $1 + WHERE shutdown_at IS NULL + AND last_seen_at < $2 + "#, + now, + now - threshold, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(()) + } + + #[tracing::instrument( + name = "db.queue_worker.remove_leader_lease_if_expired", + skip_all, + fields( + db.query.text, + ), + err, + )] + async fn remove_leader_lease_if_expired( + &mut self, + clock: &dyn Clock, + ) -> Result<(), Self::Error> { + let now = clock.now(); + sqlx::query!( + r#" + DELETE FROM queue_leader + WHERE expires_at < $1 + "#, + now, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(()) + } + + #[tracing::instrument( + name = "db.queue_worker.try_get_leader_lease", + skip_all, + fields( + %worker.id, + db.query.text, + ), + err, + )] + async fn try_get_leader_lease( + &mut self, + clock: &dyn Clock, + worker: &Worker, + ) -> Result { + let now = clock.now(); + let ttl = Duration::seconds(5); + // The queue_leader table is meant to only have a single row, which conflicts on + // the `active` column + + // If there is a conflict, we update the `expires_at` column ONLY IF the current + // leader is ourselves. 
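For illustration only (this is not part of the patch), here is how the upsert below plays out against the `queue_leader` table from the migration above, using two arbitrary placeholder worker IDs; the patch itself binds the timestamps and the worker ID as query parameters, and both workers are assumed to already have rows in `queue_workers`:

    -- The table is empty: worker A's INSERT goes through, 1 row is affected,
    -- so A becomes the leader.
    INSERT INTO queue_leader (elected_at, expires_at, queue_worker_id)
    VALUES (NOW(), NOW() + INTERVAL '5 seconds', '00000000-0000-0000-0000-00000000000a')
    ON CONFLICT (active) DO UPDATE SET expires_at = EXCLUDED.expires_at
    WHERE queue_leader.queue_worker_id = '00000000-0000-0000-0000-00000000000a';

    -- Worker B runs the same statement while A's row is present: the INSERT
    -- conflicts on `active`, and the DO UPDATE's WHERE clause does not match
    -- B's ID, so 0 rows are affected and B knows it is not the leader.
    INSERT INTO queue_leader (elected_at, expires_at, queue_worker_id)
    VALUES (NOW(), NOW() + INTERVAL '5 seconds', '00000000-0000-0000-0000-00000000000b')
    ON CONFLICT (active) DO UPDATE SET expires_at = EXCLUDED.expires_at
    WHERE queue_leader.queue_worker_id = '00000000-0000-0000-0000-00000000000b';

    -- Worker A runs it again before the lease expires: the conflicting row is
    -- its own, the WHERE clause matches, the lease is renewed, 1 row is
    -- affected, and A stays the leader.
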
+ let res = sqlx::query!( + r#" + INSERT INTO queue_leader (elected_at, expires_at, queue_worker_id) + VALUES ($1, $2, $3) + ON CONFLICT (active) + DO UPDATE SET expires_at = EXCLUDED.expires_at + WHERE queue_leader.queue_worker_id = $3 + "#, + now, + now + ttl, + Uuid::from(worker.id) + ) + .traced() + .execute(&mut *self.conn) + .await?; + + // We can then detect whether we are the leader or not by checking how many rows + // were affected by the upsert + let am_i_the_leader = res.rows_affected() == 1; + + Ok(am_i_the_leader) + } +} diff --git a/crates/storage-pg/src/repository.rs b/crates/storage-pg/src/repository.rs index 284f5e2dc..99580467c 100644 --- a/crates/storage-pg/src/repository.rs +++ b/crates/storage-pg/src/repository.rs @@ -40,6 +40,7 @@ use crate::{ PgOAuth2ClientRepository, PgOAuth2DeviceCodeGrantRepository, PgOAuth2RefreshTokenRepository, PgOAuth2SessionRepository, }, + queue::worker::PgQueueWorkerRepository, upstream_oauth2::{ PgUpstreamOAuthLinkRepository, PgUpstreamOAuthProviderRepository, PgUpstreamOAuthSessionRepository, @@ -263,4 +264,10 @@ where fn job<'c>(&'c mut self) -> Box + 'c> { Box::new(PgJobRepository::new(self.conn.as_mut())) } + + fn queue_worker<'c>( + &'c mut self, + ) -> Box + 'c> { + Box::new(PgQueueWorkerRepository::new(self.conn.as_mut())) + } } diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index f2f699ba6..30dc553de 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -120,6 +120,7 @@ pub mod app_session; pub mod compat; pub mod job; pub mod oauth2; +pub mod queue; pub mod upstream_oauth2; pub mod user; diff --git a/crates/storage/src/queue/mod.rs b/crates/storage/src/queue/mod.rs new file mode 100644 index 000000000..4ca97ec5e --- /dev/null +++ b/crates/storage/src/queue/mod.rs @@ -0,0 +1,10 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! A module containing repositories for the job queue + +mod worker; + +pub use self::worker::{QueueWorkerRepository, Worker}; diff --git a/crates/storage/src/queue/worker.rs b/crates/storage/src/queue/worker.rs new file mode 100644 index 000000000..dfb9699e6 --- /dev/null +++ b/crates/storage/src/queue/worker.rs @@ -0,0 +1,130 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! Repository to interact with workers in the job queue + +use async_trait::async_trait; +use chrono::Duration; +use rand_core::RngCore; +use ulid::Ulid; + +use crate::{repository_impl, Clock}; + +/// A worker is an entity which can execute jobs. +pub struct Worker { + /// The ID of the worker. + pub id: Ulid, +} + +/// A [`QueueWorkerRepository`] is used to schedule jobs to be executed by a +/// worker. +#[async_trait] +pub trait QueueWorkerRepository: Send + Sync { + /// The error type returned by the repository. + type Error; + + /// Register a new worker. + /// + /// Returns a reference to the worker. + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn register( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + ) -> Result; + + /// Send a heartbeat for the given worker. + /// + /// Returns the updated worker. + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails or if the worker was + /// shutdown. 
+ async fn heartbeat(&mut self, clock: &dyn Clock, worker: Worker) + -> Result; + + /// Mark the given worker as shutdown. + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn shutdown(&mut self, clock: &dyn Clock, worker: Worker) -> Result<(), Self::Error>; + + /// Find dead workers and shut them down. + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn shutdown_dead_workers( + &mut self, + clock: &dyn Clock, + threshold: Duration, + ) -> Result<(), Self::Error>; + + /// Remove the leader lease if it is expired, sending a notification to + /// trigger a new leader election. + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn remove_leader_lease_if_expired( + &mut self, + clock: &dyn Clock, + ) -> Result<(), Self::Error>; + + /// Try to get the leader lease, renewing it if we already have it + /// + /// Returns `true` if we got the leader lease, `false` if we didn't + /// + /// # Errors + /// + /// Returns an error if the underlying repository fails. + async fn try_get_leader_lease( + &mut self, + clock: &dyn Clock, + worker: &Worker, + ) -> Result; +} + +repository_impl!(QueueWorkerRepository: + async fn register( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + ) -> Result; + + async fn heartbeat( + &mut self, + clock: &dyn Clock, + worker: Worker, + ) -> Result; + + async fn shutdown( + &mut self, + clock: &dyn Clock, + worker: Worker, + ) -> Result<(), Self::Error>; + + async fn shutdown_dead_workers( + &mut self, + clock: &dyn Clock, + threshold: Duration, + ) -> Result<(), Self::Error>; + + async fn remove_leader_lease_if_expired( + &mut self, + clock: &dyn Clock, + ) -> Result<(), Self::Error>; + + async fn try_get_leader_lease( + &mut self, + clock: &dyn Clock, + worker: &Worker, + ) -> Result; +); diff --git a/crates/storage/src/repository.rs b/crates/storage/src/repository.rs index a78d51d1d..55d19d281 100644 --- a/crates/storage/src/repository.rs +++ b/crates/storage/src/repository.rs @@ -18,6 +18,7 @@ use crate::{ OAuth2AccessTokenRepository, OAuth2AuthorizationGrantRepository, OAuth2ClientRepository, OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository, }, + queue::QueueWorkerRepository, upstream_oauth2::{ UpstreamOAuthLinkRepository, UpstreamOAuthProviderRepository, UpstreamOAuthSessionRepository, @@ -191,6 +192,9 @@ pub trait RepositoryAccess: Send { /// Get a [`JobRepository`] fn job<'c>(&'c mut self) -> Box + 'c>; + + /// Get a [`QueueWorkerRepository`] + fn queue_worker<'c>(&'c mut self) -> Box + 'c>; } /// Implementations of the [`RepositoryAccess`], [`RepositoryTransaction`] and @@ -211,6 +215,7 @@ mod impls { OAuth2ClientRepository, OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository, }, + queue::QueueWorkerRepository, upstream_oauth2::{ UpstreamOAuthLinkRepository, UpstreamOAuthProviderRepository, UpstreamOAuthSessionRepository, @@ -405,6 +410,12 @@ mod impls { fn job<'c>(&'c mut self) -> Box + 'c> { Box::new(MapErr::new(self.inner.job(), &mut self.mapper)) } + + fn queue_worker<'c>( + &'c mut self, + ) -> Box + 'c> { + Box::new(MapErr::new(self.inner.queue_worker(), &mut self.mapper)) + } } impl RepositoryAccess for Box { @@ -527,5 +538,11 @@ mod impls { fn job<'c>(&'c mut self) -> Box + 'c> { (**self).job() } + + fn queue_worker<'c>( + &'c mut self, + ) -> Box + 'c> { + (**self).queue_worker() + } } } diff --git a/crates/tasks/src/lib.rs 
b/crates/tasks/src/lib.rs index 8d012bdae..52e9683cc 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -10,8 +10,8 @@ use apalis_core::{executor::TokioExecutor, layers::extensions::Extension, monito use mas_email::Mailer; use mas_matrix::HomeserverConnection; use mas_router::UrlBuilder; -use mas_storage::{BoxClock, BoxRepository, SystemClock}; -use mas_storage_pg::{DatabaseError, PgRepository}; +use mas_storage::{BoxClock, BoxRepository, RepositoryError, SystemClock}; +use mas_storage_pg::PgRepository; use rand::SeedableRng; use sqlx::{Pool, Postgres}; use tracing::debug; @@ -21,6 +21,7 @@ use crate::storage::PostgresStorageFactory; mod database; mod email; mod matrix; +mod new_queue; mod recovery; mod storage; mod user; @@ -74,8 +75,11 @@ impl State { rand_chacha::ChaChaRng::from_rng(rand::thread_rng()).expect("failed to seed rng") } - pub async fn repository(&self) -> Result { - let repo = PgRepository::from_pool(self.pool()).await?.boxed(); + pub async fn repository(&self) -> Result { + let repo = PgRepository::from_pool(self.pool()) + .await + .map_err(RepositoryError::from_error)? + .boxed(); Ok(repo) } @@ -156,5 +160,17 @@ pub async fn init( // TODO: we might want to grab the join handle here factory.listen().await?; debug!(?monitor, "workers registered"); + + // TODO: this is just spawning the task in the background, we probably actually + // want to wrap that in a structure, and handle graceful shutdown correctly + tokio::spawn(async move { + if let Err(e) = self::new_queue::run(state).await { + tracing::error!( + error = &e as &dyn std::error::Error, + "Failed to run new queue" + ); + } + }); + Ok(monitor) } diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs new file mode 100644 index 000000000..eabf17aa6 --- /dev/null +++ b/crates/tasks/src/new_queue.rs @@ -0,0 +1,81 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. 
+ +use chrono::Duration; +use mas_storage::{RepositoryAccess, RepositoryError}; + +use crate::State; + +pub async fn run(state: State) -> Result<(), RepositoryError> { + let span = tracing::info_span!("worker.init", worker.id = tracing::field::Empty); + let guard = span.enter(); + let mut repo = state.repository().await?; + let mut rng = state.rng(); + let clock = state.clock(); + + let mut worker = repo.queue_worker().register(&mut rng, &clock).await?; + span.record("worker.id", tracing::field::display(worker.id)); + repo.save().await?; + + tracing::info!("Registered worker"); + drop(guard); + + let mut was_i_the_leader = false; + + // Record when we last sent a heartbeat + let mut last_heartbeat = clock.now(); + + loop { + // This is to make sure we wake up every second to do the maintenance tasks + // Later we might wait on other events, like a PG notification + let wakeup_sleep = tokio::time::sleep(std::time::Duration::from_secs(1)); + wakeup_sleep.await; + + let span = tracing::info_span!("worker.tick", %worker.id); + let _guard = span.enter(); + + tracing::debug!("Tick"); + let now = clock.now(); + let mut repo = state.repository().await?; + + // We send a heartbeat every minute, to avoid writing to the database too often + // on a logged table + if now - last_heartbeat >= chrono::Duration::minutes(1) { + tracing::info!("Sending heartbeat"); + worker = repo.queue_worker().heartbeat(&clock, worker).await?; + last_heartbeat = now; + } + + // Remove any dead worker leader leases + repo.queue_worker() + .remove_leader_lease_if_expired(&clock) + .await?; + + // Try to become (or stay) the leader + let am_i_the_leader = repo + .queue_worker() + .try_get_leader_lease(&clock, &worker) + .await?; + + // Log any changes in leadership + if !was_i_the_leader && am_i_the_leader { + tracing::info!("I'm the leader now"); + } else if was_i_the_leader && !am_i_the_leader { + tracing::warn!("I am no longer the leader"); + } + was_i_the_leader = am_i_the_leader; + + // The leader does all the maintenance work + if am_i_the_leader { + // We also check if the worker is dead, and if so, we shutdown all the dead + // workers that haven't checked in the last two minutes + repo.queue_worker() + .shutdown_dead_workers(&clock, Duration::minutes(2)) + .await?; + } + + repo.save().await?; + } +} From 63632645196562671514b5a07537f2330f65fa6b Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 9 Oct 2024 10:28:59 +0200 Subject: [PATCH 2/6] Make the worker heartbeat take a worker reference --- crates/storage-pg/src/queue/worker.rs | 8 ++------ crates/storage/src/queue/worker.rs | 9 +++------ crates/tasks/src/new_queue.rs | 4 ++-- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/crates/storage-pg/src/queue/worker.rs b/crates/storage-pg/src/queue/worker.rs index 2aaacc64b..5fa784cb1 100644 --- a/crates/storage-pg/src/queue/worker.rs +++ b/crates/storage-pg/src/queue/worker.rs @@ -79,11 +79,7 @@ impl QueueWorkerRepository for PgQueueWorkerRepository<'_> { ), err, )] - async fn heartbeat( - &mut self, - clock: &dyn Clock, - worker: Worker, - ) -> Result { + async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error> { let now = clock.now(); let res = sqlx::query!( r#" @@ -101,7 +97,7 @@ impl QueueWorkerRepository for PgQueueWorkerRepository<'_> { // If no row was updated, the worker was shutdown so we return an error DatabaseError::ensure_affected_rows(&res, 1)?; - Ok(worker) + Ok(()) } #[tracing::instrument( diff --git a/crates/storage/src/queue/worker.rs 
b/crates/storage/src/queue/worker.rs index dfb9699e6..19ceead88 100644 --- a/crates/storage/src/queue/worker.rs +++ b/crates/storage/src/queue/worker.rs @@ -40,14 +40,11 @@ pub trait QueueWorkerRepository: Send + Sync { /// Send a heartbeat for the given worker. /// - /// Returns the updated worker. - /// /// # Errors /// /// Returns an error if the underlying repository fails or if the worker was /// shutdown. - async fn heartbeat(&mut self, clock: &dyn Clock, worker: Worker) - -> Result; + async fn heartbeat(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>; /// Mark the given worker as shutdown. /// @@ -102,8 +99,8 @@ repository_impl!(QueueWorkerRepository: async fn heartbeat( &mut self, clock: &dyn Clock, - worker: Worker, - ) -> Result; + worker: &Worker, + ) -> Result<(), Self::Error>; async fn shutdown( &mut self, diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index eabf17aa6..4a8058b59 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -15,7 +15,7 @@ pub async fn run(state: State) -> Result<(), RepositoryError> { let mut rng = state.rng(); let clock = state.clock(); - let mut worker = repo.queue_worker().register(&mut rng, &clock).await?; + let worker = repo.queue_worker().register(&mut rng, &clock).await?; span.record("worker.id", tracing::field::display(worker.id)); repo.save().await?; @@ -44,7 +44,7 @@ pub async fn run(state: State) -> Result<(), RepositoryError> { // on a logged table if now - last_heartbeat >= chrono::Duration::minutes(1) { tracing::info!("Sending heartbeat"); - worker = repo.queue_worker().heartbeat(&clock, worker).await?; + repo.queue_worker().heartbeat(&clock, &worker).await?; last_heartbeat = now; } From 8c1a87b0df513071fbacaadfd66af24dd95dcd1f Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Mon, 14 Oct 2024 10:51:14 +0200 Subject: [PATCH 3/6] Move the worker logic in a struct --- crates/tasks/src/lib.rs | 13 +- crates/tasks/src/new_queue.rs | 218 +++++++++++++++++++++++++++------- 2 files changed, 183 insertions(+), 48 deletions(-) diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 52e9683cc..0db6b6b81 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -12,6 +12,7 @@ use mas_matrix::HomeserverConnection; use mas_router::UrlBuilder; use mas_storage::{BoxClock, BoxRepository, RepositoryError, SystemClock}; use mas_storage_pg::PgRepository; +use new_queue::QueueRunnerError; use rand::SeedableRng; use sqlx::{Pool, Postgres}; use tracing::debug; @@ -142,7 +143,7 @@ pub async fn init( mailer: &Mailer, homeserver: impl HomeserverConnection + 'static, url_builder: UrlBuilder, -) -> Result, sqlx::Error> { +) -> Result, QueueRunnerError> { let state = State::new( pool.clone(), SystemClock::default(), @@ -158,13 +159,19 @@ pub async fn init( let monitor = self::user::register(name, monitor, &state, &factory); let monitor = self::recovery::register(name, monitor, &state, &factory); // TODO: we might want to grab the join handle here - factory.listen().await?; + // TODO: this error isn't right, I just want that to compile + factory + .listen() + .await + .map_err(QueueRunnerError::SetupListener)?; debug!(?monitor, "workers registered"); + let mut worker = self::new_queue::QueueWorker::new(state).await?; + // TODO: this is just spawning the task in the background, we probably actually // want to wrap that in a structure, and handle graceful shutdown correctly tokio::spawn(async move { - if let Err(e) = self::new_queue::run(state).await { + if let 
Err(e) = worker.run().await { tracing::error!( error = &e as &dyn std::error::Error, "Failed to run new queue" diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index 4a8058b59..c3596c0e9 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -3,79 +3,207 @@ // SPDX-License-Identifier: AGPL-3.0-only // Please see LICENSE in the repository root for full details. -use chrono::Duration; -use mas_storage::{RepositoryAccess, RepositoryError}; +use chrono::{DateTime, Duration, Utc}; +use mas_storage::{queue::Worker, Clock, RepositoryAccess, RepositoryError}; +use mas_storage_pg::{DatabaseError, PgRepository}; +use rand::{distributions::Uniform, Rng}; +use rand_chacha::ChaChaRng; +use sqlx::PgPool; +use thiserror::Error; use crate::State; -pub async fn run(state: State) -> Result<(), RepositoryError> { - let span = tracing::info_span!("worker.init", worker.id = tracing::field::Empty); - let guard = span.enter(); - let mut repo = state.repository().await?; - let mut rng = state.rng(); - let clock = state.clock(); +#[derive(Debug, Error)] +pub enum QueueRunnerError { + #[error("Failed to setup listener")] + SetupListener(#[source] sqlx::Error), - let worker = repo.queue_worker().register(&mut rng, &clock).await?; - span.record("worker.id", tracing::field::display(worker.id)); - repo.save().await?; + #[error("Failed to start transaction")] + StartTransaction(#[source] sqlx::Error), - tracing::info!("Registered worker"); - drop(guard); + #[error("Failed to commit transaction")] + CommitTransaction(#[source] sqlx::Error), - let mut was_i_the_leader = false; + #[error(transparent)] + Repository(#[from] RepositoryError), - // Record when we last sent a heartbeat - let mut last_heartbeat = clock.now(); + #[error(transparent)] + Database(#[from] DatabaseError), - loop { + #[error("Worker is not the leader")] + NotLeader, +} + +const MIN_SLEEP_DURATION: std::time::Duration = std::time::Duration::from_millis(900); +const MAX_SLEEP_DURATION: std::time::Duration = std::time::Duration::from_millis(1100); + +pub struct QueueWorker { + rng: ChaChaRng, + clock: Box, + pool: PgPool, + registration: Worker, + am_i_leader: bool, + last_heartbeat: DateTime, +} + +impl QueueWorker { + #[tracing::instrument( + name = "worker.init", + skip_all, + fields(worker.id) + )] + pub async fn new(state: State) -> Result { + let mut rng = state.rng(); + let clock = state.clock(); + let pool = state.pool().clone(); + + let txn = pool + .begin() + .await + .map_err(QueueRunnerError::StartTransaction)?; + let mut repo = PgRepository::from_conn(txn); + + let registration = repo.queue_worker().register(&mut rng, &clock).await?; + tracing::Span::current().record("worker.id", tracing::field::display(registration.id)); + repo.into_inner() + .commit() + .await + .map_err(QueueRunnerError::CommitTransaction)?; + + tracing::info!("Registered worker"); + let now = clock.now(); + + Ok(Self { + rng, + clock, + pool, + registration, + am_i_leader: false, + last_heartbeat: now, + }) + } + + pub async fn run(&mut self) -> Result<(), QueueRunnerError> { + loop { + self.run_loop().await?; + } + } + + #[tracing::instrument(name = "worker.run_loop", skip_all, err)] + async fn run_loop(&mut self) -> Result<(), QueueRunnerError> { + self.wait_until_wakeup().await?; + self.tick().await?; + + if self.am_i_leader { + self.perform_leader_duties().await?; + } + + Ok(()) + } + + #[tracing::instrument(name = "worker.wait_until_wakeup", skip_all, err)] + async fn wait_until_wakeup(&mut self) -> Result<(), 
QueueRunnerError> { // This is to make sure we wake up every second to do the maintenance tasks - // Later we might wait on other events, like a PG notification - let wakeup_sleep = tokio::time::sleep(std::time::Duration::from_secs(1)); - wakeup_sleep.await; + // We add a little bit of random jitter to the duration, so that we don't get + // fully synced workers waking up at the same time after each notification + let sleep_duration = self + .rng + .sample(Uniform::new(MIN_SLEEP_DURATION, MAX_SLEEP_DURATION)); + tokio::time::sleep(sleep_duration).await; + tracing::debug!("Woke up from sleep"); + + Ok(()) + } - let span = tracing::info_span!("worker.tick", %worker.id); - let _guard = span.enter(); + fn set_new_leader_state(&mut self, state: bool) { + // Do nothing if we were already on that state + if state == self.am_i_leader { + return; + } + // If we flipped state, log it + self.am_i_leader = state; + if self.am_i_leader { + tracing::info!("I'm the leader now"); + } else { + tracing::warn!("I am no longer the leader"); + } + } + + #[tracing::instrument( + name = "worker.tick", + skip_all, + fields(worker.id = %self.registration.id), + err, + )] + async fn tick(&mut self) -> Result<(), QueueRunnerError> { tracing::debug!("Tick"); - let now = clock.now(); - let mut repo = state.repository().await?; + let now = self.clock.now(); + + let txn = self + .pool + .begin() + .await + .map_err(QueueRunnerError::StartTransaction)?; + let mut repo = PgRepository::from_conn(txn); // We send a heartbeat every minute, to avoid writing to the database too often // on a logged table - if now - last_heartbeat >= chrono::Duration::minutes(1) { + if now - self.last_heartbeat >= chrono::Duration::minutes(1) { tracing::info!("Sending heartbeat"); - repo.queue_worker().heartbeat(&clock, &worker).await?; - last_heartbeat = now; + repo.queue_worker() + .heartbeat(&self.clock, &self.registration) + .await?; + self.last_heartbeat = now; } // Remove any dead worker leader leases repo.queue_worker() - .remove_leader_lease_if_expired(&clock) + .remove_leader_lease_if_expired(&self.clock) .await?; // Try to become (or stay) the leader - let am_i_the_leader = repo + let leader = repo .queue_worker() - .try_get_leader_lease(&clock, &worker) + .try_get_leader_lease(&self.clock, &self.registration) .await?; - // Log any changes in leadership - if !was_i_the_leader && am_i_the_leader { - tracing::info!("I'm the leader now"); - } else if was_i_the_leader && !am_i_the_leader { - tracing::warn!("I am no longer the leader"); - } - was_i_the_leader = am_i_the_leader; + repo.into_inner() + .commit() + .await + .map_err(QueueRunnerError::CommitTransaction)?; - // The leader does all the maintenance work - if am_i_the_leader { - // We also check if the worker is dead, and if so, we shutdown all the dead - // workers that haven't checked in the last two minutes - repo.queue_worker() - .shutdown_dead_workers(&clock, Duration::minutes(2)) - .await?; + // Save the new leader state + self.set_new_leader_state(leader); + + Ok(()) + } + + #[tracing::instrument(name = "worker.perform_leader_duties", skip_all, err)] + async fn perform_leader_duties(&mut self) -> Result<(), QueueRunnerError> { + // This should have been checked by the caller, but better safe than sorry + if !self.am_i_leader { + return Err(QueueRunnerError::NotLeader); } - repo.save().await?; + let txn = self + .pool + .begin() + .await + .map_err(QueueRunnerError::StartTransaction)?; + let mut repo = PgRepository::from_conn(txn); + + // We also check if the worker is dead, 
and if so, we shutdown all the dead + // workers that haven't checked in the last two minutes + repo.queue_worker() + .shutdown_dead_workers(&self.clock, Duration::minutes(2)) + .await?; + + repo.into_inner() + .commit() + .await + .map_err(QueueRunnerError::CommitTransaction)?; + + Ok(()) } } From d3e51a126e02709cf05f8c426fc15a068ef5071d Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Tue, 8 Oct 2024 17:02:18 +0200 Subject: [PATCH 4/6] TEMP: use patched sqlx We would like to use the underlying connection from the PgListener, which was added in a patch, but not yet merged or released. --- Cargo.lock | 51 ++++++++++----------------------------------------- Cargo.toml | 7 +++++++ deny.toml | 2 +- 3 files changed, 18 insertions(+), 42 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0c7db2a09..a61e07919 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3084,7 +3084,6 @@ version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" dependencies = [ - "cc", "pkg-config", "vcpkg", ] @@ -5882,21 +5881,10 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b9b39299b249ad65f3b7e96443bad61c02ca5cd3589f46cb6d610a0fd6c0d6a" -[[package]] -name = "sqlformat" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7bba3a93db0cc4f7bdece8bb09e77e2e785c20bfebf79eb8340ed80708048790" -dependencies = [ - "nom", - "unicode_categories", -] - [[package]] name = "sqlx" version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93334716a037193fac19df402f8571269c84a00852f6a7066b5d2616dcd64d3e" +source = "git+https://github.com/launchbadge/sqlx.git?branch=main#42ce24dab87aad98f041cafb35cf9a7d5b2b09a7" dependencies = [ "sqlx-core", "sqlx-macros", @@ -5908,31 +5896,25 @@ dependencies = [ [[package]] name = "sqlx-core" version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4d8060b456358185f7d50c55d9b5066ad956956fddec42ee2e8567134a8936e" +source = "git+https://github.com/launchbadge/sqlx.git?branch=main#42ce24dab87aad98f041cafb35cf9a7d5b2b09a7" dependencies = [ - "atoi", - "byteorder", "bytes", "chrono", "crc", "crossbeam-queue", "either", "event-listener 5.3.1", - "futures-channel", "futures-core", "futures-intrusive", "futures-io", "futures-util", "hashbrown 0.14.5", "hashlink", - "hex", "indexmap 2.6.0", "ipnetwork", "log", "memchr", "once_cell", - "paste", "percent-encoding", "rustls", "rustls-pemfile", @@ -5940,8 +5922,7 @@ dependencies = [ "serde_json", "sha2", "smallvec", - "sqlformat", - "thiserror 1.0.69", + "thiserror 2.0.3", "tokio", "tokio-stream", "tracing", @@ -5953,8 +5934,7 @@ dependencies = [ [[package]] name = "sqlx-macros" version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cac0692bcc9de3b073e8d747391827297e075c7710ff6276d9f7a1f3d58c6657" +source = "git+https://github.com/launchbadge/sqlx.git?branch=main#42ce24dab87aad98f041cafb35cf9a7d5b2b09a7" dependencies = [ "proc-macro2", "quote", @@ -5966,8 +5946,7 @@ dependencies = [ [[package]] name = "sqlx-macros-core" version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1804e8a7c7865599c9c79be146dc8a9fd8cc86935fa641d3ea58e5f0688abaa5" +source = "git+https://github.com/launchbadge/sqlx.git?branch=main#42ce24dab87aad98f041cafb35cf9a7d5b2b09a7" dependencies = [ "dotenvy", "either", @@ -5992,8 +5971,7 @@ 
dependencies = [ [[package]] name = "sqlx-mysql" version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64bb4714269afa44aef2755150a0fc19d756fb580a67db8885608cf02f47d06a" +source = "git+https://github.com/launchbadge/sqlx.git?branch=main#42ce24dab87aad98f041cafb35cf9a7d5b2b09a7" dependencies = [ "atoi", "base64 0.22.1", @@ -6027,7 +6005,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 1.0.69", + "thiserror 2.0.3", "tracing", "uuid", "whoami", @@ -6036,8 +6014,7 @@ dependencies = [ [[package]] name = "sqlx-postgres" version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fa91a732d854c5d7726349bb4bb879bb9478993ceb764247660aee25f67c2f8" +source = "git+https://github.com/launchbadge/sqlx.git?branch=main#42ce24dab87aad98f041cafb35cf9a7d5b2b09a7" dependencies = [ "atoi", "base64 0.22.1", @@ -6049,7 +6026,6 @@ dependencies = [ "etcetera", "futures-channel", "futures-core", - "futures-io", "futures-util", "hex", "hkdf", @@ -6068,7 +6044,7 @@ dependencies = [ "smallvec", "sqlx-core", "stringprep", - "thiserror 1.0.69", + "thiserror 2.0.3", "tracing", "uuid", "whoami", @@ -6077,8 +6053,7 @@ dependencies = [ [[package]] name = "sqlx-sqlite" version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5b2cf34a45953bfd3daaf3db0f7a7878ab9b7a6b91b422d24a7a9e4c857b680" +source = "git+https://github.com/launchbadge/sqlx.git?branch=main#42ce24dab87aad98f041cafb35cf9a7d5b2b09a7" dependencies = [ "atoi", "chrono", @@ -6778,12 +6753,6 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" -[[package]] -name = "unicode_categories" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" - [[package]] name = "universal-hash" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 9d2927037..8d14cafde 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -373,3 +373,10 @@ rayon.opt-level = 3 regalloc2.opt-level = 3 sha2.opt-level = 3 sqlx-macros.opt-level = 3 + +[patch.crates-io] +sqlx = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" } +sqlx-core = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" } +sqlx-macros = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" } +sqlx-macros-core = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" } +sqlx-postgres = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" } diff --git a/deny.toml b/deny.toml index f1dde8f9a..571923c35 100644 --- a/deny.toml +++ b/deny.toml @@ -89,4 +89,4 @@ deny = ["oldtime"] unknown-registry = "warn" unknown-git = "warn" allow-registry = ["https://github.com/rust-lang/crates.io-index"] -allow-git = [] +allow-git = ["https://github.com/launchbadge/sqlx.git"] From f9c8ade312c9d46f950964ec1e64c22419b45e99 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Mon, 14 Oct 2024 10:54:55 +0200 Subject: [PATCH 5/6] Graceful shutdown --- Cargo.lock | 1 + crates/cli/src/commands/server.rs | 2 + crates/cli/src/commands/worker.rs | 39 +++- ...ab747a469404533f59ff6fbd56e9eb5ad38e1.json | 14 ++ ...dabe674ea853e0d47eb5c713705cb0130c758.json | 12 ++ crates/storage-pg/src/queue/worker.rs | 26 ++- crates/storage/src/queue/worker.rs | 4 +- crates/tasks/Cargo.toml | 7 +- crates/tasks/src/lib.rs | 9 +- crates/tasks/src/new_queue.rs | 
180 +++++++++++++++--- 10 files changed, 250 insertions(+), 44 deletions(-) create mode 100644 crates/storage-pg/.sqlx/query-399e261027fe6c9167511636157ab747a469404533f59ff6fbd56e9eb5ad38e1.json create mode 100644 crates/storage-pg/.sqlx/query-6ecad60e565367a6cfa539b4c32dabe674ea853e0d47eb5c713705cb0130c758.json diff --git a/Cargo.lock b/Cargo.lock index a61e07919..d8d66a228 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3758,6 +3758,7 @@ dependencies = [ "sqlx", "thiserror 2.0.3", "tokio", + "tokio-util", "tower 0.5.1", "tracing", "tracing-opentelemetry", diff --git a/crates/cli/src/commands/server.rs b/crates/cli/src/commands/server.rs index 0558453d8..70834ccba 100644 --- a/crates/cli/src/commands/server.rs +++ b/crates/cli/src/commands/server.rs @@ -172,6 +172,8 @@ impl Options { &mailer, homeserver_connection.clone(), url_builder.clone(), + shutdown.soft_shutdown_token(), + shutdown.task_tracker(), ) .await?; diff --git a/crates/cli/src/commands/worker.rs b/crates/cli/src/commands/worker.rs index 3bbef12dd..c58605a1b 100644 --- a/crates/cli/src/commands/worker.rs +++ b/crates/cli/src/commands/worker.rs @@ -17,8 +17,12 @@ use rand::{ }; use tracing::{info, info_span}; -use crate::util::{ - database_pool_from_config, mailer_from_config, site_config_from_config, templates_from_config, +use crate::{ + shutdown::ShutdownManager, + util::{ + database_pool_from_config, mailer_from_config, site_config_from_config, + templates_from_config, + }, }; #[derive(Parser, Debug, Default)] @@ -26,6 +30,7 @@ pub(super) struct Options {} impl Options { pub async fn run(self, figment: &Figment) -> anyhow::Result { + let shutdown = ShutdownManager::new()?; let span = info_span!("cli.worker.init").entered(); let config = AppConfig::extract(figment)?; @@ -71,11 +76,35 @@ impl Options { let worker_name = Alphanumeric.sample_string(&mut rng, 10); info!(worker_name, "Starting task scheduler"); - let monitor = mas_tasks::init(&worker_name, &pool, &mailer, conn, url_builder).await?; - + let monitor = mas_tasks::init( + &worker_name, + &pool, + &mailer, + conn, + url_builder, + shutdown.soft_shutdown_token(), + shutdown.task_tracker(), + ) + .await?; + + // XXX: The monitor from apalis is a bit annoying to use for graceful shutdowns, + // ideally we'd just give it a cancellation token + let shutdown_future = shutdown.soft_shutdown_token().cancelled_owned(); + shutdown.task_tracker().spawn(async move { + if let Err(e) = monitor + .run_with_signal(async move { + shutdown_future.await; + Ok(()) + }) + .await + { + tracing::error!(error = &e as &dyn std::error::Error, "Task worker failed"); + } + }); span.exit(); - monitor.run().await?; + shutdown.run().await; + Ok(ExitCode::SUCCESS) } } diff --git a/crates/storage-pg/.sqlx/query-399e261027fe6c9167511636157ab747a469404533f59ff6fbd56e9eb5ad38e1.json b/crates/storage-pg/.sqlx/query-399e261027fe6c9167511636157ab747a469404533f59ff6fbd56e9eb5ad38e1.json new file mode 100644 index 000000000..f0a50a645 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-399e261027fe6c9167511636157ab747a469404533f59ff6fbd56e9eb5ad38e1.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM queue_leader\n WHERE queue_worker_id = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [] + }, + "hash": "399e261027fe6c9167511636157ab747a469404533f59ff6fbd56e9eb5ad38e1" +} diff --git a/crates/storage-pg/.sqlx/query-6ecad60e565367a6cfa539b4c32dabe674ea853e0d47eb5c713705cb0130c758.json 
b/crates/storage-pg/.sqlx/query-6ecad60e565367a6cfa539b4c32dabe674ea853e0d47eb5c713705cb0130c758.json new file mode 100644 index 000000000..564800a24 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-6ecad60e565367a6cfa539b4c32dabe674ea853e0d47eb5c713705cb0130c758.json @@ -0,0 +1,12 @@ +{ + "db_name": "PostgreSQL", + "query": "\n NOTIFY queue_leader_stepdown\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [] + }, + "nullable": [] + }, + "hash": "6ecad60e565367a6cfa539b4c32dabe674ea853e0d47eb5c713705cb0130c758" +} diff --git a/crates/storage-pg/src/queue/worker.rs b/crates/storage-pg/src/queue/worker.rs index 5fa784cb1..5d3c566b2 100644 --- a/crates/storage-pg/src/queue/worker.rs +++ b/crates/storage-pg/src/queue/worker.rs @@ -109,7 +109,7 @@ impl QueueWorkerRepository for PgQueueWorkerRepository<'_> { ), err, )] - async fn shutdown(&mut self, clock: &dyn Clock, worker: Worker) -> Result<(), Self::Error> { + async fn shutdown(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error> { let now = clock.now(); let res = sqlx::query!( r#" @@ -126,6 +126,30 @@ impl QueueWorkerRepository for PgQueueWorkerRepository<'_> { DatabaseError::ensure_affected_rows(&res, 1)?; + // Remove the leader lease if we were holding it + let res = sqlx::query!( + r#" + DELETE FROM queue_leader + WHERE queue_worker_id = $1 + "#, + Uuid::from(worker.id), + ) + .traced() + .execute(&mut *self.conn) + .await?; + + // If we were holding the leader lease, notify workers + if res.rows_affected() > 0 { + sqlx::query!( + r#" + NOTIFY queue_leader_stepdown + "#, + ) + .traced() + .execute(&mut *self.conn) + .await?; + } + Ok(()) } diff --git a/crates/storage/src/queue/worker.rs b/crates/storage/src/queue/worker.rs index 19ceead88..4916ec1fd 100644 --- a/crates/storage/src/queue/worker.rs +++ b/crates/storage/src/queue/worker.rs @@ -51,7 +51,7 @@ pub trait QueueWorkerRepository: Send + Sync { /// # Errors /// /// Returns an error if the underlying repository fails. - async fn shutdown(&mut self, clock: &dyn Clock, worker: Worker) -> Result<(), Self::Error>; + async fn shutdown(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>; /// Find dead workers and shut them down. 
/// @@ -105,7 +105,7 @@ repository_impl!(QueueWorkerRepository: async fn shutdown( &mut self, clock: &dyn Clock, - worker: Worker, + worker: &Worker, ) -> Result<(), Self::Error>; async fn shutdown_dead_workers( diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml index 8aff0da36..763b0f5fc 100644 --- a/crates/tasks/Cargo.toml +++ b/crates/tasks/Cargo.toml @@ -13,7 +13,11 @@ workspace = true [dependencies] anyhow.workspace = true -apalis-core = { version = "0.4.9", features = ["extensions", "tokio-comp", "storage"] } +apalis-core = { version = "0.4.9", features = [ + "extensions", + "tokio-comp", + "storage", +] } apalis-cron = "0.4.9" async-stream = "0.3.6" async-trait.workspace = true @@ -25,6 +29,7 @@ rand_chacha = "0.3.1" sqlx.workspace = true thiserror.workspace = true tokio.workspace = true +tokio-util.workspace = true tower.workspace = true tracing.workspace = true tracing-opentelemetry.workspace = true diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 0db6b6b81..ac85eba14 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -15,6 +15,7 @@ use mas_storage_pg::PgRepository; use new_queue::QueueRunnerError; use rand::SeedableRng; use sqlx::{Pool, Postgres}; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tracing::debug; use crate::storage::PostgresStorageFactory; @@ -143,6 +144,8 @@ pub async fn init( mailer: &Mailer, homeserver: impl HomeserverConnection + 'static, url_builder: UrlBuilder, + cancellation_token: CancellationToken, + task_tracker: &TaskTracker, ) -> Result, QueueRunnerError> { let state = State::new( pool.clone(), @@ -166,11 +169,9 @@ pub async fn init( .map_err(QueueRunnerError::SetupListener)?; debug!(?monitor, "workers registered"); - let mut worker = self::new_queue::QueueWorker::new(state).await?; + let mut worker = self::new_queue::QueueWorker::new(state, cancellation_token).await?; - // TODO: this is just spawning the task in the background, we probably actually - // want to wrap that in a structure, and handle graceful shutdown correctly - tokio::spawn(async move { + task_tracker.spawn(async move { if let Err(e) = worker.run().await { tracing::error!( error = &e as &dyn std::error::Error, diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index c3596c0e9..571f8591b 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -8,8 +8,12 @@ use mas_storage::{queue::Worker, Clock, RepositoryAccess, RepositoryError}; use mas_storage_pg::{DatabaseError, PgRepository}; use rand::{distributions::Uniform, Rng}; use rand_chacha::ChaChaRng; -use sqlx::PgPool; +use sqlx::{ + postgres::{PgAdvisoryLock, PgListener}, + Acquire, Either, +}; use thiserror::Error; +use tokio_util::sync::CancellationToken; use crate::State; @@ -24,6 +28,9 @@ pub enum QueueRunnerError { #[error("Failed to commit transaction")] CommitTransaction(#[source] sqlx::Error), + #[error("Failed to acquire leader lock")] + LeaderLock(#[source] sqlx::Error), + #[error(transparent)] Repository(#[from] RepositoryError), @@ -34,16 +41,21 @@ pub enum QueueRunnerError { NotLeader, } +// When the worker waits for a notification, we still want to wake it up every +// second. Because we don't want all the workers to wake up at the same time, we +// add a random jitter to the sleep duration, so they effectively sleep between +// 0.9 and 1.1 seconds. 
const MIN_SLEEP_DURATION: std::time::Duration = std::time::Duration::from_millis(900); const MAX_SLEEP_DURATION: std::time::Duration = std::time::Duration::from_millis(1100); pub struct QueueWorker { rng: ChaChaRng, clock: Box, - pool: PgPool, + listener: PgListener, registration: Worker, am_i_leader: bool, last_heartbeat: DateTime, + cancellation_token: CancellationToken, } impl QueueWorker { @@ -52,12 +64,24 @@ impl QueueWorker { skip_all, fields(worker.id) )] - pub async fn new(state: State) -> Result { + pub async fn new( + state: State, + cancellation_token: CancellationToken, + ) -> Result { let mut rng = state.rng(); let clock = state.clock(); - let pool = state.pool().clone(); - let txn = pool + let mut listener = PgListener::connect_with(state.pool()) + .await + .map_err(QueueRunnerError::SetupListener)?; + + // We get notifications of leader stepping down on this channel + listener + .listen("queue_leader_stepdown") + .await + .map_err(QueueRunnerError::SetupListener)?; + + let txn = listener .begin() .await .map_err(QueueRunnerError::StartTransaction)?; @@ -76,22 +100,32 @@ impl QueueWorker { Ok(Self { rng, clock, - pool, + listener, registration, am_i_leader: false, last_heartbeat: now, + cancellation_token, }) } pub async fn run(&mut self) -> Result<(), QueueRunnerError> { - loop { + while !self.cancellation_token.is_cancelled() { self.run_loop().await?; } + + self.shutdown().await?; + + Ok(()) } #[tracing::instrument(name = "worker.run_loop", skip_all, err)] async fn run_loop(&mut self) -> Result<(), QueueRunnerError> { self.wait_until_wakeup().await?; + + if self.cancellation_token.is_cancelled() { + return Ok(()); + } + self.tick().await?; if self.am_i_leader { @@ -101,6 +135,33 @@ impl QueueWorker { Ok(()) } + #[tracing::instrument(name = "worker.shutdown", skip_all, err)] + async fn shutdown(&mut self) -> Result<(), QueueRunnerError> { + tracing::info!("Shutting down worker"); + + // Start a transaction on the existing PgListener connection + let txn = self + .listener + .begin() + .await + .map_err(QueueRunnerError::StartTransaction)?; + + let mut repo = PgRepository::from_conn(txn); + + // Tell the other workers we're shutting down + // This also releases the leader election lease + repo.queue_worker() + .shutdown(&self.clock, &self.registration) + .await?; + + repo.into_inner() + .commit() + .await + .map_err(QueueRunnerError::CommitTransaction)?; + + Ok(()) + } + #[tracing::instrument(name = "worker.wait_until_wakeup", skip_all, err)] async fn wait_until_wakeup(&mut self) -> Result<(), QueueRunnerError> { // This is to make sure we wake up every second to do the maintenance tasks @@ -109,25 +170,34 @@ impl QueueWorker { let sleep_duration = self .rng .sample(Uniform::new(MIN_SLEEP_DURATION, MAX_SLEEP_DURATION)); - tokio::time::sleep(sleep_duration).await; - tracing::debug!("Woke up from sleep"); - - Ok(()) - } - - fn set_new_leader_state(&mut self, state: bool) { - // Do nothing if we were already on that state - if state == self.am_i_leader { - return; + let wakeup_sleep = tokio::time::sleep(sleep_duration); + + tokio::select! 
{ + () = self.cancellation_token.cancelled() => { + tracing::debug!("Woke up from cancellation"); + }, + + () = wakeup_sleep => { + tracing::debug!("Woke up from sleep"); + }, + + notification = self.listener.recv() => { + match notification { + Ok(notification) => { + tracing::debug!( + notification.channel = notification.channel(), + notification.payload = notification.payload(), + "Woke up from notification" + ); + }, + Err(e) => { + tracing::error!(error = &e as &dyn std::error::Error, "Failed to receive notification"); + }, + } + }, } - // If we flipped state, log it - self.am_i_leader = state; - if self.am_i_leader { - tracing::info!("I'm the leader now"); - } else { - tracing::warn!("I am no longer the leader"); - } + Ok(()) } #[tracing::instrument( @@ -140,8 +210,9 @@ impl QueueWorker { tracing::debug!("Tick"); let now = self.clock.now(); + // Start a transaction on the existing PgListener connection let txn = self - .pool + .listener .begin() .await .map_err(QueueRunnerError::StartTransaction)?; @@ -168,13 +239,23 @@ impl QueueWorker { .try_get_leader_lease(&self.clock, &self.registration) .await?; + // After this point, we are locking the leader table, so it's important that we + // commit as soon as possible to not block the other workers for too long repo.into_inner() .commit() .await .map_err(QueueRunnerError::CommitTransaction)?; - // Save the new leader state - self.set_new_leader_state(leader); + // Save the new leader state to log any change + if leader != self.am_i_leader { + // If we flipped state, log it + self.am_i_leader = leader; + if self.am_i_leader { + tracing::info!("I'm the leader now"); + } else { + tracing::warn!("I am no longer the leader"); + } + } Ok(()) } @@ -186,12 +267,43 @@ impl QueueWorker { return Err(QueueRunnerError::NotLeader); } + // Start a transaction on the existing PgListener connection let txn = self - .pool + .listener .begin() .await .map_err(QueueRunnerError::StartTransaction)?; - let mut repo = PgRepository::from_conn(txn); + + // The thing with the leader election is that it locks the table during the + // election, preventing other workers from going through the loop. + // + // Ideally, we would do the leader duties in the same transaction so that we + // make sure only one worker is doing the leader duties, but that + // would mean we would lock all the workers for the duration of the + // duties, which is not ideal. + // + // So we do the duties in a separate transaction, in which we take an advisory + // lock, so that in the very rare case where two workers think they are the + // leader, we still don't have two workers doing the duties at the same time. 
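For readers who have not used Postgres advisory locks, this is roughly the underlying mechanism, shown as plain SQL with an arbitrary key; the `PgAdvisoryLock` helper used just below wraps this pattern, turning the "leader-duties" string into an integer key:

    -- Advisory locks are cooperative, integer-keyed locks held by a session;
    -- they are not tied to any table or row.
    SELECT pg_try_advisory_lock(42);  -- true if the lock was free, false if another session holds it
    -- ... perform the leader duties while holding the lock ...
    SELECT pg_advisory_unlock(42);    -- release it; it is also released when the session ends
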
+        let lock = PgAdvisoryLock::new("leader-duties");
+
+        let locked = lock
+            .try_acquire(txn)
+            .await
+            .map_err(QueueRunnerError::LeaderLock)?;
+
+        let locked = match locked {
+            Either::Left(locked) => locked,
+            Either::Right(txn) => {
+                tracing::error!("Another worker has the leader lock, aborting");
+                txn.rollback()
+                    .await
+                    .map_err(QueueRunnerError::CommitTransaction)?;
+                return Ok(());
+            }
+        };
+
+        let mut repo = PgRepository::from_conn(locked);
 
         // We also check if the worker is dead, and if so, we shutdown all the dead
         // workers that haven't checked in the last two minutes
@@ -199,8 +311,14 @@ impl QueueWorker {
             .shutdown_dead_workers(&self.clock, Duration::minutes(2))
             .await?;
 
-        repo.into_inner()
-            .commit()
+        // Release the leader lock
+        let txn = repo
+            .into_inner()
+            .release_now()
+            .await
+            .map_err(QueueRunnerError::LeaderLock)?;
+
+        txn.commit()
             .await
             .map_err(QueueRunnerError::CommitTransaction)?;
 

From 2692d9a28f9d8aa38090720b332f35eb08804da0 Mon Sep 17 00:00:00 2001
From: Quentin Gliech
Date: Tue, 19 Nov 2024 17:23:55 +0100
Subject: [PATCH 6/6] Use the database time for leader election

---
 ...4d55cbfb01492985ac2af5a1ad4af9b3ccc77.json | 15 ++++++++++++++
 ...d356a4ed86fd33400066e422545ffc55f9aa9.json | 16 ---------------
 ...f3005b55c654897a8e46dc933c7fd2263c7c.json} |  8 +++-----
 crates/storage-pg/src/queue/worker.rs         | 20 +++++++++++--------
 4 files changed, 30 insertions(+), 29 deletions(-)
 create mode 100644 crates/storage-pg/.sqlx/query-67cd4880d84b38f20c3960789934d55cbfb01492985ac2af5a1ad4af9b3ccc77.json
 delete mode 100644 crates/storage-pg/.sqlx/query-8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9.json
 rename crates/storage-pg/.sqlx/{query-ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json => query-fe7bd146523e4bb321cb234d6bf9f3005b55c654897a8e46dc933c7fd2263c7c.json} (51%)

diff --git a/crates/storage-pg/.sqlx/query-67cd4880d84b38f20c3960789934d55cbfb01492985ac2af5a1ad4af9b3ccc77.json b/crates/storage-pg/.sqlx/query-67cd4880d84b38f20c3960789934d55cbfb01492985ac2af5a1ad4af9b3ccc77.json
new file mode 100644
index 000000000..1d739df31
--- /dev/null
+++ b/crates/storage-pg/.sqlx/query-67cd4880d84b38f20c3960789934d55cbfb01492985ac2af5a1ad4af9b3ccc77.json
@@ -0,0 +1,15 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n INSERT INTO queue_leader (elected_at, expires_at, queue_worker_id)\n VALUES ($1, NOW() + INTERVAL '5 seconds', $2)\n ON CONFLICT (active)\n DO UPDATE SET expires_at = EXCLUDED.expires_at\n WHERE queue_leader.queue_worker_id = $2\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Timestamptz",
+        "Uuid"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "67cd4880d84b38f20c3960789934d55cbfb01492985ac2af5a1ad4af9b3ccc77"
+}
diff --git a/crates/storage-pg/.sqlx/query-8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9.json b/crates/storage-pg/.sqlx/query-8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9.json
deleted file mode 100644
index 9195a9d4d..000000000
--- a/crates/storage-pg/.sqlx/query-8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9.json
+++ /dev/null
@@ -1,16 +0,0 @@
-{
-  "db_name": "PostgreSQL",
-  "query": "\n INSERT INTO queue_leader (elected_at, expires_at, queue_worker_id)\n VALUES ($1, $2, $3)\n ON CONFLICT (active)\n DO UPDATE SET expires_at = EXCLUDED.expires_at\n WHERE queue_leader.queue_worker_id = $3\n ",
-  "describe": {
-    "columns": [],
-    "parameters": {
-      "Left": [
-        "Timestamptz",
-        "Timestamptz",
-        "Uuid"
-      ]
-    },
-    "nullable": []
-  },
-  "hash": "8defee03b9ed60c2b8cc6478e34d356a4ed86fd33400066e422545ffc55f9aa9"
-}
diff --git a/crates/storage-pg/.sqlx/query-ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json b/crates/storage-pg/.sqlx/query-fe7bd146523e4bb321cb234d6bf9f3005b55c654897a8e46dc933c7fd2263c7c.json
similarity index 51%
rename from crates/storage-pg/.sqlx/query-ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json
rename to crates/storage-pg/.sqlx/query-fe7bd146523e4bb321cb234d6bf9f3005b55c654897a8e46dc933c7fd2263c7c.json
index af6213a8a..9cfabe8ed 100644
--- a/crates/storage-pg/.sqlx/query-ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5.json
+++ b/crates/storage-pg/.sqlx/query-fe7bd146523e4bb321cb234d6bf9f3005b55c654897a8e46dc933c7fd2263c7c.json
@@ -1,14 +1,12 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n DELETE FROM queue_leader\n WHERE expires_at < $1\n ",
+  "query": "\n DELETE FROM queue_leader\n WHERE expires_at < NOW()\n ",
   "describe": {
     "columns": [],
     "parameters": {
-      "Left": [
-        "Timestamptz"
-      ]
+      "Left": []
     },
     "nullable": []
   },
-  "hash": "ed8bcbcd4b7f93f654670cf077f84163ae08e16a8e07c6ecbca4fd8cb10da8a5"
+  "hash": "fe7bd146523e4bb321cb234d6bf9f3005b55c654897a8e46dc933c7fd2263c7c"
 }
diff --git a/crates/storage-pg/src/queue/worker.rs b/crates/storage-pg/src/queue/worker.rs
index 5d3c566b2..61b96b5fd 100644
--- a/crates/storage-pg/src/queue/worker.rs
+++ b/crates/storage-pg/src/queue/worker.rs
@@ -166,6 +166,9 @@ impl QueueWorkerRepository for PgQueueWorkerRepository<'_> {
         clock: &dyn Clock,
         threshold: Duration,
     ) -> Result<(), Self::Error> {
+        // Here the threshold is usually set to a few minutes, so we don't need to use
+        // the database time, as we can assume worker clocks have less than a minute
+        // skew between each other, else other things would break
         let now = clock.now();
         sqlx::query!(
             r#"
@@ -194,15 +197,15 @@ impl QueueWorkerRepository for PgQueueWorkerRepository<'_> {
     )]
     async fn remove_leader_lease_if_expired(
         &mut self,
-        clock: &dyn Clock,
+        _clock: &dyn Clock,
     ) -> Result<(), Self::Error> {
-        let now = clock.now();
+        // `expires_at` is a rare exception where we use the database time, as this
+        // would be very sensitive to clock skew between workers
         sqlx::query!(
             r#"
                 DELETE FROM queue_leader
-                WHERE expires_at < $1
+                WHERE expires_at < NOW()
             "#,
-            now,
         )
         .traced()
         .execute(&mut *self.conn)
@@ -226,22 +229,23 @@ impl QueueWorkerRepository for PgQueueWorkerRepository<'_> {
         worker: &Worker,
     ) -> Result<bool, Self::Error> {
         let now = clock.now();
-        let ttl = Duration::seconds(5);
 
         // The queue_leader table is meant to only have a single row, which conflicts on
         // the `active` column
         // If there is a conflict, we update the `expires_at` column ONLY IF the current
         // leader is ourselves.
+
+        // `expires_at` is a rare exception where we use the database time, as this
+        // would be very sensitive to clock skew between workers
         let res = sqlx::query!(
             r#"
                 INSERT INTO queue_leader (elected_at, expires_at, queue_worker_id)
-                VALUES ($1, $2, $3)
+                VALUES ($1, NOW() + INTERVAL '5 seconds', $2)
                 ON CONFLICT (active)
                 DO UPDATE SET expires_at = EXCLUDED.expires_at
-                WHERE queue_leader.queue_worker_id = $3
+                WHERE queue_leader.queue_worker_id = $2
             "#,
             now,
-            now + ttl,
            Uuid::from(worker.id)
         )
         .traced()