Skip to content

Commit 45f15e1

Browse files
committed
Allow setting a custom clock on the QueueWorker & add one to the
TestState
1 parent f69855e commit 45f15e1

File tree

7 files changed

+78
-13
lines changed

7 files changed

+78
-13
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/cli/src/commands/server.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,9 @@ impl Options {
173173
test_mailer_in_background(&mailer, Duration::from_secs(30));
174174

175175
info!("Starting task worker");
176-
mas_tasks::init(
176+
mas_tasks::init_and_run(
177177
PgRepositoryFactory::new(pool.clone()),
178+
SystemClock::default(),
178179
&mailer,
179180
homeserver_connection.clone(),
180181
url_builder.clone(),

crates/cli/src/commands/worker.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use clap::Parser;
1010
use figment::Figment;
1111
use mas_config::{AppConfig, ConfigurationSection};
1212
use mas_router::UrlBuilder;
13+
use mas_storage::SystemClock;
1314
use mas_storage_pg::PgRepositoryFactory;
1415
use tracing::{info, info_span};
1516

@@ -63,8 +64,9 @@ impl Options {
6364
drop(config);
6465

6566
info!("Starting task scheduler");
66-
mas_tasks::init(
67+
mas_tasks::init_and_run(
6768
PgRepositoryFactory::new(pool.clone()),
69+
SystemClock::default(),
6870
&mailer,
6971
conn,
7072
url_builder,

crates/handlers/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ mas-axum-utils.workspace = true
7272
mas-config.workspace = true
7373
mas-context.workspace = true
7474
mas-data-model.workspace = true
75+
mas-email.workspace = true
7576
mas-http.workspace = true
7677
mas-i18n.workspace = true
7778
mas-iana.workspace = true
@@ -83,6 +84,7 @@ mas-policy.workspace = true
8384
mas-router.workspace = true
8485
mas-storage.workspace = true
8586
mas-storage-pg.workspace = true
87+
mas-tasks.workspace = true
8688
mas-templates.workspace = true
8789
oauth2-types.workspace = true
8890
zxcvbn.workspace = true

crates/handlers/src/test_utils.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ use mas_axum_utils::{
2929
};
3030
use mas_config::RateLimitingConfig;
3131
use mas_data_model::SiteConfig;
32+
use mas_email::{MailTransport, Mailer};
3233
use mas_i18n::Translator;
3334
use mas_keystore::{Encrypter, JsonWebKey, JsonWebKeySet, Keystore, PrivateKey};
3435
use mas_matrix::{HomeserverConnection, MockHomeserverConnection};
@@ -39,6 +40,7 @@ use mas_storage::{
3940
clock::MockClock,
4041
};
4142
use mas_storage_pg::PgRepositoryFactory;
43+
use mas_tasks::QueueWorker;
4244
use mas_templates::{SiteConfigExt, Templates};
4345
use oauth2_types::{registration::ClientRegistrationResponse, requests::AccessTokenResponse};
4446
use rand::SeedableRng;
@@ -113,6 +115,7 @@ pub(crate) struct TestState {
113115
pub rng: Arc<Mutex<ChaChaRng>>,
114116
pub http_client: reqwest::Client,
115117
pub task_tracker: TaskTracker,
118+
queue_worker: Arc<Mutex<QueueWorker>>,
116119

117120
#[allow(dead_code)] // It is used, as it will cancel the CancellationToken when dropped
118121
cancellation_drop_guard: Arc<DropGuard>,
@@ -235,6 +238,27 @@ impl TestState {
235238
shutdown_token.child_token(),
236239
);
237240

241+
let mailer = Mailer::new(
242+
templates.clone(),
243+
MailTransport::blackhole(),
244+
"[email protected]".parse().unwrap(),
245+
"[email protected]".parse().unwrap(),
246+
);
247+
248+
let queue_worker = mas_tasks::init(
249+
PgRepositoryFactory::new(pool.clone()),
250+
Arc::clone(&clock),
251+
&mailer,
252+
homeserver_connection.clone(),
253+
url_builder.clone(),
254+
&site_config,
255+
shutdown_token.child_token(),
256+
)
257+
.await
258+
.unwrap();
259+
260+
let queue_worker = Arc::new(Mutex::new(queue_worker));
261+
238262
Ok(Self {
239263
repository_factory: PgRepositoryFactory::new(pool),
240264
templates,
@@ -254,6 +278,7 @@ impl TestState {
254278
rng,
255279
http_client,
256280
task_tracker,
281+
queue_worker,
257282
cancellation_drop_guard: Arc::new(shutdown_token.drop_guard()),
258283
})
259284
}

crates/tasks/src/lib.rs

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,16 @@ use mas_data_model::SiteConfig;
1010
use mas_email::Mailer;
1111
use mas_matrix::HomeserverConnection;
1212
use mas_router::UrlBuilder;
13-
use mas_storage::{BoxRepository, Clock, RepositoryError, RepositoryFactory, SystemClock};
13+
use mas_storage::{BoxRepository, Clock, RepositoryError, RepositoryFactory};
1414
use mas_storage_pg::PgRepositoryFactory;
1515
use new_queue::QueueRunnerError;
1616
use opentelemetry::metrics::Meter;
1717
use rand::SeedableRng;
1818
use sqlx::{Pool, Postgres};
1919
use tokio_util::{sync::CancellationToken, task::TaskTracker};
2020

21+
pub use crate::new_queue::QueueWorker;
22+
2123
mod database;
2224
mod email;
2325
mod matrix;
@@ -99,29 +101,31 @@ impl State {
99101
}
100102
}
101103

102-
/// Initialise the workers.
104+
/// Initialise the worker, without running it.
105+
///
106+
/// This is mostly useful for tests.
103107
///
104108
/// # Errors
105109
///
106110
/// This function can fail if the database connection fails.
107111
pub async fn init(
108112
repository_factory: PgRepositoryFactory,
113+
clock: impl Clock + 'static,
109114
mailer: &Mailer,
110115
homeserver: impl HomeserverConnection + 'static,
111116
url_builder: UrlBuilder,
112117
site_config: &SiteConfig,
113118
cancellation_token: CancellationToken,
114-
task_tracker: &TaskTracker,
115-
) -> Result<(), QueueRunnerError> {
119+
) -> Result<QueueWorker, QueueRunnerError> {
116120
let state = State::new(
117121
repository_factory,
118-
SystemClock::default(),
122+
clock,
119123
mailer.clone(),
120124
homeserver,
121125
url_builder,
122126
site_config.clone(),
123127
);
124-
let mut worker = self::new_queue::QueueWorker::new(state, cancellation_token).await?;
128+
let mut worker = QueueWorker::new(state, cancellation_token).await?;
125129

126130
worker
127131
.register_handler::<mas_storage::queue::CleanupExpiredTokensJob>()
@@ -157,6 +161,35 @@ pub async fn init(
157161
mas_storage::queue::PruneStalePolicyDataJob,
158162
);
159163

164+
Ok(worker)
165+
}
166+
167+
/// Initialise the worker and run it.
168+
///
169+
/// # Errors
170+
///
171+
/// This function can fail if the database connection fails.
172+
pub async fn init_and_run(
173+
repository_factory: PgRepositoryFactory,
174+
clock: impl Clock + 'static,
175+
mailer: &Mailer,
176+
homeserver: impl HomeserverConnection + 'static,
177+
url_builder: UrlBuilder,
178+
site_config: &SiteConfig,
179+
cancellation_token: CancellationToken,
180+
task_tracker: &TaskTracker,
181+
) -> Result<(), QueueRunnerError> {
182+
let worker = init(
183+
repository_factory,
184+
clock,
185+
mailer,
186+
homeserver,
187+
url_builder,
188+
site_config,
189+
cancellation_token,
190+
)
191+
.await?;
192+
160193
task_tracker.spawn(worker.run());
161194

162195
Ok(())

crates/tasks/src/new_queue.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ impl QueueWorker {
214214
skip_all,
215215
fields(worker.id)
216216
)]
217-
pub async fn new(
217+
pub(crate) async fn new(
218218
state: State,
219219
cancellation_token: CancellationToken,
220220
) -> Result<Self, QueueRunnerError> {
@@ -289,7 +289,7 @@ impl QueueWorker {
289289
})
290290
}
291291

292-
pub fn register_handler<T: RunnableJob + InsertableJob>(&mut self) -> &mut Self {
292+
pub(crate) fn register_handler<T: RunnableJob + InsertableJob>(&mut self) -> &mut Self {
293293
// There is a potential panic here, which is fine as it's going to be caught
294294
// within the job task
295295
let factory = |payload: JobPayload| {
@@ -302,7 +302,7 @@ impl QueueWorker {
302302
self
303303
}
304304

305-
pub fn add_schedule<T: InsertableJob>(
305+
pub(crate) fn add_schedule<T: InsertableJob>(
306306
&mut self,
307307
schedule_name: &'static str,
308308
expression: Schedule,
@@ -320,7 +320,7 @@ impl QueueWorker {
320320
self
321321
}
322322

323-
pub async fn run(mut self) {
323+
pub(crate) async fn run(mut self) {
324324
if let Err(e) = self.run_inner().await {
325325
tracing::error!(
326326
error = &e as &dyn std::error::Error,
@@ -344,7 +344,7 @@ impl QueueWorker {
344344
}
345345

346346
#[tracing::instrument(name = "worker.setup_schedules", skip_all)]
347-
pub async fn setup_schedules(&mut self) -> Result<(), QueueRunnerError> {
347+
pub(crate) async fn setup_schedules(&mut self) -> Result<(), QueueRunnerError> {
348348
let schedules: Vec<_> = self.schedules.iter().map(|s| s.schedule_name).collect();
349349

350350
// Start a transaction on the existing PgListener connection

0 commit comments

Comments
 (0)