From 4227341857cacadcc1ea590fb0cff64a06fbc55a Mon Sep 17 00:00:00 2001
From: Quentin Gliech
Date: Wed, 9 Jul 2025 17:17:01 +0200
Subject: [PATCH 1/7] Make the task State::clock() return a &dyn Clock instead
 of a BoxClock

---
 crates/storage/src/clock.rs   |  2 +-
 crates/tasks/src/database.rs  |  2 +-
 crates/tasks/src/email.rs     |  2 +-
 crates/tasks/src/lib.rs       | 12 ++++++------
 crates/tasks/src/matrix.rs    |  6 +++---
 crates/tasks/src/new_queue.rs |  2 +-
 crates/tasks/src/recovery.rs  |  2 +-
 crates/tasks/src/sessions.rs  | 22 +++++++++++-----------
 crates/tasks/src/user.rs      | 10 +++++-----
 9 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/crates/storage/src/clock.rs b/crates/storage/src/clock.rs
index df32114bb..bf31835f0 100644
--- a/crates/storage/src/clock.rs
+++ b/crates/storage/src/clock.rs
@@ -15,7 +15,7 @@ use std::sync::{Arc, atomic::AtomicI64};
 use chrono::{DateTime, TimeZone, Utc};
 
 /// Represents a clock which can give the current date and time
-pub trait Clock: Sync {
+pub trait Clock: Send + Sync {
     /// Get the current date and time
     fn now(&self) -> DateTime<Utc>;
 }
diff --git a/crates/tasks/src/database.rs b/crates/tasks/src/database.rs
index a659eb265..bc14215f8 100644
--- a/crates/tasks/src/database.rs
+++ b/crates/tasks/src/database.rs
@@ -24,7 +24,7 @@ impl RunnableJob for CleanupExpiredTokensJob {
         let count = repo
             .oauth2_access_token()
-            .cleanup_revoked(&clock)
+            .cleanup_revoked(clock)
             .await
             .map_err(JobError::retry)?;
         repo.save().await.map_err(JobError::retry)?;
diff --git a/crates/tasks/src/email.rs b/crates/tasks/src/email.rs
index 99170d2eb..8e685843a 100644
--- a/crates/tasks/src/email.rs
+++ b/crates/tasks/src/email.rs
@@ -100,7 +100,7 @@ impl RunnableJob for SendEmailAuthenticationCodeJob {
             .user_email()
             .add_authentication_code(
                 &mut rng,
-                &clock,
+                clock,
                 Duration::minutes(5), // TODO: make this configurable
                 &user_email_authentication,
                 code,
diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs
index 58737574b..796f83396 100644
--- a/crates/tasks/src/lib.rs
+++ b/crates/tasks/src/lib.rs
@@ -10,7 +10,7 @@ use mas_data_model::SiteConfig;
 use mas_email::Mailer;
 use mas_matrix::HomeserverConnection;
 use mas_router::UrlBuilder;
-use mas_storage::{BoxClock, BoxRepository, RepositoryError, RepositoryFactory, SystemClock};
+use mas_storage::{BoxRepository, Clock, RepositoryError, RepositoryFactory, SystemClock};
 use mas_storage_pg::PgRepositoryFactory;
 use new_queue::QueueRunnerError;
 use opentelemetry::metrics::Meter;
@@ -39,7 +39,7 @@ static METER: LazyLock<Meter> = LazyLock::new(|| {
 struct State {
     repository_factory: PgRepositoryFactory,
     mailer: Mailer,
-    clock: SystemClock,
+    clock: Arc<dyn Clock>,
     homeserver: Arc<dyn HomeserverConnection>,
     url_builder: UrlBuilder,
     site_config: SiteConfig,
@@ -48,7 +48,7 @@ impl State {
     pub fn new(
         repository_factory: PgRepositoryFactory,
-        clock: SystemClock,
+        clock: impl Clock + 'static,
         mailer: Mailer,
         homeserver: impl HomeserverConnection + 'static,
         url_builder: UrlBuilder,
@@ -57,7 +57,7 @@
         Self {
             repository_factory,
             mailer,
-            clock,
+            clock: Arc::new(clock),
             homeserver: Arc::new(homeserver),
             url_builder,
             site_config,
@@ -68,8 +68,8 @@
         self.repository_factory.pool()
     }
 
-    pub fn clock(&self) -> BoxClock {
-        Box::new(self.clock.clone())
+    pub fn clock(&self) -> &dyn Clock {
+        &self.clock
     }
 
     pub fn mailer(&self) -> &Mailer {
diff --git a/crates/tasks/src/matrix.rs b/crates/tasks/src/matrix.rs
index 40e38899c..92e15f448 100644
--- a/crates/tasks/src/matrix.rs
+++ b/crates/tasks/src/matrix.rs
@@ -80,7 +80,7 @@ impl RunnableJob for 
ProvisionUserJob { // Schedule a device sync job let sync_device_job = SyncDevicesJob::new(&user); repo.queue_job() - .schedule_job(&mut rng, &clock, sync_device_job) + .schedule_job(&mut rng, clock, sync_device_job) .await .map_err(JobError::retry)?; @@ -118,7 +118,7 @@ impl RunnableJob for ProvisionDeviceJob { // Schedule a device sync job repo.queue_job() - .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user)) + .schedule_job(&mut rng, clock, SyncDevicesJob::new(&user)) .await .map_err(JobError::retry)?; @@ -154,7 +154,7 @@ impl RunnableJob for DeleteDeviceJob { // Schedule a device sync job repo.queue_job() - .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user)) + .schedule_job(&mut rng, clock, SyncDevicesJob::new(&user)) .await .map_err(JobError::retry)?; diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index dcb578011..777844e2f 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -246,7 +246,7 @@ impl QueueWorker { .map_err(QueueRunnerError::StartTransaction)?; let mut repo = PgRepository::from_conn(txn); - let registration = repo.queue_worker().register(&mut rng, &clock).await?; + let registration = repo.queue_worker().register(&mut rng, clock).await?; tracing::Span::current().record("worker.id", tracing::field::display(registration.id)); repo.into_inner() .commit() diff --git a/crates/tasks/src/recovery.rs b/crates/tasks/src/recovery.rs index ab51e420b..03e02d57b 100644 --- a/crates/tasks/src/recovery.rs +++ b/crates/tasks/src/recovery.rs @@ -75,7 +75,7 @@ impl RunnableJob for SendAccountRecoveryEmailsJob { let ticket = repo .user_recovery() - .add_ticket(&mut rng, &clock, &session, &email, ticket) + .add_ticket(&mut rng, clock, &session, &email, ticket) .await .map_err(JobError::retry)?; diff --git a/crates/tasks/src/sessions.rs b/crates/tasks/src/sessions.rs index f457fe04f..d10d908da 100644 --- a/crates/tasks/src/sessions.rs +++ b/crates/tasks/src/sessions.rs @@ -39,7 +39,7 @@ impl RunnableJob for ExpireInactiveSessionsJob { repo.queue_job() .schedule_job( &mut rng, - &clock, + clock, ExpireInactiveOAuthSessionsJob::new(now - ttl), ) .await @@ -50,7 +50,7 @@ impl RunnableJob for ExpireInactiveSessionsJob { repo.queue_job() .schedule_job( &mut rng, - &clock, + clock, ExpireInactiveCompatSessionsJob::new(now - ttl), ) .await @@ -61,7 +61,7 @@ impl RunnableJob for ExpireInactiveSessionsJob { repo.queue_job() .schedule_job( &mut rng, - &clock, + clock, ExpireInactiveUserSessionsJob::new(now - ttl), ) .await @@ -104,7 +104,7 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob { if let Some(job) = self.next(&page) { tracing::info!("Scheduling job to expire the next batch of inactive sessions"); repo.queue_job() - .schedule_job(&mut rng, &clock, job) + .schedule_job(&mut rng, clock, job) .await .map_err(JobError::retry)?; } @@ -117,7 +117,7 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob { repo.queue_job() .schedule_job_later( &mut rng, - &clock, + clock, SyncDevicesJob::new_for_id(user_id), clock.now() + delay, ) @@ -128,7 +128,7 @@ impl RunnableJob for ExpireInactiveOAuthSessionsJob { } repo.oauth2_session() - .finish(&clock, edge) + .finish(clock, edge) .await .map_err(JobError::retry)?; } @@ -168,7 +168,7 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob { if let Some(job) = self.next(&page) { tracing::info!("Scheduling job to expire the next batch of inactive sessions"); repo.queue_job() - .schedule_job(&mut rng, &clock, job) + .schedule_job(&mut rng, clock, job) .await .map_err(JobError::retry)?; 
} @@ -180,7 +180,7 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob { repo.queue_job() .schedule_job_later( &mut rng, - &clock, + clock, SyncDevicesJob::new_for_id(edge.user_id), clock.now() + delay, ) @@ -190,7 +190,7 @@ impl RunnableJob for ExpireInactiveCompatSessionsJob { } repo.compat_session() - .finish(&clock, edge) + .finish(clock, edge) .await .map_err(JobError::retry)?; } @@ -223,14 +223,14 @@ impl RunnableJob for ExpireInactiveUserSessionsJob { if let Some(job) = self.next(&page) { tracing::info!("Scheduling job to expire the next batch of inactive sessions"); repo.queue_job() - .schedule_job(&mut rng, &clock, job) + .schedule_job(&mut rng, clock, job) .await .map_err(JobError::retry)?; } for edge in page.edges { repo.browser_session() - .finish(&clock, edge) + .finish(clock, edge) .await .map_err(JobError::retry)?; } diff --git a/crates/tasks/src/user.rs b/crates/tasks/src/user.rs index 9c9db9097..b5f64dd42 100644 --- a/crates/tasks/src/user.rs +++ b/crates/tasks/src/user.rs @@ -44,14 +44,14 @@ impl RunnableJob for DeactivateUserJob { // Let's first lock & deactivate the user let user = repo .user() - .lock(&clock, user) + .lock(clock, user) .await .context("Failed to lock user") .map_err(JobError::retry)?; let user = repo .user() - .deactivate(&clock, user) + .deactivate(clock, user) .await .context("Failed to deactivate user") .map_err(JobError::retry)?; @@ -60,7 +60,7 @@ impl RunnableJob for DeactivateUserJob { let n = repo .browser_session() .finish_bulk( - &clock, + clock, BrowserSessionFilter::new().for_user(&user).active_only(), ) .await @@ -70,7 +70,7 @@ impl RunnableJob for DeactivateUserJob { let n = repo .oauth2_session() .finish_bulk( - &clock, + clock, OAuth2SessionFilter::new().for_user(&user).active_only(), ) .await @@ -80,7 +80,7 @@ impl RunnableJob for DeactivateUserJob { let n = repo .compat_session() .finish_bulk( - &clock, + clock, CompatSessionFilter::new().for_user(&user).active_only(), ) .await From f69855e21edeca4dd5f856439c2d68b7d75b8dd2 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 9 Jul 2025 17:22:13 +0200 Subject: [PATCH 2/7] Remove the duplicate clock and rng in QueueWorker Use the ones in the inner State instead --- crates/tasks/src/new_queue.rs | 48 +++++++++++++++++------------------ 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index 777844e2f..1ede913a7 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -19,7 +19,6 @@ use opentelemetry::{ metrics::{Counter, Histogram, UpDownCounter}, }; use rand::{Rng, RngCore, distributions::Uniform}; -use rand_chacha::ChaChaRng; use serde::de::DeserializeOwned; use sqlx::{ Acquire, Either, @@ -195,8 +194,6 @@ struct ScheduleDefinition { } pub struct QueueWorker { - rng: ChaChaRng, - clock: Box, listener: PgListener, registration: Worker, am_i_leader: bool, @@ -278,8 +275,6 @@ impl QueueWorker { let cancellation_guard = cancellation_token.clone().drop_guard(); Ok(Self { - rng, - clock, listener, registration, am_i_leader: false, @@ -397,6 +392,9 @@ impl QueueWorker { async fn shutdown(&mut self) -> Result<(), QueueRunnerError> { tracing::info!("Shutting down worker"); + let clock = self.state.clock(); + let mut rng = self.state.rng(); + // Start a transaction on the existing PgListener connection let txn = self .listener @@ -421,13 +419,13 @@ impl QueueWorker { // Wait for all the jobs to finish self.tracker - .process_jobs(&mut self.rng, &self.clock, &mut repo, true) + 
.process_jobs(&mut rng, clock, &mut repo, true) .await?; // Tell the other workers we're shutting down // This also releases the leader election lease repo.queue_worker() - .shutdown(&self.clock, &self.registration) + .shutdown(clock, &self.registration) .await?; repo.into_inner() @@ -440,12 +438,12 @@ impl QueueWorker { #[tracing::instrument(name = "worker.wait_until_wakeup", skip_all)] async fn wait_until_wakeup(&mut self) -> Result<(), QueueRunnerError> { + let mut rng = self.state.rng(); + // This is to make sure we wake up every second to do the maintenance tasks // We add a little bit of random jitter to the duration, so that we don't get // fully synced workers waking up at the same time after each notification - let sleep_duration = self - .rng - .sample(Uniform::new(MIN_SLEEP_DURATION, MAX_SLEEP_DURATION)); + let sleep_duration = rng.sample(Uniform::new(MIN_SLEEP_DURATION, MAX_SLEEP_DURATION)); let wakeup_sleep = tokio::time::sleep(sleep_duration); tokio::select! { @@ -490,7 +488,9 @@ impl QueueWorker { )] async fn tick(&mut self) -> Result<(), QueueRunnerError> { tracing::debug!("Tick"); - let now = self.clock.now(); + let clock = self.state.clock(); + let mut rng = self.state.rng(); + let now = clock.now(); // Start a transaction on the existing PgListener connection let txn = self @@ -505,25 +505,25 @@ impl QueueWorker { if now - self.last_heartbeat >= chrono::Duration::minutes(1) { tracing::info!("Sending heartbeat"); repo.queue_worker() - .heartbeat(&self.clock, &self.registration) + .heartbeat(clock, &self.registration) .await?; self.last_heartbeat = now; } // Remove any dead worker leader leases repo.queue_worker() - .remove_leader_lease_if_expired(&self.clock) + .remove_leader_lease_if_expired(clock) .await?; // Try to become (or stay) the leader let leader = repo .queue_worker() - .try_get_leader_lease(&self.clock, &self.registration) + .try_get_leader_lease(clock, &self.registration) .await?; // Process any job task which finished self.tracker - .process_jobs(&mut self.rng, &self.clock, &mut repo, false) + .process_jobs(&mut rng, clock, &mut repo, false) .await?; // Compute how many jobs we should fetch at most @@ -538,7 +538,7 @@ impl QueueWorker { let queues = self.tracker.queues(); let jobs = repo .queue_job() - .reserve(&self.clock, &self.registration, &queues, max_jobs_to_fetch) + .reserve(clock, &self.registration, &queues, max_jobs_to_fetch) .await?; for Job { @@ -592,6 +592,9 @@ impl QueueWorker { return Err(QueueRunnerError::NotLeader); } + let clock = self.state.clock(); + let mut rng = self.state.rng(); + // Start a transaction on the existing PgListener connection let txn = self .listener @@ -633,7 +636,7 @@ impl QueueWorker { // Look at the state of schedules in the database let schedules_status = repo.queue_schedule().list().await?; - let now = self.clock.now(); + let now = clock.now(); for schedule in &self.schedules { // Find the schedule status from the database let Some(schedule_status) = schedules_status @@ -670,8 +673,8 @@ impl QueueWorker { repo.queue_job() .schedule_later( - &mut self.rng, - &self.clock, + &mut rng, + clock, schedule.queue_name, schedule.payload.clone(), serde_json::json!({}), @@ -684,16 +687,13 @@ impl QueueWorker { // We also check if the worker is dead, and if so, we shutdown all the dead // workers that haven't checked in the last two minutes repo.queue_worker() - .shutdown_dead_workers(&self.clock, Duration::minutes(2)) + .shutdown_dead_workers(clock, Duration::minutes(2)) .await?; // TODO: mark tasks those workers had as lost 
// Mark all the scheduled jobs as available - let scheduled = repo - .queue_job() - .schedule_available_jobs(&self.clock) - .await?; + let scheduled = repo.queue_job().schedule_available_jobs(clock).await?; match scheduled { 0 => {} 1 => tracing::info!("One scheduled job marked as available"), From 45f15e15d0be95f22b6d89a901bffa51b5a06727 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 9 Jul 2025 17:24:04 +0200 Subject: [PATCH 3/7] Allow setting a custom clock on the QueueWorker & add one to the TestState --- Cargo.lock | 2 ++ crates/cli/src/commands/server.rs | 3 ++- crates/cli/src/commands/worker.rs | 4 ++- crates/handlers/Cargo.toml | 2 ++ crates/handlers/src/test_utils.rs | 25 +++++++++++++++++ crates/tasks/src/lib.rs | 45 ++++++++++++++++++++++++++----- crates/tasks/src/new_queue.rs | 10 +++---- 7 files changed, 78 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e66f2423b..7e01f113d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3341,6 +3341,7 @@ dependencies = [ "mas-config", "mas-context", "mas-data-model", + "mas-email", "mas-http", "mas-i18n", "mas-iana", @@ -3352,6 +3353,7 @@ dependencies = [ "mas-router", "mas-storage", "mas-storage-pg", + "mas-tasks", "mas-templates", "mime", "minijinja", diff --git a/crates/cli/src/commands/server.rs b/crates/cli/src/commands/server.rs index bce10710b..c42e99da4 100644 --- a/crates/cli/src/commands/server.rs +++ b/crates/cli/src/commands/server.rs @@ -173,8 +173,9 @@ impl Options { test_mailer_in_background(&mailer, Duration::from_secs(30)); info!("Starting task worker"); - mas_tasks::init( + mas_tasks::init_and_run( PgRepositoryFactory::new(pool.clone()), + SystemClock::default(), &mailer, homeserver_connection.clone(), url_builder.clone(), diff --git a/crates/cli/src/commands/worker.rs b/crates/cli/src/commands/worker.rs index 4a6f17bbf..c4e162370 100644 --- a/crates/cli/src/commands/worker.rs +++ b/crates/cli/src/commands/worker.rs @@ -10,6 +10,7 @@ use clap::Parser; use figment::Figment; use mas_config::{AppConfig, ConfigurationSection}; use mas_router::UrlBuilder; +use mas_storage::SystemClock; use mas_storage_pg::PgRepositoryFactory; use tracing::{info, info_span}; @@ -63,8 +64,9 @@ impl Options { drop(config); info!("Starting task scheduler"); - mas_tasks::init( + mas_tasks::init_and_run( PgRepositoryFactory::new(pool.clone()), + SystemClock::default(), &mailer, conn, url_builder, diff --git a/crates/handlers/Cargo.toml b/crates/handlers/Cargo.toml index 35f3e79f5..57f391854 100644 --- a/crates/handlers/Cargo.toml +++ b/crates/handlers/Cargo.toml @@ -72,6 +72,7 @@ mas-axum-utils.workspace = true mas-config.workspace = true mas-context.workspace = true mas-data-model.workspace = true +mas-email.workspace = true mas-http.workspace = true mas-i18n.workspace = true mas-iana.workspace = true @@ -83,6 +84,7 @@ mas-policy.workspace = true mas-router.workspace = true mas-storage.workspace = true mas-storage-pg.workspace = true +mas-tasks.workspace = true mas-templates.workspace = true oauth2-types.workspace = true zxcvbn.workspace = true diff --git a/crates/handlers/src/test_utils.rs b/crates/handlers/src/test_utils.rs index 4abd09506..2eba7d9e4 100644 --- a/crates/handlers/src/test_utils.rs +++ b/crates/handlers/src/test_utils.rs @@ -29,6 +29,7 @@ use mas_axum_utils::{ }; use mas_config::RateLimitingConfig; use mas_data_model::SiteConfig; +use mas_email::{MailTransport, Mailer}; use mas_i18n::Translator; use mas_keystore::{Encrypter, JsonWebKey, JsonWebKeySet, Keystore, PrivateKey}; use 
mas_matrix::{HomeserverConnection, MockHomeserverConnection}; @@ -39,6 +40,7 @@ use mas_storage::{ clock::MockClock, }; use mas_storage_pg::PgRepositoryFactory; +use mas_tasks::QueueWorker; use mas_templates::{SiteConfigExt, Templates}; use oauth2_types::{registration::ClientRegistrationResponse, requests::AccessTokenResponse}; use rand::SeedableRng; @@ -113,6 +115,7 @@ pub(crate) struct TestState { pub rng: Arc>, pub http_client: reqwest::Client, pub task_tracker: TaskTracker, + queue_worker: Arc>, #[allow(dead_code)] // It is used, as it will cancel the CancellationToken when dropped cancellation_drop_guard: Arc, @@ -235,6 +238,27 @@ impl TestState { shutdown_token.child_token(), ); + let mailer = Mailer::new( + templates.clone(), + MailTransport::blackhole(), + "hello@example.com".parse().unwrap(), + "hello@example.com".parse().unwrap(), + ); + + let queue_worker = mas_tasks::init( + PgRepositoryFactory::new(pool.clone()), + Arc::clone(&clock), + &mailer, + homeserver_connection.clone(), + url_builder.clone(), + &site_config, + shutdown_token.child_token(), + ) + .await + .unwrap(); + + let queue_worker = Arc::new(Mutex::new(queue_worker)); + Ok(Self { repository_factory: PgRepositoryFactory::new(pool), templates, @@ -254,6 +278,7 @@ impl TestState { rng, http_client, task_tracker, + queue_worker, cancellation_drop_guard: Arc::new(shutdown_token.drop_guard()), }) } diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 796f83396..9ab97dca6 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -10,7 +10,7 @@ use mas_data_model::SiteConfig; use mas_email::Mailer; use mas_matrix::HomeserverConnection; use mas_router::UrlBuilder; -use mas_storage::{BoxRepository, Clock, RepositoryError, RepositoryFactory, SystemClock}; +use mas_storage::{BoxRepository, Clock, RepositoryError, RepositoryFactory}; use mas_storage_pg::PgRepositoryFactory; use new_queue::QueueRunnerError; use opentelemetry::metrics::Meter; @@ -18,6 +18,8 @@ use rand::SeedableRng; use sqlx::{Pool, Postgres}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; +pub use crate::new_queue::QueueWorker; + mod database; mod email; mod matrix; @@ -99,29 +101,31 @@ impl State { } } -/// Initialise the workers. +/// Initialise the worker, without running it. +/// +/// This is mostly useful for tests. /// /// # Errors /// /// This function can fail if the database connection fails. pub async fn init( repository_factory: PgRepositoryFactory, + clock: impl Clock + 'static, mailer: &Mailer, homeserver: impl HomeserverConnection + 'static, url_builder: UrlBuilder, site_config: &SiteConfig, cancellation_token: CancellationToken, - task_tracker: &TaskTracker, -) -> Result<(), QueueRunnerError> { +) -> Result { let state = State::new( repository_factory, - SystemClock::default(), + clock, mailer.clone(), homeserver, url_builder, site_config.clone(), ); - let mut worker = self::new_queue::QueueWorker::new(state, cancellation_token).await?; + let mut worker = QueueWorker::new(state, cancellation_token).await?; worker .register_handler::() @@ -157,6 +161,35 @@ pub async fn init( mas_storage::queue::PruneStalePolicyDataJob, ); + Ok(worker) +} + +/// Initialise the worker and run it. +/// +/// # Errors +/// +/// This function can fail if the database connection fails. 
+pub async fn init_and_run( + repository_factory: PgRepositoryFactory, + clock: impl Clock + 'static, + mailer: &Mailer, + homeserver: impl HomeserverConnection + 'static, + url_builder: UrlBuilder, + site_config: &SiteConfig, + cancellation_token: CancellationToken, + task_tracker: &TaskTracker, +) -> Result<(), QueueRunnerError> { + let worker = init( + repository_factory, + clock, + mailer, + homeserver, + url_builder, + site_config, + cancellation_token, + ) + .await?; + task_tracker.spawn(worker.run()); Ok(()) diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index 1ede913a7..c2bf288e5 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -214,7 +214,7 @@ impl QueueWorker { skip_all, fields(worker.id) )] - pub async fn new( + pub(crate) async fn new( state: State, cancellation_token: CancellationToken, ) -> Result { @@ -289,7 +289,7 @@ impl QueueWorker { }) } - pub fn register_handler(&mut self) -> &mut Self { + pub(crate) fn register_handler(&mut self) -> &mut Self { // There is a potential panic here, which is fine as it's going to be caught // within the job task let factory = |payload: JobPayload| { @@ -302,7 +302,7 @@ impl QueueWorker { self } - pub fn add_schedule( + pub(crate) fn add_schedule( &mut self, schedule_name: &'static str, expression: Schedule, @@ -320,7 +320,7 @@ impl QueueWorker { self } - pub async fn run(mut self) { + pub(crate) async fn run(mut self) { if let Err(e) = self.run_inner().await { tracing::error!( error = &e as &dyn std::error::Error, @@ -344,7 +344,7 @@ impl QueueWorker { } #[tracing::instrument(name = "worker.setup_schedules", skip_all)] - pub async fn setup_schedules(&mut self) -> Result<(), QueueRunnerError> { + pub(crate) async fn setup_schedules(&mut self) -> Result<(), QueueRunnerError> { let schedules: Vec<_> = self.schedules.iter().map(|s| s.schedule_name).collect(); // Start a transaction on the existing PgListener connection From 388bfc25c20eb048940fa7f6f532a328191b7759 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 9 Jul 2025 17:26:09 +0200 Subject: [PATCH 4/7] Test helper to run all tests in the job queue --- crates/handlers/src/test_utils.rs | 8 ++++ crates/tasks/src/new_queue.rs | 67 +++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) diff --git a/crates/handlers/src/test_utils.rs b/crates/handlers/src/test_utils.rs index 2eba7d9e4..7d348167c 100644 --- a/crates/handlers/src/test_utils.rs +++ b/crates/handlers/src/test_utils.rs @@ -283,6 +283,14 @@ impl TestState { }) } + /// Run all the available jobs in the queue. + /// + /// Panics if it fails to run the jobs (but not on job failures!) + pub async fn run_jobs_in_queue(&self) { + let mut queue = self.queue_worker.lock().unwrap(); + queue.process_all_jobs_in_tests().await.unwrap(); + } + /// Reset the test utils to a fresh state, with the same configuration. pub async fn reset(self) -> Self { let site_config = self.site_config.clone(); diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index c2bf288e5..60731ea72 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -713,6 +713,73 @@ impl QueueWorker { Ok(()) } + + /// Process all the pending jobs in the queue. + /// This should only be called in tests! + /// + /// # Errors + /// + /// This function can fail if the database connection fails. + pub async fn process_all_jobs_in_tests(&mut self) -> Result<(), QueueRunnerError> { + // I swear, I'm the leader! 
+ self.am_i_leader = true; + + // First, perform the leader duties. This will make sure that we schedule + // recurring jobs. + self.perform_leader_duties().await?; + + let clock = self.state.clock(); + let mut rng = self.state.rng(); + + // Grab the connection from the PgListener + let txn = self + .listener + .begin() + .await + .map_err(QueueRunnerError::StartTransaction)?; + let mut repo = PgRepository::from_conn(txn); + + // Spawn all the jobs in the database + let queues = self.tracker.queues(); + let jobs = repo + .queue_job() + // I really hope that we don't spawn more than 10k jobs in tests + .reserve(clock, &self.registration, &queues, 10_000) + .await?; + + for Job { + id, + queue_name, + payload, + metadata, + attempt, + } in jobs + { + let cancellation_token = self.cancellation_token.child_token(); + let start = Instant::now(); + let context = JobContext { + id, + metadata, + queue_name, + attempt, + start, + cancellation_token, + }; + + self.tracker.spawn_job(self.state.clone(), context, payload); + } + + self.tracker + .process_jobs(&mut rng, clock, &mut repo, true) + .await?; + + repo.into_inner() + .commit() + .await + .map_err(QueueRunnerError::CommitTransaction)?; + + Ok(()) + } } /// Tracks running jobs From 3728d1e60588333517a4141876a55aeaae507db2 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 9 Jul 2025 17:28:54 +0200 Subject: [PATCH 5/7] Update the deactivate admin API test to run the deactivation job --- .../handlers/src/admin/v1/users/deactivate.rs | 83 ++++++++++++++----- 1 file changed, 64 insertions(+), 19 deletions(-) diff --git a/crates/handlers/src/admin/v1/users/deactivate.rs b/crates/handlers/src/admin/v1/users/deactivate.rs index 55194a613..87c2361a2 100644 --- a/crates/handlers/src/admin/v1/users/deactivate.rs +++ b/crates/handlers/src/admin/v1/users/deactivate.rs @@ -105,8 +105,9 @@ pub async fn handler( mod tests { use chrono::Duration; use hyper::{Request, StatusCode}; + use insta::assert_json_snapshot; use mas_storage::{Clock, RepositoryAccess, user::UserRepository}; - use sqlx::{PgPool, types::Json}; + use sqlx::PgPool; use crate::test_utils::{RequestBuilderExt, ResponseExt, TestState, setup}; @@ -137,15 +138,37 @@ mod tests { serde_json::json!(state.clock.now()) ); - // It should have scheduled a deactivation job for the user - // XXX: we don't have a good way to look for the deactivation job - let job: Json = sqlx::query_scalar( - "SELECT payload FROM queue_jobs WHERE queue_name = 'deactivate-user'", - ) - .fetch_one(&pool) - .await - .expect("Deactivation job to be scheduled"); - assert_eq!(job["user_id"], serde_json::json!(user.id)); + // Make sure to run the jobs in the queue + state.run_jobs_in_queue().await; + + let request = Request::get(format!("/api/admin/v1/users/{}", user.id)) + .bearer(&token) + .empty(); + let response = state.request(request).await; + response.assert_status(StatusCode::OK); + let body: serde_json::Value = response.json(); + + assert_json_snapshot!(body, @r#" + { + "data": { + "type": "user", + "id": "01FSHN9AG0MZAA6S4AF7CTV32E", + "attributes": { + "username": "alice", + "created_at": "2022-01-16T14:40:00Z", + "locked_at": "2022-01-16T14:40:00Z", + "deactivated_at": "2022-01-16T14:40:00Z", + "admin": false + }, + "links": { + "self": "/api/admin/v1/users/01FSHN9AG0MZAA6S4AF7CTV32E" + } + }, + "links": { + "self": "/api/admin/v1/users/01FSHN9AG0MZAA6S4AF7CTV32E" + } + } + "#); } #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")] @@ -179,15 +202,37 @@ mod tests { serde_json::json!(state.clock.now()) ); - // 
It should have scheduled a deactivation job for the user - // XXX: we don't have a good way to look for the deactivation job - let job: Json = sqlx::query_scalar( - "SELECT payload FROM queue_jobs WHERE queue_name = 'deactivate-user'", - ) - .fetch_one(&pool) - .await - .expect("Deactivation job to be scheduled"); - assert_eq!(job["user_id"], serde_json::json!(user.id)); + // Make sure to run the jobs in the queue + state.run_jobs_in_queue().await; + + let request = Request::get(format!("/api/admin/v1/users/{}", user.id)) + .bearer(&token) + .empty(); + let response = state.request(request).await; + response.assert_status(StatusCode::OK); + let body: serde_json::Value = response.json(); + + assert_json_snapshot!(body, @r#" + { + "data": { + "type": "user", + "id": "01FSHN9AG0MZAA6S4AF7CTV32E", + "attributes": { + "username": "alice", + "created_at": "2022-01-16T14:40:00Z", + "locked_at": "2022-01-16T14:40:00Z", + "deactivated_at": "2022-01-16T14:41:00Z", + "admin": false + }, + "links": { + "self": "/api/admin/v1/users/01FSHN9AG0MZAA6S4AF7CTV32E" + } + }, + "links": { + "self": "/api/admin/v1/users/01FSHN9AG0MZAA6S4AF7CTV32E" + } + } + "#); } #[sqlx::test(migrator = "mas_storage_pg::MIGRATOR")] From e66782207b6dd6ea237bc263deec7df1cc2a0fd5 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 9 Jul 2025 18:04:45 +0200 Subject: [PATCH 6/7] Ignore clippy lint --- crates/tasks/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 9ab97dca6..e2f539047 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -169,6 +169,7 @@ pub async fn init( /// # Errors /// /// This function can fail if the database connection fails. +#[expect(clippy::too_many_arguments, reason = "this is fine")] pub async fn init_and_run( repository_factory: PgRepositoryFactory, clock: impl Clock + 'static, From e955ecc2dde2fa2245192ec9fa5e43e7c191e8b4 Mon Sep 17 00:00:00 2001 From: Quentin Gliech Date: Wed, 9 Jul 2025 18:23:35 +0200 Subject: [PATCH 7/7] Use an async-aware mutex for the test queue worker --- crates/handlers/src/test_utils.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/handlers/src/test_utils.rs b/crates/handlers/src/test_utils.rs index 7d348167c..89159a6da 100644 --- a/crates/handlers/src/test_utils.rs +++ b/crates/handlers/src/test_utils.rs @@ -115,7 +115,7 @@ pub(crate) struct TestState { pub rng: Arc>, pub http_client: reqwest::Client, pub task_tracker: TaskTracker, - queue_worker: Arc>, + queue_worker: Arc>, #[allow(dead_code)] // It is used, as it will cancel the CancellationToken when dropped cancellation_drop_guard: Arc, @@ -257,7 +257,7 @@ impl TestState { .await .unwrap(); - let queue_worker = Arc::new(Mutex::new(queue_worker)); + let queue_worker = Arc::new(tokio::sync::Mutex::new(queue_worker)); Ok(Self { repository_factory: PgRepositoryFactory::new(pool), @@ -287,7 +287,7 @@ impl TestState { /// /// Panics if it fails to run the jobs (but not on job failures!) pub async fn run_jobs_in_queue(&self) { - let mut queue = self.queue_worker.lock().unwrap(); + let mut queue = self.queue_worker.lock().await; queue.process_all_jobs_in_tests().await.unwrap(); }
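
Note on the pattern introduced in PATCH 1: the series hinges on `Clock` being `Send + Sync`, so the task `State` can hold a single `Arc<dyn Clock>` and hand out `&dyn Clock` borrows instead of boxing a fresh clock on every `clock()` call, and so tests can inject their own clock through `State::new(clock: impl Clock + 'static)`. The sketch below is a minimal, self-contained illustration of that shape, not the MAS code itself; `FrozenClock` is only a stand-in for the `MockClock` that the test state uses.

use std::sync::Arc;

use chrono::{DateTime, TimeZone, Utc};

// The clock abstraction: `Send + Sync` so a `&dyn Clock` (or an
// `Arc<dyn Clock>`) can be shared across async tasks.
trait Clock: Send + Sync {
    fn now(&self) -> DateTime<Utc>;
}

// Wall-clock implementation, the default outside of tests.
struct SystemClock;

impl Clock for SystemClock {
    fn now(&self) -> DateTime<Utc> {
        Utc::now()
    }
}

// A frozen clock, standing in for the mock clock a test state would inject.
struct FrozenClock(DateTime<Utc>);

impl Clock for FrozenClock {
    fn now(&self) -> DateTime<Utc> {
        self.0
    }
}

// Shared task state: the clock is type-erased behind `Arc<dyn Clock>` and
// borrowed out as `&dyn Clock`, with no per-call allocation.
struct State {
    clock: Arc<dyn Clock>,
}

impl State {
    fn new(clock: impl Clock + 'static) -> Self {
        Self {
            clock: Arc::new(clock),
        }
    }

    fn clock(&self) -> &dyn Clock {
        // `&Arc<dyn Clock>` deref-coerces to `&dyn Clock`
        &self.clock
    }
}

fn main() {
    let real = State::new(SystemClock);
    let frozen = State::new(FrozenClock(
        Utc.with_ymd_and_hms(2022, 1, 16, 14, 40, 0).unwrap(),
    ));

    println!("system clock: {}", real.clock().now());
    println!("frozen clock: {}", frozen.clock().now());
}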
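
Note on PATCH 7: `TestState::run_jobs_in_queue()` keeps the queue-worker lock held across `.await` points (`process_all_jobs_in_tests` is async), and a `std::sync::MutexGuard` cannot be held across an `.await` in a future that must be `Send`; that is why the guard moves to `tokio::sync::Mutex`. A minimal sketch of the difference under that assumption, with `Worker` standing in for `QueueWorker`:

use std::sync::Arc;

use tokio::sync::Mutex;

// Stand-in for the QueueWorker held by the test state.
struct Worker {
    processed: u64,
}

impl Worker {
    async fn process_all_jobs(&mut self) {
        // Pretend to do async work while the lock is held.
        tokio::task::yield_now().await;
        self.processed += 1;
    }
}

#[tokio::main]
async fn main() {
    let worker = Arc::new(Mutex::new(Worker { processed: 0 }));

    // With `std::sync::Mutex` this guard could not be held across the
    // `.await` below in a `Send` future; `tokio::sync::Mutex` is built
    // for exactly that usage.
    let mut guard = worker.lock().await;
    guard.process_all_jobs().await;

    println!("jobs processed: {}", guard.processed);
}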