Skip to content

Commit 69bbcfc

Browse files
authored
Merge pull request #3903 from element-hq/quenting/lifecycle-manager
Notify the service state through `sd_notify`
2 parents 00b31b8 + d57552c commit 69bbcfc

File tree

8 files changed

+264
-173
lines changed

8 files changed

+264
-173
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/cli/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ console = "0.15.10"
2323
dialoguer = { version = "0.11.0", features = ["fuzzy-select"] }
2424
dotenvy = "0.15.7"
2525
figment.workspace = true
26+
futures-util.workspace = true
2627
http-body-util.workspace = true
2728
hyper.workspace = true
2829
ipnetwork = "0.20.0"
@@ -32,6 +33,7 @@ rand.workspace = true
3233
rand_chacha = "0.3.1"
3334
reqwest.workspace = true
3435
rustls.workspace = true
36+
sd-notify = "0.4.5"
3537
serde_json.workspace = true
3638
serde_yaml = "0.9.34"
3739
sqlx.workspace = true

crates/cli/src/commands/server.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,10 @@ use tracing::{info, info_span, warn, Instrument};
2424

2525
use crate::{
2626
app_state::AppState,
27-
shutdown::ShutdownManager,
27+
lifecycle::LifecycleManager,
2828
util::{
2929
database_pool_from_config, mailer_from_config, password_manager_from_config,
30-
policy_factory_from_config, register_sighup, site_config_from_config,
31-
templates_from_config,
30+
policy_factory_from_config, site_config_from_config, templates_from_config,
3231
},
3332
};
3433

