@@ -10,14 +10,16 @@ use mas_data_model::SiteConfig;
1010use mas_email:: Mailer ;
1111use mas_matrix:: HomeserverConnection ;
1212use mas_router:: UrlBuilder ;
13- use mas_storage:: { BoxClock , BoxRepository , RepositoryError , RepositoryFactory , SystemClock } ;
13+ use mas_storage:: { BoxRepository , Clock , RepositoryError , RepositoryFactory } ;
1414use mas_storage_pg:: PgRepositoryFactory ;
1515use new_queue:: QueueRunnerError ;
1616use opentelemetry:: metrics:: Meter ;
1717use rand:: SeedableRng ;
1818use sqlx:: { Pool , Postgres } ;
1919use tokio_util:: { sync:: CancellationToken , task:: TaskTracker } ;
2020
21+ pub use crate :: new_queue:: QueueWorker ;
22+
2123mod database;
2224mod email;
2325mod matrix;
@@ -39,7 +41,7 @@ static METER: LazyLock<Meter> = LazyLock::new(|| {
3941struct State {
4042 repository_factory : PgRepositoryFactory ,
4143 mailer : Mailer ,
42- clock : SystemClock ,
44+ clock : Arc < dyn Clock > ,
4345 homeserver : Arc < dyn HomeserverConnection > ,
4446 url_builder : UrlBuilder ,
4547 site_config : SiteConfig ,
@@ -48,7 +50,7 @@ struct State {
4850impl State {
4951 pub fn new (
5052 repository_factory : PgRepositoryFactory ,
51- clock : SystemClock ,
53+ clock : impl Clock + ' static ,
5254 mailer : Mailer ,
5355 homeserver : impl HomeserverConnection + ' static ,
5456 url_builder : UrlBuilder ,
@@ -57,7 +59,7 @@ impl State {
5759 Self {
5860 repository_factory,
5961 mailer,
60- clock,
62+ clock : Arc :: new ( clock ) ,
6163 homeserver : Arc :: new ( homeserver) ,
6264 url_builder,
6365 site_config,
@@ -68,8 +70,8 @@ impl State {
6870 self . repository_factory . pool ( )
6971 }
7072
71- pub fn clock ( & self ) -> BoxClock {
72- Box :: new ( self . clock . clone ( ) )
73+ pub fn clock ( & self ) -> & dyn Clock {
74+ & self . clock
7375 }
7476
7577 pub fn mailer ( & self ) -> & Mailer {
@@ -99,29 +101,31 @@ impl State {
99101 }
100102}
101103
102- /// Initialise the workers.
104+ /// Initialise the worker, without running it.
105+ ///
106+ /// This is mostly useful for tests.
103107///
104108/// # Errors
105109///
106110/// This function can fail if the database connection fails.
107111pub async fn init (
108112 repository_factory : PgRepositoryFactory ,
113+ clock : impl Clock + ' static ,
109114 mailer : & Mailer ,
110115 homeserver : impl HomeserverConnection + ' static ,
111116 url_builder : UrlBuilder ,
112117 site_config : & SiteConfig ,
113118 cancellation_token : CancellationToken ,
114- task_tracker : & TaskTracker ,
115- ) -> Result < ( ) , QueueRunnerError > {
119+ ) -> Result < QueueWorker , QueueRunnerError > {
116120 let state = State :: new (
117121 repository_factory,
118- SystemClock :: default ( ) ,
122+ clock ,
119123 mailer. clone ( ) ,
120124 homeserver,
121125 url_builder,
122126 site_config. clone ( ) ,
123127 ) ;
124- let mut worker = self :: new_queue :: QueueWorker :: new ( state, cancellation_token) . await ?;
128+ let mut worker = QueueWorker :: new ( state, cancellation_token) . await ?;
125129
126130 worker
127131 . register_handler :: < mas_storage:: queue:: CleanupExpiredTokensJob > ( )
@@ -157,6 +161,36 @@ pub async fn init(
157161 mas_storage:: queue:: PruneStalePolicyDataJob ,
158162 ) ;
159163
164+ Ok ( worker)
165+ }
166+
167+ /// Initialise the worker and run it.
168+ ///
169+ /// # Errors
170+ ///
171+ /// This function can fail if the database connection fails.
172+ #[ expect( clippy:: too_many_arguments, reason = "this is fine" ) ]
173+ pub async fn init_and_run (
174+ repository_factory : PgRepositoryFactory ,
175+ clock : impl Clock + ' static ,
176+ mailer : & Mailer ,
177+ homeserver : impl HomeserverConnection + ' static ,
178+ url_builder : UrlBuilder ,
179+ site_config : & SiteConfig ,
180+ cancellation_token : CancellationToken ,
181+ task_tracker : & TaskTracker ,
182+ ) -> Result < ( ) , QueueRunnerError > {
183+ let worker = init (
184+ repository_factory,
185+ clock,
186+ mailer,
187+ homeserver,
188+ url_builder,
189+ site_config,
190+ cancellation_token,
191+ )
192+ . await ?;
193+
160194 task_tracker. spawn ( worker. run ( ) ) ;
161195
162196 Ok ( ( ) )
0 commit comments