diff --git a/Cargo.lock b/Cargo.lock index d8d66a228..71b98659a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -194,44 +194,6 @@ version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" -[[package]] -name = "apalis-core" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1deb48475efcdece1f23a0553209ee842f264c2a5e9bcc4928bfa6a15a044cde" -dependencies = [ - "async-stream", - "async-trait", - "chrono", - "futures", - "graceful-shutdown", - "http", - "log", - "pin-project-lite", - "serde", - "strum 0.25.0", - "thiserror 1.0.69", - "tokio", - "tower 0.4.13", - "tracing", - "ulid", -] - -[[package]] -name = "apalis-cron" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43310b7e0132f9520b09224fb6faafb32eec82a672aa79c09e46b5b488ed505b" -dependencies = [ - "apalis-core", - "async-stream", - "chrono", - "cron", - "futures", - "tokio", - "tower 0.4.13", -] - [[package]] name = "arbitrary" version = "1.3.2" @@ -391,7 +353,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "strum 0.26.3", + "strum", "syn", "thiserror 1.0.69", ] @@ -638,7 +600,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper 1.0.1", "tokio", - "tower 0.5.1", + "tower", "tower-layer", "tower-service", "tracing", @@ -685,7 +647,7 @@ dependencies = [ "multer", "pin-project-lite", "serde", - "tower 0.5.1", + "tower", "tower-layer", "tower-service", ] @@ -1352,17 +1314,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "cron" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f8c3e73077b4b4a6ab1ea5047c37c57aee77657bc8ecd6f29b0af082d0b0c07" -dependencies = [ - "chrono", - "nom", - "once_cell", -] - [[package]] name = "crossbeam-channel" version = "0.5.13" @@ -2138,17 +2089,6 @@ dependencies = [ "spinning_top", ] -[[package]] -name = 
"graceful-shutdown" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3effbaf774a1da3462925bb182ccf975c284cf46edca5569ea93420a657af484" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", -] - [[package]] name = "group" version = "0.13.0" @@ -3186,7 +3126,7 @@ dependencies = [ "serde_with", "thiserror 2.0.3", "tokio", - "tower 0.5.1", + "tower", "tracing", "ulid", "url", @@ -3255,7 +3195,7 @@ dependencies = [ "sqlx", "tokio", "tokio-util", - "tower 0.5.1", + "tower", "tower-http", "tracing", "tracing-appender", @@ -3395,7 +3335,7 @@ dependencies = [ "time", "tokio", "tokio-util", - "tower 0.5.1", + "tower", "tower-http", "tracing", "tracing-subscriber", @@ -3420,7 +3360,7 @@ dependencies = [ "reqwest", "rustls-platform-verifier", "tokio", - "tower 0.5.1", + "tower", "tower-http", "tracing", "tracing-opentelemetry", @@ -3567,7 +3507,7 @@ dependencies = [ "tokio-rustls", "tokio-test", "tokio-util", - "tower 0.5.1", + "tower", "tower-http", "tracing", "tracing-subscriber", @@ -3599,7 +3539,7 @@ dependencies = [ "serde", "serde_json", "thiserror 2.0.3", - "tower 0.5.1", + "tower", "tracing", "url", "urlencoding", @@ -3683,7 +3623,6 @@ dependencies = [ name = "mas-storage" version = "0.12.0" dependencies = [ - "apalis-core", "async-trait", "chrono", "futures-util", @@ -3734,8 +3673,6 @@ name = "mas-tasks" version = "0.12.0" dependencies = [ "anyhow", - "apalis-core", - "apalis-cron", "async-stream", "async-trait", "chrono", @@ -3759,7 +3696,7 @@ dependencies = [ "thiserror 2.0.3", "tokio", "tokio-util", - "tower 0.5.1", + "tower", "tracing", "tracing-opentelemetry", "ulid", @@ -3804,7 +3741,7 @@ dependencies = [ "opentelemetry-http", "opentelemetry-semantic-conventions", "pin-project-lite", - "tower 0.5.1", + "tower", "tracing", "tracing-opentelemetry", ] @@ -6117,35 +6054,13 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" -[[package]] -name = "strum" -version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" -dependencies = [ - "strum_macros 0.25.3", -] - [[package]] name = "strum" version = "0.26.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" dependencies = [ - "strum_macros 0.26.4", -] - -[[package]] -name = "strum_macros" -version = "0.25.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23dc1fa9ac9c169a78ba62f0b841814b7abae11bdd047b9c58f893439e309ea0" -dependencies = [ - "heck 0.4.1", - "proc-macro2", - "quote", - "rustversion", - "syn", + "strum_macros", ] [[package]] @@ -6484,21 +6399,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tower" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" -dependencies = [ - "futures-core", - "futures-util", - "pin-project", - "pin-project-lite", - "tower-layer", - "tower-service", - "tracing", -] - [[package]] name = "tower" version = "0.5.1" diff --git a/crates/cli/src/commands/manage.rs b/crates/cli/src/commands/manage.rs index c2fe37a10..c7379a960 100644 --- a/crates/cli/src/commands/manage.rs +++ b/crates/cli/src/commands/manage.rs @@ -20,10 +20,11 @@ use mas_matrix::HomeserverConnection; use mas_matrix_synapse::SynapseConnection; use mas_storage::{ compat::{CompatAccessTokenRepository, CompatSessionFilter, CompatSessionRepository}, - job::{ - DeactivateUserJob, JobRepositoryExt, ProvisionUserJob, ReactivateUserJob, SyncDevicesJob, - }, oauth2::OAuth2SessionFilter, + queue::{ + DeactivateUserJob, ProvisionUserJob, QueueJobRepositoryExt as _, ReactivateUserJob, + SyncDevicesJob, + }, user::{BrowserSessionFilter, 
UserEmailRepository, UserPasswordRepository, UserRepository}, Clock, RepositoryAccess, SystemClock, }; @@ -366,7 +367,7 @@ impl Options { let id = id.into(); info!(user.id = %id, "Scheduling provisioning job"); let job = ProvisionUserJob::new_for_id(id); - repo.job().schedule_job(job).await?; + repo.queue_job().schedule_job(&mut rng, &clock, job).await?; } repo.into_inner().commit().await?; @@ -429,7 +430,9 @@ impl Options { // Schedule a job to sync the devices of the user with the homeserver warn!("Scheduling job to sync devices for the user"); - repo.job().schedule_job(SyncDevicesJob::new(&user)).await?; + repo.queue_job() + .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user)) + .await?; let txn = repo.into_inner(); if dry_run { @@ -467,8 +470,8 @@ impl Options { if deactivate { warn!(%user.id, "Scheduling user deactivation"); - repo.job() - .schedule_job(DeactivateUserJob::new(&user, false)) + repo.queue_job() + .schedule_job(&mut rng, &clock, DeactivateUserJob::new(&user, false)) .await?; } @@ -491,8 +494,8 @@ impl Options { .context("User not found")?; warn!(%user.id, "User scheduling user reactivation"); - repo.job() - .schedule_job(ReactivateUserJob::new(&user)) + repo.queue_job() + .schedule_job(&mut rng, &clock, ReactivateUserJob::new(&user)) .await?; repo.into_inner().commit().await?; @@ -975,7 +978,9 @@ impl UserCreationRequest<'_> { provision_job = provision_job.set_display_name(display_name); } - repo.job().schedule_job(provision_job).await?; + repo.queue_job() + .schedule_job(rng, clock, provision_job) + .await?; Ok(user) } diff --git a/crates/cli/src/commands/server.rs b/crates/cli/src/commands/server.rs index 70834ccba..0b0efdc31 100644 --- a/crates/cli/src/commands/server.rs +++ b/crates/cli/src/commands/server.rs @@ -19,10 +19,6 @@ use mas_matrix_synapse::SynapseConnection; use mas_router::UrlBuilder; use mas_storage::SystemClock; use mas_storage_pg::MIGRATOR; -use rand::{ - distributions::{Alphanumeric, DistString}, - thread_rng, -}; use 
sqlx::migrate::Migrate; use tracing::{info, info_span, warn, Instrument}; @@ -161,13 +157,8 @@ impl Options { let mailer = mailer_from_config(&config.email, &templates)?; mailer.test_connection().await?; - #[allow(clippy::disallowed_methods)] - let mut rng = thread_rng(); - let worker_name = Alphanumeric.sample_string(&mut rng, 10); - - info!(worker_name, "Starting task worker"); - let monitor = mas_tasks::init( - &worker_name, + info!("Starting task worker"); + mas_tasks::init( &pool, &mailer, homeserver_connection.clone(), @@ -176,21 +167,6 @@ impl Options { shutdown.task_tracker(), ) .await?; - - // XXX: The monitor from apalis is a bit annoying to use for graceful shutdowns, - // ideally we'd just give it a cancellation token - let shutdown_future = shutdown.soft_shutdown_token().cancelled_owned(); - shutdown.task_tracker().spawn(async move { - if let Err(e) = monitor - .run_with_signal(async move { - shutdown_future.await; - Ok(()) - }) - .await - { - tracing::error!(error = &e as &dyn std::error::Error, "Task worker failed"); - } - }); } let listeners_config = config.http.listeners.clone(); diff --git a/crates/cli/src/commands/worker.rs b/crates/cli/src/commands/worker.rs index c58605a1b..3d976d46e 100644 --- a/crates/cli/src/commands/worker.rs +++ b/crates/cli/src/commands/worker.rs @@ -11,10 +11,6 @@ use figment::Figment; use mas_config::{AppConfig, ConfigurationSection}; use mas_matrix_synapse::SynapseConnection; use mas_router::UrlBuilder; -use rand::{ - distributions::{Alphanumeric, DistString}, - thread_rng, -}; use tracing::{info, info_span}; use crate::{ @@ -71,13 +67,8 @@ impl Options { drop(config); - #[allow(clippy::disallowed_methods)] - let mut rng = thread_rng(); - let worker_name = Alphanumeric.sample_string(&mut rng, 10); - - info!(worker_name, "Starting task scheduler"); - let monitor = mas_tasks::init( - &worker_name, + info!("Starting task scheduler"); + mas_tasks::init( &pool, &mailer, conn, @@ -87,20 +78,6 @@ impl Options { ) .await?; - 
// XXX: The monitor from apalis is a bit annoying to use for graceful shutdowns, - // ideally we'd just give it a cancellation token - let shutdown_future = shutdown.soft_shutdown_token().cancelled_owned(); - shutdown.task_tracker().spawn(async move { - if let Err(e) = monitor - .run_with_signal(async move { - shutdown_future.await; - Ok(()) - }) - .await - { - tracing::error!(error = &e as &dyn std::error::Error, "Task worker failed"); - } - }); span.exit(); shutdown.run().await; diff --git a/crates/handlers/src/admin/v1/users/add.rs b/crates/handlers/src/admin/v1/users/add.rs index d45e184bc..81b17a1ed 100644 --- a/crates/handlers/src/admin/v1/users/add.rs +++ b/crates/handlers/src/admin/v1/users/add.rs @@ -9,7 +9,7 @@ use axum::{extract::State, response::IntoResponse, Json}; use hyper::StatusCode; use mas_matrix::BoxHomeserverConnection; use mas_storage::{ - job::{JobRepositoryExt, ProvisionUserJob}, + queue::{ProvisionUserJob, QueueJobRepositoryExt as _}, BoxRng, }; use schemars::JsonSchema; @@ -164,8 +164,8 @@ pub async fn handler( let user = repo.user().add(&mut rng, &clock, params.username).await?; - repo.job() - .schedule_job(ProvisionUserJob::new(&user)) + repo.queue_job() + .schedule_job(&mut rng, &clock, ProvisionUserJob::new(&user)) .await?; repo.save().await?; diff --git a/crates/handlers/src/admin/v1/users/deactivate.rs b/crates/handlers/src/admin/v1/users/deactivate.rs index 61afcc3a7..ad09a7ca2 100644 --- a/crates/handlers/src/admin/v1/users/deactivate.rs +++ b/crates/handlers/src/admin/v1/users/deactivate.rs @@ -4,10 +4,13 @@ // SPDX-License-Identifier: AGPL-3.0-only // Please see LICENSE in the repository root for full details. 
-use aide::{transform::TransformOperation, OperationIo}; +use aide::{transform::TransformOperation, NoApi, OperationIo}; use axum::{response::IntoResponse, Json}; use hyper::StatusCode; -use mas_storage::job::{DeactivateUserJob, JobRepositoryExt}; +use mas_storage::{ + queue::{DeactivateUserJob, QueueJobRepositoryExt as _}, + BoxRng, +}; use tracing::info; use ulid::Ulid; @@ -69,6 +72,7 @@ pub async fn handler( CallContext { mut repo, clock, .. }: CallContext, + NoApi(mut rng): NoApi, id: UlidPathParam, ) -> Result>, RouteError> { let id = *id; @@ -83,8 +87,8 @@ pub async fn handler( } info!("Scheduling deactivation of user {}", user.id); - repo.job() - .schedule_job(DeactivateUserJob::new(&user, true)) + repo.queue_job() + .schedule_job(&mut rng, &clock, DeactivateUserJob::new(&user, true)) .await?; repo.save().await?; @@ -133,11 +137,12 @@ mod tests { // It should have scheduled a deactivation job for the user // XXX: we don't have a good way to look for the deactivation job - let job: Json = - sqlx::query_scalar("SELECT job FROM apalis.jobs WHERE job_type = 'deactivate-user'") - .fetch_one(&pool) - .await - .expect("Deactivation job to be scheduled"); + let job: Json = sqlx::query_scalar( + "SELECT payload FROM queue_jobs WHERE queue_name = 'deactivate-user'", + ) + .fetch_one(&pool) + .await + .expect("Deactivation job to be scheduled"); assert_eq!(job["user_id"], serde_json::json!(user.id)); } @@ -174,11 +179,12 @@ mod tests { // It should have scheduled a deactivation job for the user // XXX: we don't have a good way to look for the deactivation job - let job: Json = - sqlx::query_scalar("SELECT job FROM apalis.jobs WHERE job_type = 'deactivate-user'") - .fetch_one(&pool) - .await - .expect("Deactivation job to be scheduled"); + let job: Json = sqlx::query_scalar( + "SELECT payload FROM queue_jobs WHERE queue_name = 'deactivate-user'", + ) + .fetch_one(&pool) + .await + .expect("Deactivation job to be scheduled"); assert_eq!(job["user_id"], 
serde_json::json!(user.id)); } diff --git a/crates/handlers/src/compat/logout.rs b/crates/handlers/src/compat/logout.rs index 59826f0ba..8ef2dd95b 100644 --- a/crates/handlers/src/compat/logout.rs +++ b/crates/handlers/src/compat/logout.rs @@ -12,8 +12,8 @@ use mas_axum_utils::sentry::SentryEventID; use mas_data_model::TokenType; use mas_storage::{ compat::{CompatAccessTokenRepository, CompatSessionRepository}, - job::{JobRepositoryExt, SyncDevicesJob}, - BoxClock, BoxRepository, Clock, RepositoryAccess, + queue::{QueueJobRepositoryExt as _, SyncDevicesJob}, + BoxClock, BoxRepository, BoxRng, Clock, RepositoryAccess, }; use thiserror::Error; @@ -65,6 +65,7 @@ impl IntoResponse for RouteError { #[tracing::instrument(name = "handlers.compat.logout.post", skip_all, err)] pub(crate) async fn post( clock: BoxClock, + mut rng: BoxRng, mut repo: BoxRepository, activity_tracker: BoundActivityTracker, maybe_authorization: Option>>, @@ -104,7 +105,9 @@ pub(crate) async fn post( .ok_or(RouteError::InvalidAuthorization)?; // Schedule a job to sync the devices of the user with the homeserver - repo.job().schedule_job(SyncDevicesJob::new(&user)).await?; + repo.queue_job() + .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user)) + .await?; repo.compat_session().finish(&clock, session).await?; diff --git a/crates/handlers/src/graphql/mutations/compat_session.rs b/crates/handlers/src/graphql/mutations/compat_session.rs index baf385994..c13c1e559 100644 --- a/crates/handlers/src/graphql/mutations/compat_session.rs +++ b/crates/handlers/src/graphql/mutations/compat_session.rs @@ -8,7 +8,7 @@ use anyhow::Context as _; use async_graphql::{Context, Enum, InputObject, Object, ID}; use mas_storage::{ compat::CompatSessionRepository, - job::{JobRepositoryExt, SyncDevicesJob}, + queue::{QueueJobRepositoryExt as _, SyncDevicesJob}, RepositoryAccess, }; @@ -72,6 +72,7 @@ impl CompatSessionMutations { input: EndCompatSessionInput, ) -> Result { let state = ctx.state(); + let mut rng = 
state.rng(); let compat_session_id = NodeType::CompatSession.extract_ulid(&input.compat_session_id)?; let requester = ctx.requester(); @@ -94,7 +95,9 @@ impl CompatSessionMutations { .context("Could not load user")?; // Schedule a job to sync the devices of the user with the homeserver - repo.job().schedule_job(SyncDevicesJob::new(&user)).await?; + repo.queue_job() + .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user)) + .await?; let session = repo.compat_session().finish(&clock, session).await?; diff --git a/crates/handlers/src/graphql/mutations/oauth2_session.rs b/crates/handlers/src/graphql/mutations/oauth2_session.rs index a0595e695..0b1dbe669 100644 --- a/crates/handlers/src/graphql/mutations/oauth2_session.rs +++ b/crates/handlers/src/graphql/mutations/oauth2_session.rs @@ -9,11 +9,11 @@ use async_graphql::{Context, Description, Enum, InputObject, Object, ID}; use chrono::Duration; use mas_data_model::{Device, TokenType}; use mas_storage::{ - job::{JobRepositoryExt, SyncDevicesJob}, oauth2::{ OAuth2AccessTokenRepository, OAuth2ClientRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository, }, + queue::{QueueJobRepositoryExt as _, SyncDevicesJob}, user::UserRepository, RepositoryAccess, }; @@ -217,6 +217,7 @@ impl OAuth2SessionMutations { let mut repo = state.repository().await?; let clock = state.clock(); + let mut rng = state.rng(); let session = repo.oauth2_session().lookup(oauth2_session_id).await?; let Some(session) = session else { @@ -235,7 +236,9 @@ impl OAuth2SessionMutations { .context("Could not load user")?; // Schedule a job to sync the devices of the user with the homeserver - repo.job().schedule_job(SyncDevicesJob::new(&user)).await?; + repo.queue_job() + .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user)) + .await?; } let session = repo.oauth2_session().finish(&clock, session).await?; diff --git a/crates/handlers/src/graphql/mutations/user.rs b/crates/handlers/src/graphql/mutations/user.rs index 3fd4a10d5..04d9cc9b3 
100644 --- a/crates/handlers/src/graphql/mutations/user.rs +++ b/crates/handlers/src/graphql/mutations/user.rs @@ -7,7 +7,7 @@ use anyhow::Context as _; use async_graphql::{Context, Description, Enum, InputObject, Object, ID}; use mas_storage::{ - job::{DeactivateUserJob, JobRepositoryExt, ProvisionUserJob}, + queue::{DeactivateUserJob, ProvisionUserJob, QueueJobRepositoryExt as _}, user::UserRepository, }; use tracing::{info, warn}; @@ -398,8 +398,8 @@ impl UserMutations { let user = repo.user().add(&mut rng, &clock, input.username).await?; - repo.job() - .schedule_job(ProvisionUserJob::new(&user)) + repo.queue_job() + .schedule_job(&mut rng, &clock, ProvisionUserJob::new(&user)) .await?; repo.save().await?; @@ -414,6 +414,8 @@ impl UserMutations { input: LockUserInput, ) -> Result { let state = ctx.state(); + let clock = state.clock(); + let mut rng = state.rng(); let requester = ctx.requester(); if !requester.is_admin() { @@ -435,8 +437,8 @@ impl UserMutations { if deactivate { info!("Scheduling deactivation of user {}", user.id); - repo.job() - .schedule_job(DeactivateUserJob::new(&user, deactivate)) + repo.queue_job() + .schedule_job(&mut rng, &clock, DeactivateUserJob::new(&user, deactivate)) .await?; } diff --git a/crates/handlers/src/graphql/mutations/user_email.rs b/crates/handlers/src/graphql/mutations/user_email.rs index 91a963dc7..08604f652 100644 --- a/crates/handlers/src/graphql/mutations/user_email.rs +++ b/crates/handlers/src/graphql/mutations/user_email.rs @@ -7,7 +7,7 @@ use anyhow::Context as _; use async_graphql::{Context, Description, Enum, InputObject, Object, ID}; use mas_storage::{ - job::{JobRepositoryExt, ProvisionUserJob, VerifyEmailJob}, + queue::{ProvisionUserJob, QueueJobRepositoryExt as _, VerifyEmailJob}, user::{UserEmailRepository, UserRepository}, RepositoryAccess, }; @@ -376,6 +376,8 @@ impl UserEmailMutations { let state = ctx.state(); let id = NodeType::User.extract_ulid(&input.user_id)?; let requester = ctx.requester(); + let 
clock = state.clock(); + let mut rng = state.rng(); if !requester.is_owner_or_admin(&UserId(id)) { return Err(async_graphql::Error::new("Unauthorized")); @@ -427,9 +429,6 @@ impl UserEmailMutations { let (added, mut user_email) = if let Some(user_email) = existing_user_email { (false, user_email) } else { - let clock = state.clock(); - let mut rng = state.rng(); - let user_email = repo .user_email() .add(&mut rng, &clock, &user, input.email) @@ -447,8 +446,8 @@ impl UserEmailMutations { .await?; } else { // TODO: figure out the locale - repo.job() - .schedule_job(VerifyEmailJob::new(&user_email)) + repo.queue_job() + .schedule_job(&mut rng, &clock, VerifyEmailJob::new(&user_email)) .await?; } } @@ -470,6 +469,8 @@ impl UserEmailMutations { input: SendVerificationEmailInput, ) -> Result { let state = ctx.state(); + let clock = state.clock(); + let mut rng = state.rng(); let user_email_id = NodeType::UserEmail.extract_ulid(&input.user_email_id)?; let requester = ctx.requester(); @@ -489,8 +490,8 @@ impl UserEmailMutations { let needs_verification = user_email.confirmed_at.is_none(); if needs_verification { // TODO: figure out the locale - repo.job() - .schedule_job(VerifyEmailJob::new(&user_email)) + repo.queue_job() + .schedule_job(&mut rng, &clock, VerifyEmailJob::new(&user_email)) .await?; } @@ -515,6 +516,7 @@ impl UserEmailMutations { let requester = ctx.requester(); let clock = state.clock(); + let mut rng = state.rng(); let mut repo = state.repository().await?; let user_email = repo @@ -567,8 +569,8 @@ impl UserEmailMutations { .mark_as_verified(&clock, user_email) .await?; - repo.job() - .schedule_job(ProvisionUserJob::new(&user)) + repo.queue_job() + .schedule_job(&mut rng, &clock, ProvisionUserJob::new(&user)) .await?; repo.save().await?; @@ -586,6 +588,8 @@ impl UserEmailMutations { let user_email_id = NodeType::UserEmail.extract_ulid(&input.user_email_id)?; let requester = ctx.requester(); + let mut rng = state.rng(); + let clock = state.clock(); let mut 
repo = state.repository().await?; let user_email = repo.user_email().lookup(user_email_id).await?; @@ -616,8 +620,8 @@ impl UserEmailMutations { repo.user_email().remove(user_email.clone()).await?; // Schedule a job to update the user - repo.job() - .schedule_job(ProvisionUserJob::new(&user)) + repo.queue_job() + .schedule_job(&mut rng, &clock, ProvisionUserJob::new(&user)) .await?; repo.save().await?; diff --git a/crates/handlers/src/oauth2/revoke.rs b/crates/handlers/src/oauth2/revoke.rs index 584a02053..557614c5e 100644 --- a/crates/handlers/src/oauth2/revoke.rs +++ b/crates/handlers/src/oauth2/revoke.rs @@ -14,8 +14,8 @@ use mas_data_model::TokenType; use mas_iana::oauth::OAuthTokenTypeHint; use mas_keystore::Encrypter; use mas_storage::{ - job::{JobRepositoryExt, SyncDevicesJob}, - BoxClock, BoxRepository, RepositoryAccess, + queue::{QueueJobRepositoryExt as _, SyncDevicesJob}, + BoxClock, BoxRepository, BoxRng, RepositoryAccess, }; use oauth2_types::{ errors::{ClientError, ClientErrorCode}, @@ -110,6 +110,7 @@ impl From for RouteError { )] pub(crate) async fn post( clock: BoxClock, + mut rng: BoxRng, State(http_client): State, mut repo: BoxRepository, activity_tracker: BoundActivityTracker, @@ -209,7 +210,9 @@ pub(crate) async fn post( .ok_or(RouteError::UnknownToken)?; // Schedule a job to sync the devices of the user with the homeserver - repo.job().schedule_job(SyncDevicesJob::new(&user)).await?; + repo.queue_job() + .schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user)) + .await?; } // Now that we checked everything, we can end the session. 
diff --git a/crates/handlers/src/upstream_oauth2/link.rs b/crates/handlers/src/upstream_oauth2/link.rs index aea5d32b7..b58da8a3b 100644 --- a/crates/handlers/src/upstream_oauth2/link.rs +++ b/crates/handlers/src/upstream_oauth2/link.rs @@ -23,7 +23,7 @@ use mas_matrix::BoxHomeserverConnection; use mas_policy::Policy; use mas_router::UrlBuilder; use mas_storage::{ - job::{JobRepositoryExt, ProvisionUserJob}, + queue::{ProvisionUserJob, QueueJobRepositoryExt as _}, upstream_oauth2::{UpstreamOAuthLinkRepository, UpstreamOAuthSessionRepository}, user::{BrowserSessionRepository, UserEmailRepository, UserRepository}, BoxClock, BoxRepository, BoxRng, RepositoryAccess, @@ -796,7 +796,7 @@ pub(crate) async fn post( job = job.set_display_name(name); } - repo.job().schedule_job(job).await?; + repo.queue_job().schedule_job(&mut rng, &clock, job).await?; // If we have an email, add it to the user if let Some(email) = email { diff --git a/crates/handlers/src/views/account/emails/add.rs b/crates/handlers/src/views/account/emails/add.rs index 8ea14474d..fbdbccdd5 100644 --- a/crates/handlers/src/views/account/emails/add.rs +++ b/crates/handlers/src/views/account/emails/add.rs @@ -17,7 +17,7 @@ use mas_data_model::SiteConfig; use mas_policy::Policy; use mas_router::UrlBuilder; use mas_storage::{ - job::{JobRepositoryExt, VerifyEmailJob}, + queue::{QueueJobRepositoryExt as _, VerifyEmailJob}, user::UserEmailRepository, BoxClock, BoxRepository, BoxRng, }; @@ -137,8 +137,12 @@ pub(crate) async fn post( // If the email was not confirmed, send a confirmation email & redirect to the // verify page let next = if user_email.confirmed_at.is_none() { - repo.job() - .schedule_job(VerifyEmailJob::new(&user_email).with_language(locale.to_string())) + repo.queue_job() + .schedule_job( + &mut rng, + &clock, + VerifyEmailJob::new(&user_email).with_language(locale.to_string()), + ) .await?; let next = mas_router::AccountVerifyEmail::new(user_email.id); diff --git 
a/crates/handlers/src/views/account/emails/verify.rs b/crates/handlers/src/views/account/emails/verify.rs index a358f6d0a..518177c7b 100644 --- a/crates/handlers/src/views/account/emails/verify.rs +++ b/crates/handlers/src/views/account/emails/verify.rs @@ -16,7 +16,7 @@ use mas_axum_utils::{ }; use mas_router::UrlBuilder; use mas_storage::{ - job::{JobRepositoryExt, ProvisionUserJob}, + queue::{ProvisionUserJob, QueueJobRepositoryExt as _}, user::UserEmailRepository, BoxClock, BoxRepository, BoxRng, RepositoryAccess, }; @@ -94,6 +94,7 @@ pub(crate) async fn get( )] pub(crate) async fn post( clock: BoxClock, + mut rng: BoxRng, mut repo: BoxRepository, cookie_jar: CookieJar, State(url_builder): State, @@ -141,8 +142,8 @@ pub(crate) async fn post( .mark_as_verified(&clock, user_email) .await?; - repo.job() - .schedule_job(ProvisionUserJob::new(&session.user)) + repo.queue_job() + .schedule_job(&mut rng, &clock, ProvisionUserJob::new(&session.user)) .await?; repo.save().await?; diff --git a/crates/handlers/src/views/recovery/progress.rs b/crates/handlers/src/views/recovery/progress.rs index e4d416691..3c2a178fa 100644 --- a/crates/handlers/src/views/recovery/progress.rs +++ b/crates/handlers/src/views/recovery/progress.rs @@ -18,7 +18,7 @@ use mas_axum_utils::{ use mas_data_model::SiteConfig; use mas_router::UrlBuilder; use mas_storage::{ - job::{JobRepositoryExt, SendAccountRecoveryEmailsJob}, + queue::{QueueJobRepositoryExt as _, SendAccountRecoveryEmailsJob}, BoxClock, BoxRepository, BoxRng, }; use mas_templates::{EmptyContext, RecoveryProgressContext, TemplateContext, Templates}; @@ -136,8 +136,12 @@ pub(crate) async fn post( } // Schedule a new batch of emails - repo.job() - .schedule_job(SendAccountRecoveryEmailsJob::new(&recovery_session)) + repo.queue_job() + .schedule_job( + &mut rng, + &clock, + SendAccountRecoveryEmailsJob::new(&recovery_session), + ) .await?; repo.save().await?; diff --git a/crates/handlers/src/views/recovery/start.rs 
b/crates/handlers/src/views/recovery/start.rs index 53896a427..9b5a7a4d5 100644 --- a/crates/handlers/src/views/recovery/start.rs +++ b/crates/handlers/src/views/recovery/start.rs @@ -21,7 +21,7 @@ use mas_axum_utils::{ use mas_data_model::{SiteConfig, UserAgent}; use mas_router::UrlBuilder; use mas_storage::{ - job::{JobRepositoryExt, SendAccountRecoveryEmailsJob}, + queue::{QueueJobRepositoryExt as _, SendAccountRecoveryEmailsJob}, BoxClock, BoxRepository, BoxRng, }; use mas_templates::{ @@ -145,8 +145,12 @@ pub(crate) async fn post( ) .await?; - repo.job() - .schedule_job(SendAccountRecoveryEmailsJob::new(&session)) + repo.queue_job() + .schedule_job( + &mut rng, + &clock, + SendAccountRecoveryEmailsJob::new(&session), + ) .await?; repo.save().await?; diff --git a/crates/handlers/src/views/register.rs b/crates/handlers/src/views/register.rs index fa8de82f4..0eaa99801 100644 --- a/crates/handlers/src/views/register.rs +++ b/crates/handlers/src/views/register.rs @@ -24,7 +24,7 @@ use mas_matrix::BoxHomeserverConnection; use mas_policy::Policy; use mas_router::UrlBuilder; use mas_storage::{ - job::{JobRepositoryExt, ProvisionUserJob, VerifyEmailJob}, + queue::{ProvisionUserJob, QueueJobRepositoryExt as _, VerifyEmailJob}, user::{BrowserSessionRepository, UserEmailRepository, UserPasswordRepository, UserRepository}, BoxClock, BoxRepository, BoxRng, RepositoryAccess, }; @@ -294,12 +294,16 @@ pub(crate) async fn post( .authenticate_with_password(&mut rng, &clock, &session, &user_password) .await?; - repo.job() - .schedule_job(VerifyEmailJob::new(&user_email).with_language(locale.to_string())) + repo.queue_job() + .schedule_job( + &mut rng, + &clock, + VerifyEmailJob::new(&user_email).with_language(locale.to_string()), + ) .await?; - repo.job() - .schedule_job(ProvisionUserJob::new(&user)) + repo.queue_job() + .schedule_job(&mut rng, &clock, ProvisionUserJob::new(&user)) .await?; repo.save().await?; diff --git 
a/crates/storage-pg/.sqlx/query-359a00f6667b5b1fef616b0c18e11eb91698aa1f2d5d146cffbb7aea8d77467b.json b/crates/storage-pg/.sqlx/query-359a00f6667b5b1fef616b0c18e11eb91698aa1f2d5d146cffbb7aea8d77467b.json deleted file mode 100644 index 941ae4366..000000000 --- a/crates/storage-pg/.sqlx/query-359a00f6667b5b1fef616b0c18e11eb91698aa1f2d5d146cffbb7aea8d77467b.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n INSERT INTO apalis.jobs (job, id, job_type)\n VALUES ($1::json, $2::text, $3::text)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Json", - "Text", - "Text" - ] - }, - "nullable": [] - }, - "hash": "359a00f6667b5b1fef616b0c18e11eb91698aa1f2d5d146cffbb7aea8d77467b" -} diff --git a/crates/storage-pg/.sqlx/query-e291be0434ab9c346dee777e50f8e601f12c8003fe93a5ecb110d02642d14c3c.json b/crates/storage-pg/.sqlx/query-e291be0434ab9c346dee777e50f8e601f12c8003fe93a5ecb110d02642d14c3c.json new file mode 100644 index 000000000..84ac12de9 --- /dev/null +++ b/crates/storage-pg/.sqlx/query-e291be0434ab9c346dee777e50f8e601f12c8003fe93a5ecb110d02642d14c3c.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO queue_jobs\n (queue_job_id, queue_name, payload, metadata, created_at)\n VALUES ($1, $2, $3, $4, $5)\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Jsonb", + "Jsonb", + "Timestamptz" + ] + }, + "nullable": [] + }, + "hash": "e291be0434ab9c346dee777e50f8e601f12c8003fe93a5ecb110d02642d14c3c" +} diff --git a/crates/storage-pg/migrations/20241004121132_queue_job.sql b/crates/storage-pg/migrations/20241004121132_queue_job.sql new file mode 100644 index 000000000..859377d52 --- /dev/null +++ b/crates/storage-pg/migrations/20241004121132_queue_job.sql @@ -0,0 +1,79 @@ +-- Copyright 2024 New Vector Ltd. +-- +-- SPDX-License-Identifier: AGPL-3.0-only +-- Please see LICENSE in the repository root for full details. 
+ +CREATE TYPE queue_job_status AS ENUM ( + -- The job is available to be picked up by a worker + 'available', + + -- The job is currently being processed by a worker + 'running', + + -- The job has been completed + 'completed', + + -- The worker running the job was lost + 'lost' +); + +CREATE TABLE queue_jobs ( + queue_job_id UUID NOT NULL PRIMARY KEY, + + -- The status of the job + status queue_job_status NOT NULL DEFAULT 'available', + + -- When the job was created + created_at TIMESTAMP WITH TIME ZONE NOT NULL, + + -- When the job was grabbed by a worker + started_at TIMESTAMP WITH TIME ZONE, + + -- Which worker is currently processing the job + started_by UUID REFERENCES queue_workers (queue_worker_id), + + -- When the job was completed + completed_at TIMESTAMP WITH TIME ZONE, + + -- The name of the queue this job belongs to + queue_name TEXT NOT NULL, + + -- The arguments to the job + payload JSONB NOT NULL DEFAULT '{}', + + -- Arbitrary metadata about the job, like the trace context + metadata JSONB NOT NULL DEFAULT '{}' +); + +-- When we grab jobs, we filter on the status of the job and the queue name +-- Then we order on the `queue_job_id` column, as it is a ULID, which ensures timestamp ordering +CREATE INDEX idx_queue_jobs_status_queue_job_id + ON queue_jobs + USING BTREE (status, queue_name, queue_job_id); + +-- We would like to notify workers when a job is available to wake them up +CREATE OR REPLACE FUNCTION queue_job_notify() + RETURNS TRIGGER + AS $$ +DECLARE + payload json; +BEGIN + IF NEW.status = 'available' THEN + -- The idea with this trigger is to notify the queue worker that a new job + -- is available on a queue. If there are many notifications with the same + -- payload, PG will coalesce them in a single notification, which is why we + -- keep the payload simple. 
+ payload = json_build_object('queue', NEW.queue_name); + PERFORM + pg_notify('queue_available', payload::text); + END IF; + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; + +CREATE TRIGGER queue_job_notify_trigger + AFTER INSERT OR UPDATE OF status + ON queue_jobs + FOR EACH ROW + EXECUTE PROCEDURE queue_job_notify(); diff --git a/crates/storage-pg/src/job.rs b/crates/storage-pg/src/job.rs deleted file mode 100644 index f0630458a..000000000 --- a/crates/storage-pg/src/job.rs +++ /dev/null @@ -1,67 +0,0 @@ -// Copyright 2024 New Vector Ltd. -// Copyright 2023, 2024 The Matrix.org Foundation C.I.C. -// -// SPDX-License-Identifier: AGPL-3.0-only -// Please see LICENSE in the repository root for full details. - -//! A module containing the PostgreSQL implementation of the [`JobRepository`]. - -use async_trait::async_trait; -use mas_storage::job::{JobId, JobRepository, JobSubmission}; -use sqlx::PgConnection; - -use crate::{DatabaseError, ExecuteExt}; - -/// An implementation of [`JobRepository`] for a PostgreSQL connection. -pub struct PgJobRepository<'c> { - conn: &'c mut PgConnection, -} - -impl<'c> PgJobRepository<'c> { - /// Create a new [`PgJobRepository`] from an active PostgreSQL connection. 
- #[must_use] - pub fn new(conn: &'c mut PgConnection) -> Self { - Self { conn } - } -} - -#[async_trait] -impl JobRepository for PgJobRepository<'_> { - type Error = DatabaseError; - - #[tracing::instrument( - name = "db.job.schedule_submission", - skip_all, - fields( - db.query.text, - job.id, - job.name = submission.name(), - ), - err, - )] - async fn schedule_submission( - &mut self, - submission: JobSubmission, - ) -> Result { - // XXX: This does not use the clock nor the rng - let id = JobId::new(); - tracing::Span::current().record("job.id", tracing::field::display(&id)); - - let res = sqlx::query!( - r#" - INSERT INTO apalis.jobs (job, id, job_type) - VALUES ($1::json, $2::text, $3::text) - "#, - submission.payload(), - id.to_string(), - submission.name(), - ) - .traced() - .execute(&mut *self.conn) - .await?; - - DatabaseError::ensure_affected_rows(&res, 1)?; - - Ok(id) - } -} diff --git a/crates/storage-pg/src/lib.rs b/crates/storage-pg/src/lib.rs index e16303278..3312d73b8 100644 --- a/crates/storage-pg/src/lib.rs +++ b/crates/storage-pg/src/lib.rs @@ -164,7 +164,6 @@ use sqlx::migrate::Migrator; pub mod app_session; pub mod compat; -pub mod job; pub mod oauth2; pub mod queue; pub mod upstream_oauth2; diff --git a/crates/storage-pg/src/queue/job.rs b/crates/storage-pg/src/queue/job.rs new file mode 100644 index 000000000..e2ef1005a --- /dev/null +++ b/crates/storage-pg/src/queue/job.rs @@ -0,0 +1,76 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! A module containing the PostgreSQL implementation of the +//! [`QueueJobRepository`]. + +use async_trait::async_trait; +use mas_storage::{queue::QueueJobRepository, Clock}; +use rand::RngCore; +use sqlx::PgConnection; +use ulid::Ulid; +use uuid::Uuid; + +use crate::{DatabaseError, ExecuteExt}; + +/// An implementation of [`QueueJobRepository`] for a PostgreSQL connection. 
+pub struct PgQueueJobRepository<'c> { + conn: &'c mut PgConnection, +} + +impl<'c> PgQueueJobRepository<'c> { + /// Create a new [`PgQueueJobRepository`] from an active PostgreSQL + /// connection. + #[must_use] + pub fn new(conn: &'c mut PgConnection) -> Self { + Self { conn } + } +} + +#[async_trait] +impl QueueJobRepository for PgQueueJobRepository<'_> { + type Error = DatabaseError; + + #[tracing::instrument( + name = "db.queue_job.schedule", + fields( + queue_job.id, + queue_job.queue_name = queue_name, + db.query.text, + ), + skip_all, + err, + )] + async fn schedule( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + queue_name: &str, + payload: serde_json::Value, + metadata: serde_json::Value, + ) -> Result<(), Self::Error> { + let created_at = clock.now(); + let id = Ulid::from_datetime_with_source(created_at.into(), rng); + tracing::Span::current().record("queue_job.id", tracing::field::display(id)); + + sqlx::query!( + r#" + INSERT INTO queue_jobs + (queue_job_id, queue_name, payload, metadata, created_at) + VALUES ($1, $2, $3, $4, $5) + "#, + Uuid::from(id), + queue_name, + payload, + metadata, + created_at, + ) + .traced() + .execute(&mut *self.conn) + .await?; + + Ok(()) + } +} diff --git a/crates/storage-pg/src/queue/mod.rs b/crates/storage-pg/src/queue/mod.rs index b6ba8295e..eca02b809 100644 --- a/crates/storage-pg/src/queue/mod.rs +++ b/crates/storage-pg/src/queue/mod.rs @@ -5,4 +5,5 @@ //! 
A module containing the PostgreSQL implementation of the job queue +pub mod job; pub mod worker; diff --git a/crates/storage-pg/src/repository.rs b/crates/storage-pg/src/repository.rs index 99580467c..b5c2b68b2 100644 --- a/crates/storage-pg/src/repository.rs +++ b/crates/storage-pg/src/repository.rs @@ -13,7 +13,6 @@ use mas_storage::{ CompatAccessTokenRepository, CompatRefreshTokenRepository, CompatSessionRepository, CompatSsoLoginRepository, }, - job::JobRepository, oauth2::{ OAuth2AccessTokenRepository, OAuth2AuthorizationGrantRepository, OAuth2ClientRepository, OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository, @@ -34,13 +33,12 @@ use crate::{ PgCompatAccessTokenRepository, PgCompatRefreshTokenRepository, PgCompatSessionRepository, PgCompatSsoLoginRepository, }, - job::PgJobRepository, oauth2::{ PgOAuth2AccessTokenRepository, PgOAuth2AuthorizationGrantRepository, PgOAuth2ClientRepository, PgOAuth2DeviceCodeGrantRepository, PgOAuth2RefreshTokenRepository, PgOAuth2SessionRepository, }, - queue::worker::PgQueueWorkerRepository, + queue::{job::PgQueueJobRepository, worker::PgQueueWorkerRepository}, upstream_oauth2::{ PgUpstreamOAuthLinkRepository, PgUpstreamOAuthProviderRepository, PgUpstreamOAuthSessionRepository, @@ -261,13 +259,15 @@ where Box::new(PgCompatRefreshTokenRepository::new(self.conn.as_mut())) } - fn job<'c>(&'c mut self) -> Box + 'c> { - Box::new(PgJobRepository::new(self.conn.as_mut())) - } - fn queue_worker<'c>( &'c mut self, ) -> Box + 'c> { Box::new(PgQueueWorkerRepository::new(self.conn.as_mut())) } + + fn queue_job<'c>( + &'c mut self, + ) -> Box + 'c> { + Box::new(PgQueueJobRepository::new(self.conn.as_mut())) + } } diff --git a/crates/storage/Cargo.toml b/crates/storage/Cargo.toml index 53b62c916..22d209df0 100644 --- a/crates/storage/Cargo.toml +++ b/crates/storage/Cargo.toml @@ -14,18 +14,16 @@ workspace = true [dependencies] async-trait.workspace = true chrono.workspace = true -thiserror.workspace = 
true futures-util.workspace = true - -apalis-core = { version = "0.4.9", features = ["tokio-comp"] } opentelemetry.workspace = true rand_core = "0.6.4" serde.workspace = true serde_json.workspace = true -tracing.workspace = true +thiserror.workspace = true tracing-opentelemetry.workspace = true -url.workspace = true +tracing.workspace = true ulid.workspace = true +url.workspace = true oauth2-types.workspace = true mas-data-model.workspace = true diff --git a/crates/storage/src/job.rs b/crates/storage/src/job.rs deleted file mode 100644 index 7b64c8f0b..000000000 --- a/crates/storage/src/job.rs +++ /dev/null @@ -1,514 +0,0 @@ -// Copyright 2024 New Vector Ltd. -// Copyright 2023, 2024 The Matrix.org Foundation C.I.C. -// -// SPDX-License-Identifier: AGPL-3.0-only -// Please see LICENSE in the repository root for full details. - -//! Repository to schedule persistent jobs. - -use std::{num::ParseIntError, ops::Deref}; - -pub use apalis_core::job::{Job, JobId}; -use async_trait::async_trait; -use opentelemetry::trace::{SpanContext, SpanId, TraceContextExt, TraceFlags, TraceId, TraceState}; -use serde::{Deserialize, Serialize}; -use serde_json::Value; -use tracing_opentelemetry::OpenTelemetrySpanExt; - -use crate::repository_impl; - -/// A job submission to be scheduled through the repository. 
-pub struct JobSubmission { - name: &'static str, - payload: Value, -} - -#[derive(Serialize, Deserialize)] -struct SerializableSpanContext { - trace_id: String, - span_id: String, - trace_flags: u8, -} - -impl From<&SpanContext> for SerializableSpanContext { - fn from(value: &SpanContext) -> Self { - Self { - trace_id: value.trace_id().to_string(), - span_id: value.span_id().to_string(), - trace_flags: value.trace_flags().to_u8(), - } - } -} - -impl TryFrom<&SerializableSpanContext> for SpanContext { - type Error = ParseIntError; - - fn try_from(value: &SerializableSpanContext) -> Result { - Ok(SpanContext::new( - TraceId::from_hex(&value.trace_id)?, - SpanId::from_hex(&value.span_id)?, - TraceFlags::new(value.trace_flags), - // XXX: is that fine? - true, - TraceState::default(), - )) - } -} - -/// A wrapper for [`Job`] which adds the span context in the payload. -#[derive(Serialize, Deserialize)] -pub struct JobWithSpanContext { - #[serde(skip_serializing_if = "Option::is_none")] - span_context: Option, - - #[serde(flatten)] - payload: T, -} - -impl From for JobWithSpanContext { - fn from(payload: J) -> Self { - Self { - span_context: None, - payload, - } - } -} - -impl Job for JobWithSpanContext { - const NAME: &'static str = J::NAME; -} - -impl Deref for JobWithSpanContext { - type Target = T; - - fn deref(&self) -> &Self::Target { - &self.payload - } -} - -impl JobWithSpanContext { - /// Get the span context of the job. - /// - /// # Returns - /// - /// Returns [`None`] if the job has no span context, or if the span context - /// is invalid. - #[must_use] - pub fn span_context(&self) -> Option { - self.span_context - .as_ref() - .and_then(|ctx| ctx.try_into().ok()) - } -} - -impl JobSubmission { - /// Create a new job submission out of a [`Job`]. - /// - /// # Panics - /// - /// Panics if the job cannot be serialized. 
- #[must_use] - pub fn new(job: J) -> Self { - let payload = serde_json::to_value(job).expect("Could not serialize job"); - - Self { - name: J::NAME, - payload, - } - } - - /// Create a new job submission out of a [`Job`] and a [`SpanContext`]. - /// - /// # Panics - /// - /// Panics if the job cannot be serialized. - #[must_use] - pub fn new_with_span_context(job: J, span_context: &SpanContext) -> Self { - // Serialize the span context alongside the job. - let span_context = SerializableSpanContext::from(span_context); - - Self::new(JobWithSpanContext { - payload: job, - span_context: Some(span_context), - }) - } - - /// The name of the job. - #[must_use] - pub fn name(&self) -> &'static str { - self.name - } - - /// The payload of the job. - #[must_use] - pub fn payload(&self) -> &Value { - &self.payload - } -} - -/// A [`JobRepository`] is used to schedule jobs to be executed by a worker. -#[async_trait] -pub trait JobRepository: Send + Sync { - /// The error type returned by the repository. - type Error; - - /// Schedule a job submission to be executed at a later time. - /// - /// # Parameters - /// - /// * `submission` - The job to schedule. - /// - /// # Errors - /// - /// Returns [`Self::Error`] if the underlying repository fails - async fn schedule_submission( - &mut self, - submission: JobSubmission, - ) -> Result; -} - -repository_impl!(JobRepository: - async fn schedule_submission(&mut self, submission: JobSubmission) -> Result; -); - -/// An extension trait for [`JobRepository`] to schedule jobs directly. -#[async_trait] -pub trait JobRepositoryExt { - /// The error type returned by the repository. - type Error; - - /// Schedule a job to be executed at a later time. - /// - /// # Parameters - /// - /// * `job` - The job to schedule. 
- /// - /// # Errors - /// - /// Returns [`Self::Error`] if the underlying repository fails - async fn schedule_job( - &mut self, - job: J, - ) -> Result; -} - -#[async_trait] -impl JobRepositoryExt for T -where - T: JobRepository + ?Sized, -{ - type Error = T::Error; - - #[tracing::instrument( - name = "db.job.schedule_job", - skip_all, - fields( - job.name = J::NAME, - ), - )] - async fn schedule_job( - &mut self, - job: J, - ) -> Result { - let span = tracing::Span::current(); - let ctx = span.context(); - let span = ctx.span(); - let span_context = span.span_context(); - - self.schedule_submission(JobSubmission::new_with_span_context(job, span_context)) - .await - } -} - -mod jobs { - // XXX: Move this somewhere else? - use apalis_core::job::Job; - use mas_data_model::{Device, User, UserEmail, UserRecoverySession}; - use serde::{Deserialize, Serialize}; - use ulid::Ulid; - - /// A job to verify an email address. - #[derive(Serialize, Deserialize, Debug, Clone)] - pub struct VerifyEmailJob { - user_email_id: Ulid, - language: Option, - } - - impl VerifyEmailJob { - /// Create a new job to verify an email address. - #[must_use] - pub fn new(user_email: &UserEmail) -> Self { - Self { - user_email_id: user_email.id, - language: None, - } - } - - /// Set the language to use for the email. - #[must_use] - pub fn with_language(mut self, language: String) -> Self { - self.language = Some(language); - self - } - - /// The language to use for the email. - #[must_use] - pub fn language(&self) -> Option<&str> { - self.language.as_deref() - } - - /// The ID of the email address to verify. - #[must_use] - pub fn user_email_id(&self) -> Ulid { - self.user_email_id - } - } - - impl Job for VerifyEmailJob { - const NAME: &'static str = "verify-email"; - } - - /// A job to provision the user on the homeserver. 
- #[derive(Serialize, Deserialize, Debug, Clone)] - pub struct ProvisionUserJob { - user_id: Ulid, - set_display_name: Option, - } - - impl ProvisionUserJob { - /// Create a new job to provision the user on the homeserver. - #[must_use] - pub fn new(user: &User) -> Self { - Self { - user_id: user.id, - set_display_name: None, - } - } - - #[doc(hidden)] - #[must_use] - pub fn new_for_id(user_id: Ulid) -> Self { - Self { - user_id, - set_display_name: None, - } - } - - /// Set the display name of the user. - #[must_use] - pub fn set_display_name(mut self, display_name: String) -> Self { - self.set_display_name = Some(display_name); - self - } - - /// Get the display name to be set. - #[must_use] - pub fn display_name_to_set(&self) -> Option<&str> { - self.set_display_name.as_deref() - } - - /// The ID of the user to provision. - #[must_use] - pub fn user_id(&self) -> Ulid { - self.user_id - } - } - - impl Job for ProvisionUserJob { - const NAME: &'static str = "provision-user"; - } - - /// A job to provision a device for a user on the homeserver. - /// - /// This job is deprecated, use the `SyncDevicesJob` instead. It is kept to - /// not break existing jobs in the database. - #[derive(Serialize, Deserialize, Debug, Clone)] - pub struct ProvisionDeviceJob { - user_id: Ulid, - device_id: String, - } - - impl ProvisionDeviceJob { - /// The ID of the user to provision the device for. - #[must_use] - pub fn user_id(&self) -> Ulid { - self.user_id - } - - /// The ID of the device to provision. - #[must_use] - pub fn device_id(&self) -> &str { - &self.device_id - } - } - - impl Job for ProvisionDeviceJob { - const NAME: &'static str = "provision-device"; - } - - /// A job to delete a device for a user on the homeserver. - /// - /// This job is deprecated, use the `SyncDevicesJob` instead. It is kept to - /// not break existing jobs in the database. 
- #[derive(Serialize, Deserialize, Debug, Clone)] - pub struct DeleteDeviceJob { - user_id: Ulid, - device_id: String, - } - - impl DeleteDeviceJob { - /// Create a new job to delete a device for a user on the homeserver. - #[must_use] - pub fn new(user: &User, device: &Device) -> Self { - Self { - user_id: user.id, - device_id: device.as_str().to_owned(), - } - } - - /// The ID of the user to delete the device for. - #[must_use] - pub fn user_id(&self) -> Ulid { - self.user_id - } - - /// The ID of the device to delete. - #[must_use] - pub fn device_id(&self) -> &str { - &self.device_id - } - } - - impl Job for DeleteDeviceJob { - const NAME: &'static str = "delete-device"; - } - - /// A job which syncs the list of devices of a user with the homeserver - #[derive(Serialize, Deserialize, Debug, Clone)] - pub struct SyncDevicesJob { - user_id: Ulid, - } - - impl SyncDevicesJob { - /// Create a new job to sync the list of devices of a user with the - /// homeserver - #[must_use] - pub fn new(user: &User) -> Self { - Self { user_id: user.id } - } - - /// The ID of the user to sync the devices for - #[must_use] - pub fn user_id(&self) -> Ulid { - self.user_id - } - } - - impl Job for SyncDevicesJob { - const NAME: &'static str = "sync-devices"; - } - - /// A job to deactivate and lock a user - #[derive(Serialize, Deserialize, Debug, Clone)] - pub struct DeactivateUserJob { - user_id: Ulid, - hs_erase: bool, - } - - impl DeactivateUserJob { - /// Create a new job to deactivate and lock a user - /// - /// # Parameters - /// - /// * `user` - The user to deactivate - /// * `hs_erase` - Whether to erase the user from the homeserver - #[must_use] - pub fn new(user: &User, hs_erase: bool) -> Self { - Self { - user_id: user.id, - hs_erase, - } - } - - /// The ID of the user to deactivate - #[must_use] - pub fn user_id(&self) -> Ulid { - self.user_id - } - - /// Whether to erase the user from the homeserver - #[must_use] - pub fn hs_erase(&self) -> bool { - self.hs_erase - } - 
} - - impl Job for DeactivateUserJob { - const NAME: &'static str = "deactivate-user"; - } - - /// A job to reactivate a user - #[derive(Serialize, Deserialize, Debug, Clone)] - pub struct ReactivateUserJob { - user_id: Ulid, - } - - impl ReactivateUserJob { - /// Create a new job to reactivate a user - /// - /// # Parameters - /// - /// * `user` - The user to reactivate - #[must_use] - pub fn new(user: &User) -> Self { - Self { user_id: user.id } - } - - /// The ID of the user to reactivate - #[must_use] - pub fn user_id(&self) -> Ulid { - self.user_id - } - } - - impl Job for ReactivateUserJob { - const NAME: &'static str = "reactivate-user"; - } - - /// Send account recovery emails - #[derive(Serialize, Deserialize, Debug, Clone)] - pub struct SendAccountRecoveryEmailsJob { - user_recovery_session_id: Ulid, - } - - impl SendAccountRecoveryEmailsJob { - /// Create a new job to send account recovery emails - /// - /// # Parameters - /// - /// * `user_recovery_session` - The user recovery session to send the - /// email for - /// * `language` - The locale to send the email in - #[must_use] - pub fn new(user_recovery_session: &UserRecoverySession) -> Self { - Self { - user_recovery_session_id: user_recovery_session.id, - } - } - - /// The ID of the user recovery session to send the email for - #[must_use] - pub fn user_recovery_session_id(&self) -> Ulid { - self.user_recovery_session_id - } - } - - impl Job for SendAccountRecoveryEmailsJob { - const NAME: &'static str = "send-account-recovery-email"; - } -} - -pub use self::jobs::{ - DeactivateUserJob, DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, ReactivateUserJob, - SendAccountRecoveryEmailsJob, SyncDevicesJob, VerifyEmailJob, -}; diff --git a/crates/storage/src/lib.rs b/crates/storage/src/lib.rs index 30dc553de..cd0d646c3 100644 --- a/crates/storage/src/lib.rs +++ b/crates/storage/src/lib.rs @@ -118,7 +118,6 @@ mod utils; pub mod app_session; pub mod compat; -pub mod job; pub mod oauth2; pub mod queue; 
pub mod upstream_oauth2; diff --git a/crates/storage/src/queue/job.rs b/crates/storage/src/queue/job.rs new file mode 100644 index 000000000..c5ec3f4f4 --- /dev/null +++ b/crates/storage/src/queue/job.rs @@ -0,0 +1,168 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. + +//! Repository to interact with jobs in the job queue + +use async_trait::async_trait; +use opentelemetry::trace::TraceContextExt; +use rand_core::RngCore; +use serde::{Deserialize, Serialize}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use ulid::Ulid; + +use crate::{repository_impl, Clock}; + +/// Represents a job in the job queue +pub struct Job { + /// The ID of the job + pub id: Ulid, + + /// The payload of the job + pub payload: serde_json::Value, + + /// Arbitrary metadata about the job + pub metadata: JobMetadata, +} + +/// Metadata stored alongside the job +#[derive(Serialize, Deserialize, Default)] +pub struct JobMetadata { + #[serde(default)] + trace_id: String, + + #[serde(default)] + span_id: String, + + #[serde(default)] + trace_flags: u8, +} + +impl JobMetadata { + fn new(span_context: &opentelemetry::trace::SpanContext) -> Self { + Self { + trace_id: span_context.trace_id().to_string(), + span_id: span_context.span_id().to_string(), + trace_flags: span_context.trace_flags().to_u8(), + } + } + + /// Get the [`opentelemetry::trace::SpanContext`] from this [`JobMetadata`] + #[must_use] + pub fn span_context(&self) -> opentelemetry::trace::SpanContext { + use opentelemetry::trace::{SpanContext, SpanId, TraceFlags, TraceId, TraceState}; + SpanContext::new( + TraceId::from_hex(&self.trace_id).unwrap_or(TraceId::INVALID), + SpanId::from_hex(&self.span_id).unwrap_or(SpanId::INVALID), + TraceFlags::new(self.trace_flags), + // Trace context is remote, as it comes from another service/from the database + true, + TraceState::NONE, + ) + } +} + +/// A trait that represents a job which can be 
inserted into a queue +pub trait InsertableJob: Serialize + Send { +    /// The name of the queue this job belongs to +    const QUEUE_NAME: &'static str; +} + +/// A [`QueueJobRepository`] is used to schedule jobs to be executed by a +/// worker. +#[async_trait] +pub trait QueueJobRepository: Send + Sync { +    /// The error type returned by the repository. +    type Error; + +    /// Schedule a job to be executed as soon as possible by a worker. +    /// +    /// # Parameters +    /// +    /// * `rng` - The random number generator used to generate a new job ID +    /// * `clock` - The clock used to generate timestamps +    /// * `queue_name` - The name of the queue to schedule the job on +    /// * `payload` - The payload of the job +    /// * `metadata` - Arbitrary metadata about the job +    /// +    /// # Errors +    /// +    /// Returns an error if the underlying repository fails. +    async fn schedule( +        &mut self, +        rng: &mut (dyn RngCore + Send), +        clock: &dyn Clock, +        queue_name: &str, +        payload: serde_json::Value, +        metadata: serde_json::Value, +    ) -> Result<(), Self::Error>; +} + +repository_impl!(QueueJobRepository: +    async fn schedule( +        &mut self, +        rng: &mut (dyn RngCore + Send), +        clock: &dyn Clock, +        queue_name: &str, +        payload: serde_json::Value, +        metadata: serde_json::Value, +    ) -> Result<(), Self::Error>; +); + +/// Extension trait for [`QueueJobRepository`] to help adding a job to the queue +/// through the [`InsertableJob`] trait. This isn't in the +/// [`QueueJobRepository`] trait to keep it object safe. +#[async_trait] +pub trait QueueJobRepositoryExt: QueueJobRepository { +    /// Schedule a job to be executed as soon as possible by a worker. +    /// +    /// # Parameters +    /// +    /// * `rng` - The random number generator used to generate a new job ID +    /// * `clock` - The clock used to generate timestamps +    /// * `job` - The job to schedule +    /// +    /// # Errors +    /// +    /// Returns an error if the underlying repository fails.
+ async fn schedule_job( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + job: J, + ) -> Result<(), Self::Error>; +} + +#[async_trait] +impl QueueJobRepositoryExt for T +where + T: QueueJobRepository, +{ + #[tracing::instrument( + name = "db.queue_job.schedule_job", + fields( + queue_job.queue_name = J::QUEUE_NAME, + ), + skip_all, + )] + async fn schedule_job( + &mut self, + rng: &mut (dyn RngCore + Send), + clock: &dyn Clock, + job: J, + ) -> Result<(), Self::Error> { + // Grab the span context from the current span + let span = tracing::Span::current(); + let ctx = span.context(); + let span = ctx.span(); + let span_context = span.span_context(); + + let metadata = JobMetadata::new(span_context); + let metadata = serde_json::to_value(metadata).expect("Could not serialize metadata"); + + let payload = serde_json::to_value(job).expect("Could not serialize job"); + self.schedule(rng, clock, J::QUEUE_NAME, payload, metadata) + .await + } +} diff --git a/crates/storage/src/queue/mod.rs b/crates/storage/src/queue/mod.rs index 4ca97ec5e..d02bee5fd 100644 --- a/crates/storage/src/queue/mod.rs +++ b/crates/storage/src/queue/mod.rs @@ -5,6 +5,12 @@ //! A module containing repositories for the job queue +mod job; +mod tasks; mod worker; -pub use self::worker::{QueueWorkerRepository, Worker}; +pub use self::{ + job::{InsertableJob, Job, JobMetadata, QueueJobRepository, QueueJobRepositoryExt}, + tasks::*, + worker::{QueueWorkerRepository, Worker}, +}; diff --git a/crates/storage/src/queue/tasks.rs b/crates/storage/src/queue/tasks.rs new file mode 100644 index 000000000..a2fe85be4 --- /dev/null +++ b/crates/storage/src/queue/tasks.rs @@ -0,0 +1,290 @@ +// Copyright 2024 New Vector Ltd. +// +// SPDX-License-Identifier: AGPL-3.0-only +// Please see LICENSE in the repository root for full details. 
+ +use mas_data_model::{Device, User, UserEmail, UserRecoverySession}; +use serde::{Deserialize, Serialize}; +use ulid::Ulid; + +use super::InsertableJob; + +/// A job to verify an email address. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct VerifyEmailJob { + user_email_id: Ulid, + language: Option, +} + +impl VerifyEmailJob { + /// Create a new job to verify an email address. + #[must_use] + pub fn new(user_email: &UserEmail) -> Self { + Self { + user_email_id: user_email.id, + language: None, + } + } + + /// Set the language to use for the email. + #[must_use] + pub fn with_language(mut self, language: String) -> Self { + self.language = Some(language); + self + } + + /// The language to use for the email. + #[must_use] + pub fn language(&self) -> Option<&str> { + self.language.as_deref() + } + + /// The ID of the email address to verify. + #[must_use] + pub fn user_email_id(&self) -> Ulid { + self.user_email_id + } +} + +impl InsertableJob for VerifyEmailJob { + const QUEUE_NAME: &'static str = "verify-email"; +} + +/// A job to provision the user on the homeserver. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ProvisionUserJob { + user_id: Ulid, + set_display_name: Option, +} + +impl ProvisionUserJob { + /// Create a new job to provision the user on the homeserver. + #[must_use] + pub fn new(user: &User) -> Self { + Self { + user_id: user.id, + set_display_name: None, + } + } + + #[doc(hidden)] + #[must_use] + pub fn new_for_id(user_id: Ulid) -> Self { + Self { + user_id, + set_display_name: None, + } + } + + /// Set the display name of the user. + #[must_use] + pub fn set_display_name(mut self, display_name: String) -> Self { + self.set_display_name = Some(display_name); + self + } + + /// Get the display name to be set. + #[must_use] + pub fn display_name_to_set(&self) -> Option<&str> { + self.set_display_name.as_deref() + } + + /// The ID of the user to provision. 
+ #[must_use] + pub fn user_id(&self) -> Ulid { + self.user_id + } +} + +impl InsertableJob for ProvisionUserJob { + const QUEUE_NAME: &'static str = "provision-user"; +} + +/// A job to provision a device for a user on the homeserver. +/// +/// This job is deprecated, use the `SyncDevicesJob` instead. It is kept to +/// not break existing jobs in the database. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ProvisionDeviceJob { + user_id: Ulid, + device_id: String, +} + +impl ProvisionDeviceJob { + /// The ID of the user to provision the device for. + #[must_use] + pub fn user_id(&self) -> Ulid { + self.user_id + } + + /// The ID of the device to provision. + #[must_use] + pub fn device_id(&self) -> &str { + &self.device_id + } +} + +impl InsertableJob for ProvisionDeviceJob { + const QUEUE_NAME: &'static str = "provision-device"; +} + +/// A job to delete a device for a user on the homeserver. +/// +/// This job is deprecated, use the `SyncDevicesJob` instead. It is kept to +/// not break existing jobs in the database. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct DeleteDeviceJob { + user_id: Ulid, + device_id: String, +} + +impl DeleteDeviceJob { + /// Create a new job to delete a device for a user on the homeserver. + #[must_use] + pub fn new(user: &User, device: &Device) -> Self { + Self { + user_id: user.id, + device_id: device.as_str().to_owned(), + } + } + + /// The ID of the user to delete the device for. + #[must_use] + pub fn user_id(&self) -> Ulid { + self.user_id + } + + /// The ID of the device to delete. 
+ #[must_use] + pub fn device_id(&self) -> &str { + &self.device_id + } +} + +impl InsertableJob for DeleteDeviceJob { + const QUEUE_NAME: &'static str = "delete-device"; +} + +/// A job which syncs the list of devices of a user with the homeserver +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SyncDevicesJob { + user_id: Ulid, +} + +impl SyncDevicesJob { + /// Create a new job to sync the list of devices of a user with the + /// homeserver + #[must_use] + pub fn new(user: &User) -> Self { + Self { user_id: user.id } + } + + /// The ID of the user to sync the devices for + #[must_use] + pub fn user_id(&self) -> Ulid { + self.user_id + } +} + +impl InsertableJob for SyncDevicesJob { + const QUEUE_NAME: &'static str = "sync-devices"; +} + +/// A job to deactivate and lock a user +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct DeactivateUserJob { + user_id: Ulid, + hs_erase: bool, +} + +impl DeactivateUserJob { + /// Create a new job to deactivate and lock a user + /// + /// # Parameters + /// + /// * `user` - The user to deactivate + /// * `hs_erase` - Whether to erase the user from the homeserver + #[must_use] + pub fn new(user: &User, hs_erase: bool) -> Self { + Self { + user_id: user.id, + hs_erase, + } + } + + /// The ID of the user to deactivate + #[must_use] + pub fn user_id(&self) -> Ulid { + self.user_id + } + + /// Whether to erase the user from the homeserver + #[must_use] + pub fn hs_erase(&self) -> bool { + self.hs_erase + } +} + +impl InsertableJob for DeactivateUserJob { + const QUEUE_NAME: &'static str = "deactivate-user"; +} + +/// A job to reactivate a user +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct ReactivateUserJob { + user_id: Ulid, +} + +impl ReactivateUserJob { + /// Create a new job to reactivate a user + /// + /// # Parameters + /// + /// * `user` - The user to reactivate + #[must_use] + pub fn new(user: &User) -> Self { + Self { user_id: user.id } + } + + /// The ID of the user to reactivate + 
#[must_use] + pub fn user_id(&self) -> Ulid { + self.user_id + } +} + +impl InsertableJob for ReactivateUserJob { + const QUEUE_NAME: &'static str = "reactivate-user"; +} + +/// Send account recovery emails +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SendAccountRecoveryEmailsJob { + user_recovery_session_id: Ulid, +} + +impl SendAccountRecoveryEmailsJob { + /// Create a new job to send account recovery emails + /// + /// # Parameters + /// + /// * `user_recovery_session` - The user recovery session to send the email + /// for + /// * `language` - The locale to send the email in + #[must_use] + pub fn new(user_recovery_session: &UserRecoverySession) -> Self { + Self { + user_recovery_session_id: user_recovery_session.id, + } + } + + /// The ID of the user recovery session to send the email for + #[must_use] + pub fn user_recovery_session_id(&self) -> Ulid { + self.user_recovery_session_id + } +} + +impl InsertableJob for SendAccountRecoveryEmailsJob { + const QUEUE_NAME: &'static str = "send-account-recovery-email"; +} diff --git a/crates/storage/src/repository.rs b/crates/storage/src/repository.rs index 55d19d281..161ef05e3 100644 --- a/crates/storage/src/repository.rs +++ b/crates/storage/src/repository.rs @@ -13,12 +13,11 @@ use crate::{ CompatAccessTokenRepository, CompatRefreshTokenRepository, CompatSessionRepository, CompatSsoLoginRepository, }, - job::JobRepository, oauth2::{ OAuth2AccessTokenRepository, OAuth2AuthorizationGrantRepository, OAuth2ClientRepository, OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository, }, - queue::QueueWorkerRepository, + queue::{QueueJobRepository, QueueWorkerRepository}, upstream_oauth2::{ UpstreamOAuthLinkRepository, UpstreamOAuthProviderRepository, UpstreamOAuthSessionRepository, @@ -190,11 +189,11 @@ pub trait RepositoryAccess: Send { &'c mut self, ) -> Box + 'c>; - /// Get a [`JobRepository`] - fn job<'c>(&'c mut self) -> Box + 'c>; - /// Get a [`QueueWorkerRepository`] fn 
queue_worker<'c>(&'c mut self) -> Box + 'c>; + + /// Get a [`QueueJobRepository`] + fn queue_job<'c>(&'c mut self) -> Box + 'c>; } /// Implementations of the [`RepositoryAccess`], [`RepositoryTransaction`] and @@ -209,13 +208,12 @@ mod impls { CompatAccessTokenRepository, CompatRefreshTokenRepository, CompatSessionRepository, CompatSsoLoginRepository, }, - job::JobRepository, oauth2::{ OAuth2AccessTokenRepository, OAuth2AuthorizationGrantRepository, OAuth2ClientRepository, OAuth2DeviceCodeGrantRepository, OAuth2RefreshTokenRepository, OAuth2SessionRepository, }, - queue::QueueWorkerRepository, + queue::{QueueJobRepository, QueueWorkerRepository}, upstream_oauth2::{ UpstreamOAuthLinkRepository, UpstreamOAuthProviderRepository, UpstreamOAuthSessionRepository, @@ -407,15 +405,15 @@ mod impls { )) } - fn job<'c>(&'c mut self) -> Box + 'c> { - Box::new(MapErr::new(self.inner.job(), &mut self.mapper)) - } - fn queue_worker<'c>( &'c mut self, ) -> Box + 'c> { Box::new(MapErr::new(self.inner.queue_worker(), &mut self.mapper)) } + + fn queue_job<'c>(&'c mut self) -> Box + 'c> { + Box::new(MapErr::new(self.inner.queue_job(), &mut self.mapper)) + } } impl RepositoryAccess for Box { @@ -535,14 +533,14 @@ mod impls { (**self).compat_refresh_token() } - fn job<'c>(&'c mut self) -> Box + 'c> { - (**self).job() - } - fn queue_worker<'c>( &'c mut self, ) -> Box + 'c> { (**self).queue_worker() } + + fn queue_job<'c>(&'c mut self) -> Box + 'c> { + (**self).queue_job() + } } } diff --git a/crates/tasks/Cargo.toml b/crates/tasks/Cargo.toml index 763b0f5fc..7a18ca0aa 100644 --- a/crates/tasks/Cargo.toml +++ b/crates/tasks/Cargo.toml @@ -13,12 +13,6 @@ workspace = true [dependencies] anyhow.workspace = true -apalis-core = { version = "0.4.9", features = [ - "extensions", - "tokio-comp", - "storage", -] } -apalis-cron = "0.4.9" async-stream = "0.3.6" async-trait.workspace = true chrono.workspace = true diff --git a/crates/tasks/src/email.rs b/crates/tasks/src/email.rs index 
90d0aac35..a16ca29dc 100644 --- a/crates/tasks/src/email.rs +++ b/crates/tasks/src/email.rs @@ -9,7 +9,7 @@ use apalis_core::{context::JobContext, executor::TokioExecutor, monitor::Monitor use chrono::Duration; use mas_email::{Address, Mailbox}; use mas_i18n::locale; -use mas_storage::job::{JobWithSpanContext, VerifyEmailJob}; +use mas_storage::{job::JobWithSpanContext, queue::VerifyEmailJob}; use mas_templates::{EmailVerificationContext, TemplateContext}; use rand::{distributions::Uniform, Rng}; use tracing::info; diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index ac85eba14..e56a082c7 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -4,9 +4,10 @@ // SPDX-License-Identifier: AGPL-3.0-only // Please see LICENSE in the repository root for full details. +#![allow(dead_code)] + use std::sync::Arc; -use apalis_core::{executor::TokioExecutor, layers::extensions::Extension, monitor::Monitor}; use mas_email::Mailer; use mas_matrix::HomeserverConnection; use mas_router::UrlBuilder; @@ -16,18 +17,15 @@ use new_queue::QueueRunnerError; use rand::SeedableRng; use sqlx::{Pool, Postgres}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; -use tracing::debug; - -use crate::storage::PostgresStorageFactory; -mod database; -mod email; -mod matrix; +// mod database; +// mod email; +// mod matrix; mod new_queue; -mod recovery; -mod storage; -mod user; -mod utils; +// mod recovery; +// mod storage; +// mod user; +// mod utils; #[derive(Clone)] struct State { @@ -55,10 +53,6 @@ impl State { } } - pub fn inject(&self) -> Extension { - Extension(self.clone()) - } - pub fn pool(&self) -> &Pool { &self.pool } @@ -95,58 +89,19 @@ impl State { } } -trait JobContextExt { - fn state(&self) -> State; -} - -impl JobContextExt for apalis_core::context::JobContext { - fn state(&self) -> State { - self.data_opt::() - .expect("state not injected in job context") - .clone() - } -} - -/// Helper macro to build a storage-backed worker. -macro_rules! 
build { - ($job:ty => $fn:ident, $suffix:expr, $state:expr, $factory:expr) => {{ - let storage = $factory.build(); - let worker_name = format!( - "{job}-{suffix}", - job = <$job as ::apalis_core::job::Job>::NAME, - suffix = $suffix - ); - - let builder = ::apalis_core::builder::WorkerBuilder::new(worker_name) - .layer($state.inject()) - .layer(crate::utils::trace_layer()) - .layer(crate::utils::metrics_layer()); - - let builder = ::apalis_core::storage::builder::WithStorage::with_storage_config( - builder, - storage, - |c| c.fetch_interval(std::time::Duration::from_secs(1)), - ); - ::apalis_core::builder::WorkerFactory::build(builder, ::apalis_core::job_fn::job_fn($fn)) - }}; -} - -pub(crate) use build; - /// Initialise the workers. /// /// # Errors /// /// This function can fail if the database connection fails. pub async fn init( - name: &str, pool: &Pool, mailer: &Mailer, homeserver: impl HomeserverConnection + 'static, url_builder: UrlBuilder, cancellation_token: CancellationToken, task_tracker: &TaskTracker, -) -> Result, QueueRunnerError> { +) -> Result<(), QueueRunnerError> { let state = State::new( pool.clone(), SystemClock::default(), @@ -154,21 +109,6 @@ pub async fn init( homeserver, url_builder, ); - let factory = PostgresStorageFactory::new(pool.clone()); - let monitor = Monitor::new().executor(TokioExecutor::new()); - let monitor = self::database::register(name, monitor, &state); - let monitor = self::email::register(name, monitor, &state, &factory); - let monitor = self::matrix::register(name, monitor, &state, &factory); - let monitor = self::user::register(name, monitor, &state, &factory); - let monitor = self::recovery::register(name, monitor, &state, &factory); - // TODO: we might want to grab the join handle here - // TODO: this error isn't right, I just want that to compile - factory - .listen() - .await - .map_err(QueueRunnerError::SetupListener)?; - debug!(?monitor, "workers registered"); - let mut worker = 
self::new_queue::QueueWorker::new(state, cancellation_token).await?; task_tracker.spawn(async move { @@ -180,5 +120,5 @@ pub async fn init( } }); - Ok(monitor) + Ok(()) } diff --git a/crates/tasks/src/matrix.rs b/crates/tasks/src/matrix.rs index ad1cb591e..3cc09b272 100644 --- a/crates/tasks/src/matrix.rs +++ b/crates/tasks/src/matrix.rs @@ -12,11 +12,9 @@ use mas_data_model::Device; use mas_matrix::ProvisionRequest; use mas_storage::{ compat::CompatSessionFilter, - job::{ - DeleteDeviceJob, JobRepositoryExt as _, JobWithSpanContext, ProvisionDeviceJob, - ProvisionUserJob, SyncDevicesJob, - }, + job::{JobRepositoryExt as _, JobWithSpanContext}, oauth2::OAuth2SessionFilter, + queue::{DeleteDeviceJob, ProvisionDeviceJob, ProvisionUserJob, SyncDevicesJob}, user::{UserEmailRepository, UserRepository}, Pagination, RepositoryAccess, }; diff --git a/crates/tasks/src/recovery.rs b/crates/tasks/src/recovery.rs index 142f9f165..79f469b06 100644 --- a/crates/tasks/src/recovery.rs +++ b/crates/tasks/src/recovery.rs @@ -9,7 +9,8 @@ use apalis_core::{context::JobContext, executor::TokioExecutor, monitor::Monitor use mas_email::{Address, Mailbox}; use mas_i18n::DataLocale; use mas_storage::{ - job::{JobWithSpanContext, SendAccountRecoveryEmailsJob}, + job::JobWithSpanContext, + queue::SendAccountRecoveryEmailsJob, user::{UserEmailFilter, UserRecoveryRepository}, Pagination, RepositoryAccess, }; diff --git a/crates/tasks/src/user.rs b/crates/tasks/src/user.rs index 76f81d20e..b3d062bb4 100644 --- a/crates/tasks/src/user.rs +++ b/crates/tasks/src/user.rs @@ -8,8 +8,9 @@ use anyhow::Context; use apalis_core::{context::JobContext, executor::TokioExecutor, monitor::Monitor}; use mas_storage::{ compat::CompatSessionFilter, - job::{DeactivateUserJob, JobWithSpanContext, ReactivateUserJob}, + job::JobWithSpanContext, oauth2::OAuth2SessionFilter, + queue::{DeactivateUserJob, ReactivateUserJob}, user::{BrowserSessionFilter, UserRepository}, RepositoryAccess, }; diff --git a/deny.toml 
b/deny.toml index 571923c35..0adab1ff9 100644 --- a/deny.toml +++ b/deny.toml @@ -69,13 +69,8 @@ skip = [ { name = "heck", version = "0.4.1" }, # wasmtime -> cranelift is depending on this old version { name = "itertools", version = "0.12.1" }, - # apalis-core depends on this old version - { name = "strum", version = "0.25.0" }, - { name = "strum_macros", version = "0.25.0" }, # For some reason, axum-core depends on this old version, even though axum is on the new one { name = "sync_wrapper", version = "0.1.2" }, - # `apalis` depends on this old version of tower - { name = "tower", version = "0.4.13" }, ] skip-tree = []