@@ -57,7 +56,7 @@ impl Options {
5756
#[allow(clippy::too_many_lines)]
5857
pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> {
5958
let span = info_span!("cli.run.init").entered();
60-
let shutdown = ShutdownManager::new()?;
59+
let mut shutdown = LifecycleManager::new()?;
6160
let config = AppConfig::extract(figment)?;
6261

6362
info!(version = crate::VERSION, "Starting up");
@@ -145,6 +144,7 @@ impl Options {
145144
// Load and compile the templates
146145
let templates =
147146
templates_from_config(&config.templates, &site_config, &url_builder).await?;
147+
shutdown.register_reloadable(&templates);
148148

149149
let http_client = mas_http::reqwest_client();
150150

@@ -186,6 +186,9 @@ impl Options {
186186
shutdown.task_tracker(),
187187
shutdown.soft_shutdown_token(),
188188
);
189+
190+
shutdown.register_reloadable(&activity_tracker);
191+
189192
let trusted_proxies = config.http.trusted_proxies.clone();
190193

191194
// Build a rate limiter.
@@ -197,9 +200,6 @@ impl Options {
197200
// Explicitly the config to properly zeroize secret keys
198201
drop(config);
199202

200-
// Listen for SIGHUP
201-
register_sighup(&templates, &activity_tracker)?;
202-
203203
limiter.start();
204204

205205
let graphql_schema = mas_handlers::graphql_schema(

crates/cli/src/commands/worker.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use mas_router::UrlBuilder;
1414
use tracing::{info, info_span};
1515

1616
use crate::{
17-
shutdown::ShutdownManager,
17+
lifecycle::LifecycleManager,
1818
util::{
1919
database_pool_from_config, mailer_from_config, site_config_from_config,
2020
templates_from_config,
@@ -26,7 +26,7 @@ pub(super) struct Options {}
2626

2727
impl Options {
2828
pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> {
29-
let shutdown = ShutdownManager::new()?;
29+
let shutdown = LifecycleManager::new()?;
3030
let span = info_span!("cli.worker.init").entered();
3131
let config = AppConfig::extract(figment)?;
3232

crates/cli/src/lifecycle.rs

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
// Copyright 2024, 2025 New Vector Ltd.
2+
//
3+
// SPDX-License-Identifier: AGPL-3.0-only
4+
// Please see LICENSE in the repository root for full details.
5+
6+
use std::{future::Future, process::ExitCode, time::Duration};
7+
8+
use futures_util::future::{BoxFuture, Either};
9+
use mas_handlers::ActivityTracker;
10+
use mas_templates::Templates;
11+
use tokio::signal::unix::{Signal, SignalKind};
12+
use tokio_util::{sync::CancellationToken, task::TaskTracker};
13+
14+
/// A helper to manage the lifecycle of the service, inclusing handling graceful
15+
/// shutdowns and configuration reloads.
16+
///
17+
/// It will listen for SIGTERM and SIGINT signals, and will trigger a soft
18+
/// shutdown on the first signal, and a hard shutdown on the second signal or
19+
/// after a timeout.
20+
///
21+
/// Users of this manager should use the `soft_shutdown_token` to react to a
22+
/// soft shutdown, which should gracefully finish requests and close
23+
/// connections, and the `hard_shutdown_token` to react to a hard shutdown,
24+
/// which should drop all connections and finish all requests.
25+
///
26+
/// They should also use the `task_tracker` to make it track things running, so
27+
/// that it knows when the soft shutdown is over and worked.
28+
///
29+
/// It also integrates with [`sd_notify`] to notify the service manager of the
30+
/// state of the service.
31+
pub struct LifecycleManager {
32+
hard_shutdown_token: CancellationToken,
33+
soft_shutdown_token: CancellationToken,
34+
task_tracker: TaskTracker,
35+
sigterm: Signal,
36+
sigint: Signal,
37+
sighup: Signal,
38+
timeout: Duration,
39+
reload_handlers: Vec<Box<dyn Fn() -> BoxFuture<'static, ()>>>,
40+
}
41+
42+
/// Represents a thing that can be reloaded with a SIGHUP
43+
pub trait Reloadable: Clone + Send {
44+
fn reload(&self) -> impl Future<Output = ()> + Send;
45+
}
46+
47+
impl Reloadable for ActivityTracker {
48+
async fn reload(&self) {
49+
self.flush().await;
50+
}
51+
}
52+
53+
impl Reloadable for Templates {
54+
async fn reload(&self) {
55+
if let Err(err) = self.reload().await {
56+
tracing::error!(
57+
error = &err as &dyn std::error::Error,
58+
"Failed to reload templates"
59+
);
60+
}
61+
}
62+
}
63+
64+
/// A wrapper around [`sd_notify::notify`] that logs any errors
65+
fn notify(states: &[sd_notify::NotifyState]) {
66+
if let Err(e) = sd_notify::notify(false, states) {
67+
tracing::error!(
68+
error = &e as &dyn std::error::Error,
69+
"Failed to notify service manager"
70+
);
71+
}
72+
}
73+
74+
impl LifecycleManager {
75+
/// Create a new shutdown manager, installing the signal handlers
76+
///
77+
/// # Errors
78+
///
79+
/// Returns an error if the signal handler could not be installed
80+
pub fn new() -> Result<Self, std::io::Error> {
81+
let hard_shutdown_token = CancellationToken::new();
82+
let soft_shutdown_token = hard_shutdown_token.child_token();
83+
let sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;
84+
let sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
85+
let sighup = tokio::signal::unix::signal(SignalKind::hangup())?;
86+
let timeout = Duration::from_secs(60);
87+
let task_tracker = TaskTracker::new();
88+
89+
notify(&[sd_notify::NotifyState::MainPid(std::process::id())]);
90+
91+
Ok(Self {
92+
hard_shutdown_token,
93+
soft_shutdown_token,
94+
task_tracker,
95+
sigterm,
96+
sigint,
97+
sighup,
98+
timeout,
99+
reload_handlers: Vec::new(),
100+
})
101+
}
102+
103+
/// Add a handler to be called when the server gets a SIGHUP
104+
pub fn register_reloadable(&mut self, reloadable: &(impl Reloadable + 'static)) {
105+
let reloadable = reloadable.clone();
106+
self.reload_handlers.push(Box::new(move || {
107+
let reloadable = reloadable.clone();
108+
Box::pin(async move { reloadable.reload().await })
109+
}));
110+
}
111+
112+
/// Get a reference to the task tracker
113+
#[must_use]
114+
pub fn task_tracker(&self) -> &TaskTracker {
115+
&self.task_tracker
116+
}
117+
118+
/// Get a cancellation token that can be used to react to a hard shutdown
119+
#[must_use]
120+
pub fn hard_shutdown_token(&self) -> CancellationToken {
121+
self.hard_shutdown_token.clone()
122+
}
123+
124+
/// Get a cancellation token that can be used to react to a soft shutdown
125+
#[must_use]
126+
pub fn soft_shutdown_token(&self) -> CancellationToken {
127+
self.soft_shutdown_token.clone()
128+
}
129+
130+
/// Run until we finish completely shutting down.
131+
pub async fn run(mut self) -> ExitCode {
132+
notify(&[sd_notify::NotifyState::Ready]);
133+
134+
// This will be `Some` if we have the watchdog enabled, and `None` if not
135+
let mut watchdog_interval = {
136+
let mut watchdog_usec = 0;
137+
if sd_notify::watchdog_enabled(false, &mut watchdog_usec) {
138+
Some(tokio::time::interval(Duration::from_micros(
139+
watchdog_usec / 2,
140+
)))
141+
} else {
142+
None
143+
}
144+
};
145+
146+
// Wait for a first shutdown signal and trigger the soft shutdown
147+
let likely_crashed = loop {
148+
// This makes a Future that will either yield the watchdog tick if enabled, or a
149+
// pending Future if not
150+
let watchdog_tick = if let Some(watchdog_interval) = &mut watchdog_interval {
151+
Either::Left(watchdog_interval.tick())
152+
} else {
153+
Either::Right(futures_util::future::pending())
154+
};
155+
156+
tokio::select! {
157+
() = self.soft_shutdown_token.cancelled() => {
158+
tracing::warn!("Another task triggered a shutdown, it likely crashed! Shutting down");
159+
break true;
160+
},
161+
162+
_ = self.sigterm.recv() => {
163+
tracing::info!("Shutdown signal received (SIGTERM), shutting down");
164+
break false;
165+
},
166+
167+
_ = self.sigint.recv() => {
168+
tracing::info!("Shutdown signal received (SIGINT), shutting down");
169+
break false;
170+
},
171+
172+
_ = watchdog_tick => {
173+
notify(&[
174+
sd_notify::NotifyState::Watchdog,
175+
]);
176+
},
177+
178+
_ = self.sighup.recv() => {
179+
tracing::info!("Reload signal received (SIGHUP), reloading");
180+
181+
notify(&[
182+
sd_notify::NotifyState::Reloading,
183+
sd_notify::NotifyState::monotonic_usec_now()
184+
.expect("Failed to read monotonic clock")
185+
]);
186+
187+
// XXX: if one handler takes a long time, it will block the
188+
// rest of the shutdown process, which is not ideal. We
189+
// should probably have a timeout here
190+
futures_util::future::join_all(
191+
self.reload_handlers
192+
.iter()
193+
.map(|handler| handler())
194+
).await;
195+
196+
notify(&[sd_notify::NotifyState::Ready]);
197+
198+
tracing::info!("Reloading done");
199+
},
200+
}
201+
};
202+
203+
notify(&[sd_notify::NotifyState::Stopping]);
204+
205+
self.soft_shutdown_token.cancel();
206+
self.task_tracker.close();
207+
208+
// Start the timeout
209+
let timeout = tokio::time::sleep(self.timeout);
210+
tokio::select! {
211+
_ = self.sigterm.recv() => {
212+
tracing::warn!("Second shutdown signal received (SIGTERM), abort");
213+
},
214+
_ = self.sigint.recv() => {
215+
tracing::warn!("Second shutdown signal received (SIGINT), abort");
216+
},
217+
() = timeout => {
218+
tracing::warn!("Shutdown timeout reached, abort");
219+
},
220+
() = self.task_tracker.wait() => {
221+
// This is the "happy path", we have gracefully shutdown
222+
},
223+
}
224+
225+
self.hard_shutdown_token().cancel();
226+
227+
// TODO: we may want to have a time out on the task tracker, in case we have
228+
// really stuck tasks on it
229+
self.task_tracker().wait().await;
230+
231+
tracing::info!("All tasks are done, exitting");
232+
233+
if likely_crashed {
234+
ExitCode::FAILURE
235+
} else {
236+
ExitCode::SUCCESS
237+
}
238+
}
239+
}

crates/cli/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ use tracing_subscriber::{
1818

1919
mod app_state;
2020
mod commands;
21+
mod lifecycle;
2122
mod server;
22-
mod shutdown;
2323
mod sync;
2424
mod telemetry;
2525
mod util;

0 commit comments

Comments
 (0)