Skip to content

Commit faf8111

Browse files
committed
fix: [torrust#1477] shutdown event listeners on CRTL+c signal
This fixes the problem of adding the jobs for event listeners in the main app JoinHandle vector. It was not possible to add the handles for those tokio tasks becuase in the main app we wait for all jobs and those jobs never end. ```rust async fn main() { let (_app_container, jobs) = app::run().await; // handle the signals tokio::select! { _ = tokio::signal::ctrl_c() => { tracing::info!("Torrust tracker shutting down ..."); // Await for all jobs to shutdown futures::future::join_all(jobs).await; tracing::info!("Torrust tracker successfully shutdown."); } } } ``` Now, we can wait for them becuase they listen for the halt signal. We will implement the shutdown in a different way in a new PR. See torrust#1405 Instead of listen to the CRTL+c signal the main app will send a "stop" event to the listeners. The final goal it only the main app listen for this external signal and it propagates the shutdown in cascade via normal internal messages or channels.
1 parent e595190 commit faf8111

File tree

5 files changed

+94
-69
lines changed

5 files changed

+94
-69
lines changed

packages/http-tracker-core/src/statistics/event/listener.rs

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,32 @@ pub fn run_event_listener(receiver: Receiver, repository: &Arc<Repository>) -> J
2323
}
2424

