Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a0aeb2d
New job queue: worker registration and leader election
sandhose Oct 7, 2024
6363264
Make the worker heartbeat take a worker reference
sandhose Oct 9, 2024
8c1a87b
Move the worker logic in a struct
sandhose Oct 14, 2024
d3e51a1
TEMP: use patched sqlx
sandhose Oct 8, 2024
f9c8ade
Graceful shutdown
sandhose Oct 14, 2024
2692d9a
Use the database time for leader election
sandhose Nov 19, 2024
874ae39
WIP jobs
sandhose Oct 14, 2024
f683bc9
Move the jobs types in the queue module
sandhose Oct 15, 2024
fdc3d9a
Schedule jobs through the new queue
sandhose Oct 15, 2024
4994761
WIP: job consumption
sandhose Oct 15, 2024
880349c
Actually consume jobs
sandhose Oct 31, 2024
ca44be7
Decide in each job whether it should retry or not
sandhose Nov 20, 2024
390a65c
Retry failed jobs
sandhose Nov 20, 2024
6d92fb1
Refactor job processing to wait for them to finish on shutdown
sandhose Nov 21, 2024
7b120f1
Allow scheduling jobs in the future
sandhose Nov 22, 2024
8dbc914
Cron-like recurring jobs
sandhose Nov 25, 2024
45b4fcc
Remove the schedule_expression from the database & other fixes
sandhose Dec 5, 2024
9328647
Merge pull request #3307 from element-hq/quenting/new-queue/initial
sandhose Dec 6, 2024
3395526
Merge pull request #3367 from element-hq/quenting/new-queue/insert-jobs
sandhose Dec 6, 2024
fcc99bc
Merge pull request #3455 from element-hq/quenting/new-queue/jobs
sandhose Dec 6, 2024
82e3438
Merge pull request #3558 from element-hq/quenting/new-queue/scheduled…
sandhose Dec 6, 2024
5ffc48a
Merge pull request #3623 from element-hq/quenting/new-queue/cron
sandhose Dec 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 27 additions & 144 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ features = ["serde", "clock"]
version = "4.5.21"
features = ["derive"]

# Cron expressions
[workspace.dependencies.cron]
version = "0.13.0"

# Elliptic curve cryptography
[workspace.dependencies.elliptic-curve]
version = "0.13.8"
Expand Down Expand Up @@ -373,3 +377,10 @@ rayon.opt-level = 3
regalloc2.opt-level = 3
sha2.opt-level = 3
sqlx-macros.opt-level = 3

# Override the crates.io releases of sqlx and its sub-crates with the upstream git repository.
# NOTE(review): every entry tracks the moving `main` branch rather than a fixed `rev` or tag.
# The commit introducing this is titled "TEMP: use patched sqlx", so this is presumably meant
# to be removed once a suitable sqlx release exists — confirm before merging.
[patch.crates-io]
sqlx = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" }
sqlx-core = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" }
sqlx-macros = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" }
sqlx-macros-core = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" }
sqlx-postgres = { git = "https://github.com/launchbadge/sqlx.git", branch = "main" }
25 changes: 15 additions & 10 deletions crates/cli/src/commands/manage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ use mas_matrix::HomeserverConnection;
use mas_matrix_synapse::SynapseConnection;
use mas_storage::{
compat::{CompatAccessTokenRepository, CompatSessionFilter, CompatSessionRepository},
job::{
DeactivateUserJob, JobRepositoryExt, ProvisionUserJob, ReactivateUserJob, SyncDevicesJob,
},
oauth2::OAuth2SessionFilter,
queue::{
DeactivateUserJob, ProvisionUserJob, QueueJobRepositoryExt as _, ReactivateUserJob,
SyncDevicesJob,
},
user::{BrowserSessionFilter, UserEmailRepository, UserPasswordRepository, UserRepository},
Clock, RepositoryAccess, SystemClock,
};
Expand Down Expand Up @@ -366,7 +367,7 @@ impl Options {
let id = id.into();
info!(user.id = %id, "Scheduling provisioning job");
let job = ProvisionUserJob::new_for_id(id);
repo.job().schedule_job(job).await?;
repo.queue_job().schedule_job(&mut rng, &clock, job).await?;
}

