diff --git a/crates/cli/src/commands/server.rs b/crates/cli/src/commands/server.rs index b974f26e2..9234e45bb 100644 --- a/crates/cli/src/commands/server.rs +++ b/crates/cli/src/commands/server.rs @@ -311,8 +311,8 @@ impl Options { shutdown.hard_shutdown_token(), )); - shutdown.run().await; + let exit_code = shutdown.run().await; - Ok(ExitCode::SUCCESS) + Ok(exit_code) } } diff --git a/crates/cli/src/commands/worker.rs b/crates/cli/src/commands/worker.rs index 3d976d46e..3e3df8402 100644 --- a/crates/cli/src/commands/worker.rs +++ b/crates/cli/src/commands/worker.rs @@ -80,8 +80,8 @@ impl Options { span.exit(); - shutdown.run().await; + let exit_code = shutdown.run().await; - Ok(ExitCode::SUCCESS) + Ok(exit_code) } } diff --git a/crates/cli/src/shutdown.rs b/crates/cli/src/shutdown.rs index 080be0f2e..5386166af 100644 --- a/crates/cli/src/shutdown.rs +++ b/crates/cli/src/shutdown.rs @@ -3,7 +3,7 @@ // SPDX-License-Identifier: AGPL-3.0-only // Please see LICENSE in the repository root for full details. -use std::time::Duration; +use std::{process::ExitCode, time::Duration}; use tokio::signal::unix::{Signal, SignalKind}; use tokio_util::{sync::CancellationToken, task::TaskTracker}; @@ -74,14 +74,22 @@ impl ShutdownManager { } /// Run until we finish completely shutting down. - pub async fn run(mut self) { + pub async fn run(mut self) -> ExitCode { // Wait for a first signal and trigger the soft shutdown - tokio::select! { + let likely_crashed = tokio::select! { + () = self.soft_shutdown_token.cancelled() => { + tracing::warn!("Another task triggered a shutdown, it likely crashed! Shutting down"); + true + }, + _ = self.sigterm.recv() => { tracing::info!("Shutdown signal received (SIGTERM), shutting down"); + false }, + _ = self.sigint.recv() => { tracing::info!("Shutdown signal received (SIGINT), shutting down"); + false }, }; @@ -112,5 +120,11 @@ impl ShutdownManager { self.task_tracker().wait().await; tracing::info!("All tasks are done, exitting"); + + if likely_crashed { + ExitCode::FAILURE + } else { + ExitCode::SUCCESS + } } } diff --git a/crates/handlers/src/activity_tracker/mod.rs b/crates/handlers/src/activity_tracker/mod.rs index d314fc16e..232c636ef 100644 --- a/crates/handlers/src/activity_tracker/mod.rs +++ b/crates/handlers/src/activity_tracker/mod.rs @@ -182,6 +182,10 @@ impl ActivityTracker { interval: std::time::Duration, cancellation_token: CancellationToken, ) { + // This guard on the shutdown token is to ensure that if this task crashes for + // any reason, the server will shut down + let _guard = cancellation_token.clone().drop_guard(); + loop { tokio::select! { biased; diff --git a/crates/handlers/src/activity_tracker/worker.rs b/crates/handlers/src/activity_tracker/worker.rs index 6fc3f8a86..7d1f2ec6d 100644 --- a/crates/handlers/src/activity_tracker/worker.rs +++ b/crates/handlers/src/activity_tracker/worker.rs @@ -93,6 +93,10 @@ impl Worker { mut receiver: tokio::sync::mpsc::Receiver, cancellation_token: CancellationToken, ) { + // This guard on the shutdown token is to ensure that if this task crashes for + // any reason, the server will shut down + let _guard = cancellation_token.clone().drop_guard(); + loop { let message = tokio::select! { // Because we want the cancellation token to trigger only once, diff --git a/crates/listener/src/server.rs b/crates/listener/src/server.rs index 06eb03128..169a2256f 100644 --- a/crates/listener/src/server.rs +++ b/crates/listener/src/server.rs @@ -314,6 +314,10 @@ pub async fn run_servers( B::Data: Send, B::Error: std::error::Error + Send + Sync + 'static, { + // This guard on the shutdown token is to ensure that if this task crashes for + // any reason, the server will shut down + let _guard = soft_shutdown_token.clone().drop_guard(); + // Create a stream of accepted connections out of the listeners let mut accept_stream: SelectAll<_> = listeners .into_iter() @@ -360,7 +364,7 @@ pub async fn run_servers( connection_tasks.spawn(conn); }, Some(Ok(Err(_e))) => { /* Connection did not finish handshake, error should be logged in `accept` */ }, - Some(Err(e)) => tracing::error!("Join error: {e}"), + Some(Err(e)) => tracing::error!(error = &e as &dyn std::error::Error, "Join error"), None => tracing::error!("Join set was polled even though it was empty"), } }, @@ -369,8 +373,8 @@ pub async fn run_servers( res = connection_tasks.join_next(), if !connection_tasks.is_empty() => { match res { Some(Ok(Ok(()))) => tracing::trace!("Connection finished"), - Some(Ok(Err(e))) => tracing::error!("Error while serving connection: {e}"), - Some(Err(e)) => tracing::error!("Join error: {e}"), + Some(Ok(Err(e))) => tracing::error!(error = &*e as &dyn std::error::Error, "Error while serving connection"), + Some(Err(e)) => tracing::error!(error = &e as &dyn std::error::Error, "Join error"), None => tracing::error!("Join set was polled even though it was empty"), } }, @@ -412,7 +416,7 @@ pub async fn run_servers( connection_tasks.spawn(conn); } Some(Ok(Err(_e))) => { /* Connection did not finish handshake, error should be logged in `accept` */ }, - Some(Err(e)) => tracing::error!("Join error: {e}"), + Some(Err(e)) => tracing::error!(error = &e as &dyn std::error::Error, "Join error"), None => tracing::error!("Join set was polled even though it was empty"), } }, @@ -421,8 +425,8 @@ pub async fn run_servers( res = connection_tasks.join_next(), if !connection_tasks.is_empty() => { match res { Some(Ok(Ok(()))) => tracing::trace!("Connection finished"), - Some(Ok(Err(e))) => tracing::error!("Error while serving connection: {e}"), - Some(Err(e)) => tracing::error!("Join error: {e}"), + Some(Ok(Err(e))) => tracing::error!(error = &*e as &dyn std::error::Error, "Error while serving connection"), + Some(Err(e)) => tracing::error!(error = &e as &dyn std::error::Error, "Join error"), None => tracing::error!("Join set was polled even though it was empty"), } }, diff --git a/crates/tasks/src/lib.rs b/crates/tasks/src/lib.rs index 0785cd9c4..38be3a91c 100644 --- a/crates/tasks/src/lib.rs +++ b/crates/tasks/src/lib.rs @@ -133,14 +133,7 @@ pub async fn init( mas_storage::queue::CleanupExpiredTokensJob, ); - task_tracker.spawn(async move { - if let Err(e) = worker.run().await { - tracing::error!( - error = &e as &dyn std::error::Error, - "Failed to run new queue" - ); - } - }); + task_tracker.spawn(worker.run()); Ok(()) } diff --git a/crates/tasks/src/new_queue.rs b/crates/tasks/src/new_queue.rs index 3bf09d071..4959203e4 100644 --- a/crates/tasks/src/new_queue.rs +++ b/crates/tasks/src/new_queue.rs @@ -200,6 +200,8 @@ pub struct QueueWorker { am_i_leader: bool, last_heartbeat: DateTime, cancellation_token: CancellationToken, + #[expect(dead_code, reason = "This is used on Drop")] + cancellation_guard: tokio_util::sync::DropGuard, state: State, schedules: Vec, tracker: JobTracker, @@ -269,6 +271,10 @@ impl QueueWorker { ) .build(); + // We put a cancellation drop guard in the structure, so that when it gets + // dropped, we're sure to cancel the token + let cancellation_guard = cancellation_token.clone().drop_guard(); + Ok(Self { rng, clock, @@ -277,6 +283,7 @@ impl QueueWorker { am_i_leader: false, last_heartbeat: now, cancellation_token, + cancellation_guard, state, schedules: Vec::new(), tracker: JobTracker::new(), @@ -316,7 +323,16 @@ impl QueueWorker { self } - pub async fn run(&mut self) -> Result<(), QueueRunnerError> { + pub async fn run(mut self) { + if let Err(e) = self.run_inner().await { + tracing::error!( + error = &e as &dyn std::error::Error, + "Failed to run new queue" + ); + } + } + + async fn run_inner(&mut self) -> Result<(), QueueRunnerError> { self.setup_schedules().await?; while !self.cancellation_token.is_cancelled() {