Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,11 @@ version = "1.0.64"
version = "1.40.0"
features = ["full"]

# Useful async utilities
[workspace.dependencies.tokio-util]
version = "0.7.12"
features = ["rt"]

# Tower services
[workspace.dependencies.tower]
version = "0.5.1"
Expand Down
13 changes: 11 additions & 2 deletions crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ serde_json.workspace = true
serde_yaml = "0.9.34"
sqlx.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tower.workspace = true
tower-http.workspace = true
url.workspace = true
Expand All @@ -48,12 +49,20 @@ tracing-opentelemetry.workspace = true
opentelemetry.workspace = true
opentelemetry-http.workspace = true
opentelemetry-jaeger-propagator = "0.3.0"
opentelemetry-otlp = { version = "0.17.0", default-features = false, features = ["trace", "metrics", "http-proto"] }
opentelemetry-otlp = { version = "0.17.0", default-features = false, features = [
"trace",
"metrics",
"http-proto",
] }
opentelemetry-prometheus = "0.17.0"
opentelemetry-resource-detectors = "0.3.0"
opentelemetry-semantic-conventions.workspace = true
opentelemetry-stdout = { version = "0.5.0", features = ["trace", "metrics"] }
opentelemetry_sdk = { version = "0.24.1", features = ["trace", "metrics", "rt-tokio"] }
opentelemetry_sdk = { version = "0.24.1", features = [
"trace",
"metrics",
"rt-tokio",
] }
prometheus = "0.13.4"
sentry.workspace = true
sentry-tracing.workspace = true
Expand Down
44 changes: 32 additions & 12 deletions crates/cli/src/commands/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use mas_config::{
AppConfig, ClientsConfig, ConfigurationSection, ConfigurationSectionExt, UpstreamOAuth2Config,
};
use mas_handlers::{ActivityTracker, CookieManager, HttpClientFactory, Limiter, MetadataCache};
use mas_listener::{server::Server, shutdown::ShutdownStream};
use mas_listener::server::Server;
use mas_matrix_synapse::SynapseConnection;
use mas_router::UrlBuilder;
use mas_storage::SystemClock;
Expand All @@ -24,11 +24,11 @@ use rand::{
thread_rng,
};
use sqlx::migrate::Migrate;
use tokio::signal::unix::SignalKind;
use tracing::{info, info_span, warn, Instrument};

