Skip to content

Commit 76afd6a

Browse files
committed
Graceful shutdown
1 parent f4897ca commit 76afd6a

File tree

10 files changed

+250
-44
lines changed

10 files changed

+250
-44
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/cli/src/commands/server.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,8 @@ impl Options {
172172
&mailer,
173173
homeserver_connection.clone(),
174174
url_builder.clone(),
175+
shutdown.soft_shutdown_token(),
176+
shutdown.task_tracker(),
175177
)
176178
.await?;
177179

crates/cli/src/commands/worker.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,20 @@ use rand::{
1717
};
1818
use tracing::{info, info_span};
1919

20-
use crate::util::{
21-
database_pool_from_config, mailer_from_config, site_config_from_config, templates_from_config,
20+
use crate::{
21+
shutdown::ShutdownManager,
22+
util::{
23+
database_pool_from_config, mailer_from_config, site_config_from_config,
24+
templates_from_config,
25+
},
2226
};
2327

2428
#[derive(Parser, Debug, Default)]
2529
pub(super) struct Options {}
2630

2731
impl Options {
2832
pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> {
33+
let shutdown = ShutdownManager::new()?;
2934
let span = info_span!("cli.worker.init").entered();
3035
let config = AppConfig::extract(figment)?;
3136

@@ -71,11 +76,35 @@ impl Options {
7176
let worker_name = Alphanumeric.sample_string(&mut rng, 10);
7277

7378
info!(worker_name, "Starting task scheduler");
74-
let monitor = mas_tasks::init(&worker_name, &pool, &mailer, conn, url_builder).await?;
75-
79+
let monitor = mas_tasks::init(
80+
&worker_name,
81+
&pool,
82+
&mailer,
83+
conn,
84+
url_builder,
85+
shutdown.soft_shutdown_token(),
86+
shutdown.task_tracker(),
87+
)
88+
.await?;
89+
90+
// XXX: The monitor from apalis is a bit annoying to use for graceful shutdowns,
91+
// ideally we'd just give it a cancellation token
92+
let shutdown_future = shutdown.soft_shutdown_token().cancelled_owned();
93+
shutdown.task_tracker().spawn(async move {
94+
if let Err(e) = monitor
95+
.run_with_signal(async move {
96+
shutdown_future.await;
97+
Ok(())
98+
})
99+
.await
100+
{
101+
tracing::error!(error = &e as &dyn std::error::Error, "Task worker failed");
102+
}
103+
});
76104
span.exit();
77105

78-
monitor.run().await?;
106+
shutdown.run().await;
107+
79108
Ok(ExitCode::SUCCESS)
80109
}
81110
}

crates/storage-pg/.sqlx/query-399e261027fe6c9167511636157ab747a469404533f59ff6fbd56e9eb5ad38e1.json

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/storage-pg/.sqlx/query-6ecad60e565367a6cfa539b4c32dabe674ea853e0d47eb5c713705cb0130c758.json

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/storage-pg/src/queue/worker.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ impl<'c> QueueWorkerRepository for PgQueueWorkerRepository<'c> {
109109
),
110110
err,
111111
)]
112-
async fn shutdown(&mut self, clock: &dyn Clock, worker: Worker) -> Result<(), Self::Error> {
112+
async fn shutdown(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error> {
113113
let now = clock.now();
114114
let res = sqlx::query!(
115115
r#"
@@ -126,6 +126,30 @@ impl<'c> QueueWorkerRepository for PgQueueWorkerRepository<'c> {
126126

127127
DatabaseError::ensure_affected_rows(&res, 1)?;
128128

129+
// Remove the leader lease if we were holding it
130+
let res = sqlx::query!(
131+
r#"
132+
DELETE FROM queue_leader
133+
WHERE queue_worker_id = $1
134+
"#,
135+
Uuid::from(worker.id),
136+
)
137+
.traced()
138+
.execute(&mut *self.conn)
139+
.await?;
140+
141+
// If we were holding the leader lease, notify workers
142+
if res.rows_affected() > 0 {
143+
sqlx::query!(
144+
r#"
145+
NOTIFY queue_leader_stepdown
146+
"#,
147+
)
148+
.traced()
149+
.execute(&mut *self.conn)
150+
.await?;
151+
}
152+
129153
Ok(())
130154
}
131155

crates/storage/src/queue/worker.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ pub trait QueueWorkerRepository: Send + Sync {
5151
/// # Errors
5252
///
5353
/// Returns an error if the underlying repository fails.
54-
async fn shutdown(&mut self, clock: &dyn Clock, worker: Worker) -> Result<(), Self::Error>;
54+
async fn shutdown(&mut self, clock: &dyn Clock, worker: &Worker) -> Result<(), Self::Error>;
5555

5656
/// Find dead workers and shut them down.
5757
///
@@ -105,7 +105,7 @@ repository_impl!(QueueWorkerRepository:
105105
async fn shutdown(
106106
&mut self,
107107
clock: &dyn Clock,
108-
worker: Worker,
108+
worker: &Worker,
109109
) -> Result<(), Self::Error>;
110110

111111
async fn shutdown_dead_workers(

crates/tasks/Cargo.toml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,11 @@ workspace = true
1313

1414
[dependencies]
1515
anyhow.workspace = true
16-
apalis-core = { version = "0.4.9", features = ["extensions", "tokio-comp", "storage"] }
16+
apalis-core = { version = "0.4.9", features = [
17+
"extensions",
18+
"tokio-comp",
19+
"storage",
20+
] }
1721
apalis-cron = "0.4.9"
1822
async-stream = "0.3.6"
1923
async-trait.workspace = true
@@ -25,6 +29,7 @@ rand_chacha = "0.3.1"
2529
sqlx.workspace = true
2630
thiserror.workspace = true
2731
tokio.workspace = true
32+
tokio-util.workspace = true
2833
tower.workspace = true
2934
tracing.workspace = true
3035
tracing-opentelemetry.workspace = true

crates/tasks/src/lib.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use mas_storage_pg::PgRepository;
1515
use new_queue::QueueRunnerError;
1616
use rand::SeedableRng;
1717
use sqlx::{Pool, Postgres};
18+
use tokio_util::{sync::CancellationToken, task::TaskTracker};
1819
use tracing::debug;
1920

2021
use crate::storage::PostgresStorageFactory;
@@ -143,6 +144,8 @@ pub async fn init(
143144
mailer: &Mailer,
144145
homeserver: impl HomeserverConnection<Error = anyhow::Error> + 'static,
145146
url_builder: UrlBuilder,
147+
cancellation_token: CancellationToken,
148+
task_tracker: &TaskTracker,
146149
) -> Result<Monitor<TokioExecutor>, QueueRunnerError> {
147150
let state = State::new(
148151
pool.clone(),
@@ -166,11 +169,9 @@ pub async fn init(
166169
.map_err(QueueRunnerError::SetupListener)?;
167170
debug!(?monitor, "workers registered");
168171

169-
let mut worker = self::new_queue::QueueWorker::new(state).await?;
172+
let mut worker = self::new_queue::QueueWorker::new(state, cancellation_token).await?;
170173

171-
// TODO: this is just spawning the task in the background, we probably actually
172-
// want to wrap that in a structure, and handle graceful shutdown correctly
173-
tokio::spawn(async move {
174+
task_tracker.spawn(async move {
174175
if let Err(e) = worker.run().await {
175176
tracing::error!(
176177
error = &e as &dyn std::error::Error,

0 commit comments

Comments
 (0)