2525
async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc<Repository>) {
26+
let shutdown_signal = tokio::signal::ctrl_c();
27+
28+
tokio::pin!(shutdown_signal);
29+
2630
loop {
27-
match receiver.recv().await {
28-
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,
29-
Err(e) => {
30-
match e {
31-
RecvError::Closed => {
32-
tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "Http core statistics receiver closed.");
33-
break;
34-
}
35-
RecvError::Lagged(n) => {
36-
// From now on, metrics will be imprecise
37-
tracing::warn!(target: HTTP_TRACKER_LOG_TARGET, "Http core statistics receiver lagged by {} events.", n);
31+
tokio::select! {
32+
biased;
33+
34+
_ = &mut shutdown_signal => {
35+
tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "Received Ctrl+C, shutting down HTTP tracker core event listener.");
36+
break;
37+
}
38+
39+
result = receiver.recv() => {
40+
match result {
41+
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,
42+
Err(e) => {
43+
match e {
44+
RecvError::Closed => {
45+
tracing::info!(target: HTTP_TRACKER_LOG_TARGET, "Http core statistics receiver closed.");
46+
break;
47+
}
48+
RecvError::Lagged(n) => {
49+
tracing::warn!(target: HTTP_TRACKER_LOG_TARGET, "Http core statistics receiver lagged by {} events.", n);
50+
}
51+
}
3852
}
3953
}
4054
}

packages/udp-tracker-core/src/statistics/event/listener.rs

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,31 @@ pub fn run_event_listener(receiver: Receiver, repository: &Arc<Repository>) -> J
2323
}
2424

2525
async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc<Repository>) {
26+
let shutdown_signal = tokio::signal::ctrl_c();
27+
tokio::pin!(shutdown_signal);
28+
2629
loop {
27-
match receiver.recv().await {
28-
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,
29-
Err(e) => {
30-
match e {
31-
RecvError::Closed => {
32-
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Udp core statistics receiver closed.");
33-
break;
34-
}
35-
RecvError::Lagged(n) => {
36-
// From now on, metrics will be imprecise
37-
tracing::warn!(target: UDP_TRACKER_LOG_TARGET, "Udp core statistics receiver lagged by {} events.", n);
30+
tokio::select! {
31+
biased;
32+
33+
_ = &mut shutdown_signal => {
34+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Received Ctrl+C, shutting down UDP tracker core event listener.");
35+
break;
36+
}
37+
38+
result = receiver.recv() => {
39+
match result {
40+
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,
41+
Err(e) => {
42+
match e {
43+
RecvError::Closed => {
44+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Udp core statistics receiver closed.");
45+
break;
46+
}
47+
RecvError::Lagged(n) => {
48+
tracing::warn!(target: UDP_TRACKER_LOG_TARGET, "Udp core statistics receiver lagged by {} events.", n);
49+
}
50+
}
3851
}
3952
}
4053
}

packages/udp-tracker-server/src/statistics/event/listener.rs

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,23 +19,36 @@ pub fn run_event_listener(receiver: Receiver, repository: &Arc<Repository>) -> J
1919
tokio::spawn(async move {
2020
dispatch_events(receiver, stats_repository).await;
2121

22-
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "DP tracker server event listener finished");
22+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "UDP tracker server event listener finished");
2323
})
2424
}
2525

2626
async fn dispatch_events(mut receiver: Receiver, stats_repository: Arc<Repository>) {
27+
let shutdown_signal = tokio::signal::ctrl_c();
28+
tokio::pin!(shutdown_signal);
29+
2730
loop {
28-
match receiver.recv().await {
29-
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,
30-
Err(e) => {
31-
match e {
32-
RecvError::Closed => {
33-
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Udp server statistics receiver closed.");
34-
break;
35-
}
36-
RecvError::Lagged(n) => {
37-
// From now on, metrics will be imprecise
38-
tracing::warn!(target: UDP_TRACKER_LOG_TARGET, "Udp server statistics receiver lagged by {} events.", n);
31+
tokio::select! {
32+
biased;
33+
34+
_ = &mut shutdown_signal => {
35+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Received Ctrl+C, shutting down UDP tracker server event listener.");
36+
break;
37+
}
38+
39+
result = receiver.recv() => {
40+
match result {
41+
Ok(event) => handle_event(event, &stats_repository, CurrentClock::now()).await,
42+
Err(e) => {
43+
match e {
44+
RecvError::Closed => {
45+
tracing::info!(target: UDP_TRACKER_LOG_TARGET, "Udp server statistics receiver closed.");
46+
break;
47+
}
48+
RecvError::Lagged(n) => {
49+
tracing::warn!(target: UDP_TRACKER_LOG_TARGET, "Udp server statistics receiver lagged by {} events.", n);
50+
}
51+
}
3952
}
4053
}
4154
}

src/app.rs

Lines changed: 18 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@ async fn load_data_from_database(config: &Configuration, app_container: &Arc<App
6666
async fn start_jobs(config: &Configuration, app_container: &Arc<AppContainer>) -> Vec<JoinHandle<()>> {
6767
let mut jobs: Vec<JoinHandle<()>> = Vec::new();
6868

69-
start_http_core_event_listener(config, app_container);
70-
start_udp_core_event_listener(config, app_container);
71-
start_udp_server_event_listener(config, app_container);
69+
start_http_core_event_listener(config, app_container, &mut jobs);
70+
start_udp_core_event_listener(config, app_container, &mut jobs);
71+
start_udp_server_event_listener(config, app_container, &mut jobs);
7272
start_the_udp_instances(config, app_container, &mut jobs).await;
7373
start_the_http_instances(config, app_container, &mut jobs).await;
7474
start_the_http_api(config, app_container, &mut jobs).await;
@@ -109,43 +109,28 @@ async fn load_whitelisted_torrents(config: &Configuration, app_container: &Arc<A
109109
}
110110
}
111111

112-
fn start_http_core_event_listener(config: &Configuration, app_container: &Arc<AppContainer>) {
113-
let _job = jobs::http_tracker_core::start_event_listener(config, app_container);
112+
fn start_http_core_event_listener(config: &Configuration, app_container: &Arc<AppContainer>, jobs: &mut Vec<JoinHandle<()>>) {
113+
let opt_job = jobs::http_tracker_core::start_event_listener(config, app_container);
114114

115-
// todo: this cannot be enabled otherwise the application never ends
116-
// because the event listener never stops. You see this console message
117-
// forever:
118-
//
119-
// !! shuting down in 90 seconds !!
120-
// 2025-04-24T15:27:45.454101Z INFO graceful_shutdown: torrust_axum_server::signals: remaining alive connections: 0
121-
//
122-
// Depends on: https://github.com/torrust/torrust-tracker/issues/1405
115+
if let Some(job) = opt_job {
116+
jobs.push(job);
117+
}
123118
}
124119

125-
fn start_udp_core_event_listener(config: &Configuration, app_container: &Arc<AppContainer>) {
126-
let _job = jobs::udp_tracker_core::start_event_listener(config, app_container);
120+
fn start_udp_core_event_listener(config: &Configuration, app_container: &Arc<AppContainer>, jobs: &mut Vec<JoinHandle<()>>) {
121+
let opt_job = jobs::udp_tracker_core::start_event_listener(config, app_container);
127122

128-
// todo: the job cannot be added in the jobs vector otherwise the application never ends
129-
// because the event listener never stops. You see this console message
130-
// forever:
131-
//
132-
// !! shuting down in 90 seconds !!
133-
// 2025-04-24T15:27:45.454101Z INFO graceful_shutdown: torrust_axum_server::signals: remaining alive connections: 0
134-
//
135-
// Depends on: https://github.com/torrust/torrust-tracker/issues/1405
123+
if let Some(job) = opt_job {
124+
jobs.push(job);
125+
}
136126
}
137127

138-
fn start_udp_server_event_listener(config: &Configuration, app_container: &Arc<AppContainer>) {
139-
let _job = jobs::udp_tracker_server::start_event_listener(config, app_container);
128+
fn start_udp_server_event_listener(config: &Configuration, app_container: &Arc<AppContainer>, jobs: &mut Vec<JoinHandle<()>>) {
129+
let opt_job = jobs::udp_tracker_server::start_event_listener(config, app_container);
140130

141-
// todo: the job cannot be added in the jobs vector otherwise the application never ends
142-
// because the event listener never stops. You see this console message
143-
// forever:
144-
//
145-
// !! shuting down in 90 seconds !!
146-
// 2025-04-24T15:27:45.454101Z INFO graceful_shutdown: torrust_axum_server::signals: remaining alive connections: 0
147-
//
148-
// Depends on: https://github.com/torrust/torrust-tracker/issues/1405
131+
if let Some(job) = opt_job {
132+
jobs.push(job);
133+
}
149134
}
150135

151136
async fn start_the_udp_instances(config: &Configuration, app_container: &Arc<AppContainer>, jobs: &mut Vec<JoinHandle<()>>) {

src/bootstrap/jobs/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@
77
//!
88
//! This modules contains all the functions needed to start those jobs.
99
pub mod health_check_api;
10-
pub mod http_tracker_core;
1110
pub mod http_tracker;
11+
pub mod http_tracker_core;
1212
pub mod torrent_cleanup;
1313
pub mod tracker_apis;
14+
pub mod udp_tracker;
1415
pub mod udp_tracker_core;
1516
pub mod udp_tracker_server;
16-
pub mod udp_tracker;

0 commit comments

Comments
 (0)