use crate::{
app_state::AppState,
shutdown::ShutdownManager,
util::{
database_pool_from_config, mailer_from_config, password_manager_from_config,
policy_factory_from_config, register_sighup, site_config_from_config,
Expand Down Expand Up @@ -61,6 +61,7 @@ impl Options {
#[allow(clippy::too_many_lines)]
pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> {
let span = info_span!("cli.run.init").entered();
let shutdown = ShutdownManager::new()?;
let config = AppConfig::extract(figment)?;

if self.migrate {
Expand Down Expand Up @@ -173,8 +174,21 @@ impl Options {
url_builder.clone(),
)
.await?;
// TODO: grab the handle
tokio::spawn(monitor.run());

// XXX: The monitor from apalis is a bit annoying to use for graceful shutdowns,
// ideally we'd just give it a cancellation token
let shutdown_future = shutdown.soft_shutdown_token().cancelled_owned();
shutdown.task_tracker().spawn(async move {
if let Err(e) = monitor
.run_with_signal(async move {
shutdown_future.await;
Ok(())
})
.await
{
tracing::error!(error = &e as &dyn std::error::Error, "Task worker failed");
}
});
}

let listeners_config = config.http.listeners.clone();
Expand All @@ -186,7 +200,12 @@ impl Options {

// Initialize the activity tracker
// Activity is flushed every minute
let activity_tracker = ActivityTracker::new(pool.clone(), Duration::from_secs(60));
let activity_tracker = ActivityTracker::new(
pool.clone(),
Duration::from_secs(60),
shutdown.task_tracker(),
shutdown.soft_shutdown_token(),
);
let trusted_proxies = config.http.trusted_proxies.clone();

// Build a rate limiter.
Expand Down Expand Up @@ -302,16 +321,17 @@ impl Options {
.flatten_ok()
.collect::<Result<Vec<_>, _>>()?;

let shutdown = ShutdownStream::default()
.with_timeout(Duration::from_secs(60))
.with_signal(SignalKind::terminate())?
.with_signal(SignalKind::interrupt())?;

span.exit();

mas_listener::server::run_servers(servers, shutdown).await;
shutdown
.task_tracker()
.spawn(mas_listener::server::run_servers(
servers,
shutdown.soft_shutdown_token(),
shutdown.hard_shutdown_token(),
));

state.activity_tracker.shutdown().await;
shutdown.run().await;

Ok(ExitCode::SUCCESS)
}
Expand Down
1 change: 1 addition & 0 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod app_state;
mod commands;
mod sentry_transport;
mod server;
mod shutdown;
mod sync;
mod telemetry;
mod util;
Expand Down
116 changes: 116 additions & 0 deletions crates/cli/src/shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// Copyright 2024 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.

use std::time::Duration;

use tokio::signal::unix::{Signal, SignalKind};
use tokio_util::{sync::CancellationToken, task::TaskTracker};

/// A helper to manage graceful shutdowns and track the tasks that must finish
/// before the process exits.
///
/// It will listen for SIGTERM and SIGINT signals, and will trigger a soft
/// shutdown on the first signal, and a hard shutdown on the second signal or
/// after a timeout.
///
/// Users of this manager should use the `soft_shutdown_token` to react to a
/// soft shutdown, which should gracefully finish requests and close
/// connections, and the `hard_shutdown_token` to react to a hard shutdown,
/// which should drop all connections and finish all requests.
///
/// They should also spawn their work on the `task_tracker`, so that the
/// manager can tell when every tracked task has finished, i.e. when the soft
/// shutdown has completed successfully.
pub struct ShutdownManager {
/// Cancelled by `run` once the soft shutdown has finished, timed out, or was
/// interrupted by a second signal. Parent of `soft_shutdown_token`.
hard_shutdown_token: CancellationToken,
/// Child of `hard_shutdown_token`; cancelled by `run` on the first signal.
soft_shutdown_token: CancellationToken,
/// Tracks the tasks `run` waits for; closed when the soft shutdown starts.
task_tracker: TaskTracker,
/// SIGTERM listener, installed in `new`.
sigterm: Signal,
/// SIGINT listener, installed in `new`.
sigint: Signal,
/// How long `run` waits after the soft shutdown before forcing the hard
/// shutdown. Set to 60 seconds by `new`.
timeout: Duration,
}

impl ShutdownManager {
    /// Grace period granted to tracked tasks between the soft shutdown and
    /// the forced hard shutdown.
    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60);

    /// Create a new shutdown manager, installing the signal handlers
    ///
    /// The soft shutdown token is created as a child of the hard shutdown
    /// token, so cancelling the hard token also cancels the soft one.
    ///
    /// # Errors
    ///
    /// Returns an error if the signal handler could not be installed
    pub fn new() -> Result<Self, std::io::Error> {
        let hard_shutdown_token = CancellationToken::new();
        let soft_shutdown_token = hard_shutdown_token.child_token();
        let sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;
        let sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;

        Ok(Self {
            hard_shutdown_token,
            soft_shutdown_token,
            task_tracker: TaskTracker::new(),
            sigterm,
            sigint,
            timeout: Self::DEFAULT_TIMEOUT,
        })
    }

    /// Get a reference to the task tracker
    ///
    /// Tasks spawned on this tracker are waited for by [`Self::run`] before
    /// it returns.
    #[must_use]
    pub fn task_tracker(&self) -> &TaskTracker {
        &self.task_tracker
    }

    /// Get a cancellation token that can be used to react to a hard shutdown
    ///
    /// The returned token is a clone sharing its state with the manager's.
    #[must_use]
    pub fn hard_shutdown_token(&self) -> CancellationToken {
        self.hard_shutdown_token.clone()
    }

    /// Get a cancellation token that can be used to react to a soft shutdown
    ///
    /// The returned token is a clone sharing its state with the manager's.
    #[must_use]
    pub fn soft_shutdown_token(&self) -> CancellationToken {
        self.soft_shutdown_token.clone()
    }

    /// Run until we finish completely shutting down.
    ///
    /// The sequence is:
    ///
    /// 1. wait for a first SIGTERM or SIGINT;
    /// 2. cancel the soft shutdown token and close the task tracker;
    /// 3. wait for whichever comes first: a second signal, the timeout, or
    ///    all tracked tasks finishing;
    /// 4. cancel the hard shutdown token and wait for the remaining tracked
    ///    tasks.
    pub async fn run(mut self) {
        // Wait for a first signal and trigger the soft shutdown
        tokio::select! {
            _ = self.sigterm.recv() => {
                tracing::info!("Shutdown signal received (SIGTERM), shutting down");
            },
            _ = self.sigint.recv() => {
                tracing::info!("Shutdown signal received (SIGINT), shutting down");
            },
        };

        self.soft_shutdown_token.cancel();
        // Closing the tracker is what allows `wait()` to resolve once the
        // tracked tasks have all finished
        self.task_tracker.close();

        // Start the timeout
        let timeout = tokio::time::sleep(self.timeout);
        tokio::select! {
            _ = self.sigterm.recv() => {
                tracing::warn!("Second shutdown signal received (SIGTERM), abort");
            },
            _ = self.sigint.recv() => {
                tracing::warn!("Second shutdown signal received (SIGINT), abort");
            },
            () = timeout => {
                tracing::warn!("Shutdown timeout reached, abort");
            },
            () = self.task_tracker.wait() => {
                // This is the "happy path", we have gracefully shutdown
            },
        }

        // Cancel the field directly: going through the accessor would clone
        // the token just to cancel it
        self.hard_shutdown_token.cancel();

        // TODO: we may want to have a timeout on the task tracker, in case we
        // have really stuck tasks on it
        self.task_tracker.wait().await;

        tracing::info!("All tasks are done, exiting");
    }
}
1 change: 1 addition & 0 deletions crates/handlers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ workspace = true
[dependencies]
# Async runtime
tokio.workspace = true
tokio-util.workspace = true
futures-util = "0.3.31"
async-trait.workspace = true

Expand Down
Loading
Loading