repo.into_inner().commit().await?;
Expand Down Expand Up @@ -429,7 +430,9 @@ impl Options {

// Schedule a job to sync the devices of the user with the homeserver
warn!("Scheduling job to sync devices for the user");
repo.job().schedule_job(SyncDevicesJob::new(&user)).await?;
repo.queue_job()
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
.await?;

let txn = repo.into_inner();
if dry_run {
Expand Down Expand Up @@ -467,8 +470,8 @@ impl Options {

if deactivate {
warn!(%user.id, "Scheduling user deactivation");
repo.job()
.schedule_job(DeactivateUserJob::new(&user, false))
repo.queue_job()
.schedule_job(&mut rng, &clock, DeactivateUserJob::new(&user, false))
.await?;
}

Expand All @@ -491,8 +494,8 @@ impl Options {
.context("User not found")?;

warn!(%user.id, "User scheduling user reactivation");
repo.job()
.schedule_job(ReactivateUserJob::new(&user))
repo.queue_job()
.schedule_job(&mut rng, &clock, ReactivateUserJob::new(&user))
.await?;

repo.into_inner().commit().await?;
Expand Down Expand Up @@ -975,7 +978,9 @@ impl UserCreationRequest<'_> {
provision_job = provision_job.set_display_name(display_name);
}

repo.job().schedule_job(provision_job).await?;
repo.queue_job()
.schedule_job(rng, clock, provision_job)
.await?;

Ok(user)
}
Expand Down
30 changes: 4 additions & 26 deletions crates/cli/src/commands/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@ use mas_matrix_synapse::SynapseConnection;
use mas_router::UrlBuilder;
use mas_storage::SystemClock;
use mas_storage_pg::MIGRATOR;
use rand::{
distributions::{Alphanumeric, DistString},
thread_rng,
};
use sqlx::migrate::Migrate;
use tracing::{info, info_span, warn, Instrument};

Expand Down Expand Up @@ -161,34 +157,16 @@ impl Options {
let mailer = mailer_from_config(&config.email, &templates)?;
mailer.test_connection().await?;

#[allow(clippy::disallowed_methods)]
let mut rng = thread_rng();
let worker_name = Alphanumeric.sample_string(&mut rng, 10);

info!(worker_name, "Starting task worker");
let monitor = mas_tasks::init(
&worker_name,
info!("Starting task worker");
mas_tasks::init(
&pool,
&mailer,
homeserver_connection.clone(),
url_builder.clone(),
shutdown.soft_shutdown_token(),
shutdown.task_tracker(),
)
.await?;

// XXX: The monitor from apalis is a bit annoying to use for graceful shutdowns,
// ideally we'd just give it a cancellation token
let shutdown_future = shutdown.soft_shutdown_token().cancelled_owned();
shutdown.task_tracker().spawn(async move {
if let Err(e) = monitor
.run_with_signal(async move {
shutdown_future.await;
Ok(())
})
.await
{
tracing::error!(error = &e as &dyn std::error::Error, "Task worker failed");
}
});
}

let listeners_config = config.http.listeners.clone();
Expand Down
32 changes: 19 additions & 13 deletions crates/cli/src/commands/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,22 @@ use figment::Figment;
use mas_config::{AppConfig, ConfigurationSection};
use mas_matrix_synapse::SynapseConnection;
use mas_router::UrlBuilder;
use rand::{
distributions::{Alphanumeric, DistString},
thread_rng,
};
use tracing::{info, info_span};

use crate::util::{
database_pool_from_config, mailer_from_config, site_config_from_config, templates_from_config,
use crate::{
shutdown::ShutdownManager,
util::{
database_pool_from_config, mailer_from_config, site_config_from_config,
templates_from_config,
},
};

// Options for the command implemented in this file (`commands/worker.rs`).
// The struct declares no fields, so it defines no flags or arguments of its own.
// NOTE(review): a plain `//` comment is used deliberately — if `Parser` is clap's derive,
// a `///` doc comment here would be picked up as help text. TODO confirm.
#[derive(Parser, Debug, Default)]
pub(super) struct Options {}

impl Options {
pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> {
let shutdown = ShutdownManager::new()?;
let span = info_span!("cli.worker.init").entered();
let config = AppConfig::extract(figment)?;

Expand Down Expand Up @@ -66,16 +67,21 @@ impl Options {

drop(config);

#[allow(clippy::disallowed_methods)]
let mut rng = thread_rng();
let worker_name = Alphanumeric.sample_string(&mut rng, 10);

info!(worker_name, "Starting task scheduler");
let monitor = mas_tasks::init(&worker_name, &pool, &mailer, conn, url_builder).await?;
info!("Starting task scheduler");
mas_tasks::init(
&pool,
&mailer,
conn,
url_builder,
shutdown.soft_shutdown_token(),
shutdown.task_tracker(),
)
.await?;

span.exit();

monitor.run().await?;
shutdown.run().await;

Ok(ExitCode::SUCCESS)
}
}
6 changes: 3 additions & 3 deletions crates/handlers/src/admin/v1/users/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use axum::{extract::State, response::IntoResponse, Json};
use hyper::StatusCode;
use mas_matrix::BoxHomeserverConnection;
use mas_storage::{
job::{JobRepositoryExt, ProvisionUserJob},
queue::{ProvisionUserJob, QueueJobRepositoryExt as _},
BoxRng,
};
use schemars::JsonSchema;
Expand Down Expand Up @@ -164,8 +164,8 @@ pub async fn handler(

let user = repo.user().add(&mut rng, &clock, params.username).await?;

repo.job()
.schedule_job(ProvisionUserJob::new(&user))
repo.queue_job()
.schedule_job(&mut rng, &clock, ProvisionUserJob::new(&user))
.await?;

repo.save().await?;
Expand Down
34 changes: 20 additions & 14 deletions crates/handlers/src/admin/v1/users/deactivate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.

use aide::{transform::TransformOperation, OperationIo};
use aide::{transform::TransformOperation, NoApi, OperationIo};
use axum::{response::IntoResponse, Json};
use hyper::StatusCode;
use mas_storage::job::{DeactivateUserJob, JobRepositoryExt};
use mas_storage::{
queue::{DeactivateUserJob, QueueJobRepositoryExt as _},
BoxRng,
};
use tracing::info;
use ulid::Ulid;

Expand Down Expand Up @@ -69,6 +72,7 @@ pub async fn handler(
CallContext {
mut repo, clock, ..
}: CallContext,
NoApi(mut rng): NoApi<BoxRng>,
id: UlidPathParam,
) -> Result<Json<SingleResponse<User>>, RouteError> {
let id = *id;
Expand All @@ -83,8 +87,8 @@ pub async fn handler(
}

info!("Scheduling deactivation of user {}", user.id);
repo.job()
.schedule_job(DeactivateUserJob::new(&user, true))
repo.queue_job()
.schedule_job(&mut rng, &clock, DeactivateUserJob::new(&user, true))
.await?;

repo.save().await?;
Expand Down Expand Up @@ -133,11 +137,12 @@ mod tests {

// It should have scheduled a deactivation job for the user
// XXX: we don't have a good way to look for the deactivation job
let job: Json<serde_json::Value> =
sqlx::query_scalar("SELECT job FROM apalis.jobs WHERE job_type = 'deactivate-user'")
.fetch_one(&pool)
.await
.expect("Deactivation job to be scheduled");
let job: Json<serde_json::Value> = sqlx::query_scalar(
"SELECT payload FROM queue_jobs WHERE queue_name = 'deactivate-user'",
)
.fetch_one(&pool)
.await
.expect("Deactivation job to be scheduled");
assert_eq!(job["user_id"], serde_json::json!(user.id));
}

Expand Down Expand Up @@ -174,11 +179,12 @@ mod tests {

// It should have scheduled a deactivation job for the user
// XXX: we don't have a good way to look for the deactivation job
let job: Json<serde_json::Value> =
sqlx::query_scalar("SELECT job FROM apalis.jobs WHERE job_type = 'deactivate-user'")
.fetch_one(&pool)
.await
.expect("Deactivation job to be scheduled");
let job: Json<serde_json::Value> = sqlx::query_scalar(
"SELECT payload FROM queue_jobs WHERE queue_name = 'deactivate-user'",
)
.fetch_one(&pool)
.await
.expect("Deactivation job to be scheduled");
assert_eq!(job["user_id"], serde_json::json!(user.id));
}

Expand Down
9 changes: 6 additions & 3 deletions crates/handlers/src/compat/logout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use mas_axum_utils::sentry::SentryEventID;
use mas_data_model::TokenType;
use mas_storage::{
compat::{CompatAccessTokenRepository, CompatSessionRepository},
job::{JobRepositoryExt, SyncDevicesJob},
BoxClock, BoxRepository, Clock, RepositoryAccess,
queue::{QueueJobRepositoryExt as _, SyncDevicesJob},
BoxClock, BoxRepository, BoxRng, Clock, RepositoryAccess,
};
use thiserror::Error;

Expand Down Expand Up @@ -65,6 +65,7 @@ impl IntoResponse for RouteError {
#[tracing::instrument(name = "handlers.compat.logout.post", skip_all, err)]
pub(crate) async fn post(
clock: BoxClock,
mut rng: BoxRng,
mut repo: BoxRepository,
activity_tracker: BoundActivityTracker,
maybe_authorization: Option<TypedHeader<Authorization<Bearer>>>,
Expand Down Expand Up @@ -104,7 +105,9 @@ pub(crate) async fn post(
.ok_or(RouteError::InvalidAuthorization)?;

// Schedule a job to sync the devices of the user with the homeserver
repo.job().schedule_job(SyncDevicesJob::new(&user)).await?;
repo.queue_job()
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
.await?;

repo.compat_session().finish(&clock, session).await?;

Expand Down
7 changes: 5 additions & 2 deletions crates/handlers/src/graphql/mutations/compat_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use anyhow::Context as _;
use async_graphql::{Context, Enum, InputObject, Object, ID};
use mas_storage::{
compat::CompatSessionRepository,
job::{JobRepositoryExt, SyncDevicesJob},
queue::{QueueJobRepositoryExt as _, SyncDevicesJob},
RepositoryAccess,
};

Expand Down Expand Up @@ -72,6 +72,7 @@ impl CompatSessionMutations {
input: EndCompatSessionInput,
) -> Result<EndCompatSessionPayload, async_graphql::Error> {
let state = ctx.state();
let mut rng = state.rng();
let compat_session_id = NodeType::CompatSession.extract_ulid(&input.compat_session_id)?;
let requester = ctx.requester();

Expand All @@ -94,7 +95,9 @@ impl CompatSessionMutations {
.context("Could not load user")?;

// Schedule a job to sync the devices of the user with the homeserver
repo.job().schedule_job(SyncDevicesJob::new(&user)).await?;
repo.queue_job()
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
.await?;

let session = repo.compat_session().finish(&clock, session).await?;

Expand Down
7 changes: 5 additions & 2 deletions crates/handlers/src/graphql/mutations/oauth2_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use async_graphql::{Context, Description, Enum, InputObject, Object, ID};
use chrono::Duration;
use mas_data_model::{Device, TokenType};
use mas_storage::{
job::{JobRepositoryExt, SyncDevicesJob},
oauth2::{
OAuth2AccessTokenRepository, OAuth2ClientRepository, OAuth2RefreshTokenRepository,
OAuth2SessionRepository,
},
queue::{QueueJobRepositoryExt as _, SyncDevicesJob},
user::UserRepository,
RepositoryAccess,
};
Expand Down Expand Up @@ -217,6 +217,7 @@ impl OAuth2SessionMutations {

let mut repo = state.repository().await?;
let clock = state.clock();
let mut rng = state.rng();

let session = repo.oauth2_session().lookup(oauth2_session_id).await?;
let Some(session) = session else {
Expand All @@ -235,7 +236,9 @@ impl OAuth2SessionMutations {
.context("Could not load user")?;

// Schedule a job to sync the devices of the user with the homeserver
repo.job().schedule_job(SyncDevicesJob::new(&user)).await?;
repo.queue_job()
.schedule_job(&mut rng, &clock, SyncDevicesJob::new(&user))
.await?;
}

let session = repo.oauth2_session().finish(&clock, session).await?;
Expand Down
Loading
Loading