Skip to content

Commit 53fdafd

Browse files
committed
feat: [#1477] extract JobManager to handle jobs
It: - Give a name to all jobs so they can be identify later in logs. - Wait for all jobs to finish when the app receives teh halt signal (CRTL+c) - Only waits for a grace period per job. - Shows a message when a job don't complete in time. This could be improved in the future: - By showing a message every second while we are waiting for a job to finish. - Waiting for all of them in paralell.
1 parent faf8111 commit 53fdafd

File tree

5 files changed

+154
-58
lines changed

5 files changed

+154
-58
lines changed

src/app.rs

Lines changed: 60 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@
2323
//! - Tracker REST API: the tracker API can be enabled/disabled.
2424
use std::sync::Arc;
2525

26-
use tokio::task::JoinHandle;
2726
use torrust_tracker_configuration::{Configuration, HttpTracker, UdpTracker};
2827
use tracing::instrument;
2928

29+
use crate::bootstrap::jobs::manager::JobManager;
3030
use crate::bootstrap::jobs::{self, health_check_api, http_tracker, torrent_cleanup, tracker_apis, udp_tracker};
3131
use crate::bootstrap::{self};
3232
use crate::container::AppContainer;
3333

34-
pub async fn run() -> (Arc<AppContainer>, Vec<JoinHandle<()>>) {
34+
pub async fn run() -> (Arc<AppContainer>, JobManager) {
3535
let (config, app_container) = bootstrap::app::setup();
3636

3737
let app_container = Arc::new(app_container);
@@ -50,7 +50,7 @@ pub async fn run() -> (Arc<AppContainer>, Vec<JoinHandle<()>>) {
5050
/// - Can't retrieve tracker keys from database.
5151
/// - Can't load whitelist from database.
5252
#[instrument(skip(config, app_container))]
53-
pub async fn start(config: &Configuration, app_container: &Arc<AppContainer>) -> Vec<JoinHandle<()>> {
53+
pub async fn start(config: &Configuration, app_container: &Arc<AppContainer>) -> JobManager {
5454
warn_if_no_services_enabled(config);
5555

5656
load_data_from_database(config, app_container).await;
@@ -63,19 +63,19 @@ async fn load_data_from_database(config: &Configuration, app_container: &Arc<App
6363
load_whitelisted_torrents(config, app_container).await;
6464
}
6565

66-
async fn start_jobs(config: &Configuration, app_container: &Arc<AppContainer>) -> Vec<JoinHandle<()>> {
67-
let mut jobs: Vec<JoinHandle<()>> = Vec::new();
66+
async fn start_jobs(config: &Configuration, app_container: &Arc<AppContainer>) -> JobManager {
67+
let mut job_manager = JobManager::new();
6868

69-
start_http_core_event_listener(config, app_container, &mut jobs);
70-
start_udp_core_event_listener(config, app_container, &mut jobs);
71-
start_udp_server_event_listener(config, app_container, &mut jobs);
72-
start_the_udp_instances(config, app_container, &mut jobs).await;
73-
start_the_http_instances(config, app_container, &mut jobs).await;
74-
start_the_http_api(config, app_container, &mut jobs).await;
75-
start_torrent_cleanup(config, app_container, &mut jobs);
76-
start_health_check_api(config, app_container, &mut jobs).await;
69+
start_http_core_event_listener(config, app_container, &mut job_manager);
70+
start_udp_core_event_listener(config, app_container, &mut job_manager);
71+
start_udp_server_event_listener(config, app_container, &mut job_manager);
72+
start_the_udp_instances(config, app_container, &mut job_manager).await;
73+
start_the_http_instances(config, app_container, &mut job_manager).await;
74+
start_the_http_api(config, app_container, &mut job_manager).await;
75+
start_torrent_cleanup(config, app_container, &mut job_manager);
76+
start_health_check_api(config, app_container, &mut job_manager).await;
7777

78-
jobs
78+
job_manager
7979
}
8080

8181
fn warn_if_no_services_enabled(config: &Configuration) {
@@ -109,94 +109,100 @@ async fn load_whitelisted_torrents(config: &Configuration, app_container: &Arc<A
109109
}
110110
}
111111

112-
fn start_http_core_event_listener(config: &Configuration, app_container: &Arc<AppContainer>, jobs: &mut Vec<JoinHandle<()>>) {
113-
let opt_job = jobs::http_tracker_core::start_event_listener(config, app_container);
112+
fn start_http_core_event_listener(config: &Configuration, app_container: &Arc<AppContainer>, job_manager: &mut JobManager) {
113+
let opt_handle = jobs::http_tracker_core::start_event_listener(config, app_container);
114114

115-
if let Some(job) = opt_job {
116-
jobs.push(job);
115+
if let Some(handle) = opt_handle {
116+
job_manager.push("http_core_event_listener", handle);
117117
}
118118
}
119119

120-
fn start_udp_core_event_listener(config: &Configuration, app_container: &Arc<AppContainer>, jobs: &mut Vec<JoinHandle<()>>) {
121-
let opt_job = jobs::udp_tracker_core::start_event_listener(config, app_container);
120+
fn start_udp_core_event_listener(config: &Configuration, app_container: &Arc<AppContainer>, job_manager: &mut JobManager) {
121+
let opt_handle = jobs::udp_tracker_core::start_event_listener(config, app_container);
122122

123-
if let Some(job) = opt_job {
124-
jobs.push(job);
123+
if let Some(handle) = opt_handle {
124+
job_manager.push("udp_core_event_listener", handle);
125125
}
126126
}
127127

128-
fn start_udp_server_event_listener(config: &Configuration, app_container: &Arc<AppContainer>, jobs: &mut Vec<JoinHandle<()>>) {
129-
let opt_job = jobs::udp_tracker_server::start_event_listener(config, app_container);
128+
fn start_udp_server_event_listener(config: &Configuration, app_container: &Arc<AppContainer>, job_manager: &mut JobManager) {
129+
let opt_handle = jobs::udp_tracker_server::start_event_listener(config, app_container);
130130

131-
if let Some(job) = opt_job {
132-
jobs.push(job);
131+
if let Some(handle) = opt_handle {
132+
job_manager.push("udp_server_event_listener", handle);
133133
}
134134
}
135135

136-
async fn start_the_udp_instances(config: &Configuration, app_container: &Arc<AppContainer>, jobs: &mut Vec<JoinHandle<()>>) {
136+
async fn start_the_udp_instances(config: &Configuration, app_container: &Arc<AppContainer>, job_manager: &mut JobManager) {
137137
if let Some(udp_trackers) = &config.udp_trackers {
138-
for udp_tracker_config in udp_trackers {
138+
for (idx, udp_tracker_config) in udp_trackers.iter().enumerate() {
139139
if config.core.private {
140140
tracing::warn!(
141141
"Could not start UDP tracker on: {} while in private mode. UDP is not safe for private trackers!",
142142
udp_tracker_config.bind_address
143143
);
144144
} else {
145-
start_udp_instance(udp_tracker_config, app_container, jobs).await;
145+
start_udp_instance(idx, udp_tracker_config, app_container, job_manager).await;
146146
}
147147
}
148148
} else {
149149
tracing::info!("No UDP blocks in configuration");
150150
}
151151
}
152152

153-
async fn start_udp_instance(udp_tracker_config: &UdpTracker, app_container: &Arc<AppContainer>, jobs: &mut Vec<JoinHandle<()>>) {
153+
async fn start_udp_instance(
154+
idx: usize,
155+
udp_tracker_config: &UdpTracker,
156+
app_container: &Arc<AppContainer>,
157+
job_manager: &mut JobManager,
158+
) {
154159
let udp_tracker_container = app_container
155160
.udp_tracker_container(udp_tracker_config.bind_address)
156161
.expect("Could not create UDP tracker container");
157162
let udp_tracker_server_container = app_container.udp_tracker_server_container();
158163

159-
jobs.push(
160-
udp_tracker::start_job(
161-
udp_tracker_container,
162-
udp_tracker_server_container,
163-
app_container.registar.give_form(),
164-
)
165-
.await,
166-
);
164+
let handle = udp_tracker::start_job(
165+
udp_tracker_container,
166+
udp_tracker_server_container,
167+
app_container.registar.give_form(),
168+
)
169+
.await;
170+
171+
job_manager.push(format!("udp_instance_{}_{}", idx, udp_tracker_config.bind_address), handle);
167172
}
168173

169-
async fn start_the_http_instances(config: &Configuration, app_container: &Arc<AppContainer>, jobs: &mut Vec<JoinHandle<()>>) {
174+
async fn start_the_http_instances(config: &Configuration, app_container: &Arc<AppContainer>, job_manager: &mut JobManager) {
170175
if let Some(http_trackers) = &config.http_trackers {
171-
for http_tracker_config in http_trackers {
172-
start_http_instance(http_tracker_config, app_container, jobs).await;
176+
for (idx, http_tracker_config) in http_trackers.iter().enumerate() {
177+
start_http_instance(idx, http_tracker_config, app_container, job_manager).await;
173178
}
174179
} else {
175180
tracing::info!("No HTTP blocks in configuration");
176181
}
177182
}
178183

179184
async fn start_http_instance(
185+
idx: usize,
180186
http_tracker_config: &HttpTracker,
181187
app_container: &Arc<AppContainer>,
182-
jobs: &mut Vec<JoinHandle<()>>,
188+
job_manager: &mut JobManager,
183189
) {
184190
let http_tracker_container = app_container
185191
.http_tracker_container(http_tracker_config.bind_address)
186192
.expect("Could not create HTTP tracker container");
187193

188-
if let Some(job) = http_tracker::start_job(
194+
if let Some(handle) = http_tracker::start_job(
189195
http_tracker_container,
190196
app_container.registar.give_form(),
191197
torrust_axum_http_tracker_server::Version::V1,
192198
)
193199
.await
194200
{
195-
jobs.push(job);
201+
job_manager.push(format!("http_instance_{}_{}", idx, http_tracker_config.bind_address), handle);
196202
}
197203
}
198204

199-
async fn start_the_http_api(config: &Configuration, app_container: &Arc<AppContainer>, jobs: &mut Vec<JoinHandle<()>>) {
205+
async fn start_the_http_api(config: &Configuration, app_container: &Arc<AppContainer>, job_manager: &mut JobManager) {
200206
if let Some(http_api_config) = &config.http_api {
201207
let http_api_config = Arc::new(http_api_config.clone());
202208
let http_api_container = app_container.tracker_http_api_container(&http_api_config);
@@ -208,22 +214,23 @@ async fn start_the_http_api(config: &Configuration, app_container: &Arc<AppConta
208214
)
209215
.await
210216
{
211-
jobs.push(job);
217+
job_manager.push("http_api", job);
212218
}
213219
} else {
214220
tracing::info!("No API block in configuration");
215221
}
216222
}
217223

218-
fn start_torrent_cleanup(config: &Configuration, app_container: &Arc<AppContainer>, jobs: &mut Vec<JoinHandle<()>>) {
224+
fn start_torrent_cleanup(config: &Configuration, app_container: &Arc<AppContainer>, job_manager: &mut JobManager) {
219225
if config.core.inactive_peer_cleanup_interval > 0 {
220-
jobs.push(torrent_cleanup::start_job(
221-
&config.core,
222-
&app_container.tracker_core_container.torrents_manager,
223-
));
226+
let handle = torrent_cleanup::start_job(&config.core, &app_container.tracker_core_container.torrents_manager);
227+
228+
job_manager.push("torrent_cleanup", handle);
224229
}
225230
}
226231

227-
async fn start_health_check_api(config: &Configuration, app_container: &Arc<AppContainer>, jobs: &mut Vec<JoinHandle<()>>) {
228-
jobs.push(health_check_api::start_job(&config.health_check_api, app_container.registar.entries()).await);
232+
async fn start_health_check_api(config: &Configuration, app_container: &Arc<AppContainer>, job_manager: &mut JobManager) {
233+
let handle = health_check_api::start_job(&config.health_check_api, app_container.registar.entries()).await;
234+
235+
job_manager.push("health_check_api", handle);
229236
}

src/bootstrap/jobs/manager.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
use std::time::Duration;
2+
3+
use tokio::task::JoinHandle;
4+
use tokio::time::timeout;
5+
use tracing::{info, warn};
6+
7+
/// Represents a named background job.
8+
#[derive(Debug)]
9+
pub struct Job {
10+
pub name: String,
11+
pub handle: JoinHandle<()>,
12+
}
13+
14+
impl Job {
15+
pub fn new<N: Into<String>>(name: N, handle: JoinHandle<()>) -> Self {
16+
Self {
17+
name: name.into(),
18+
handle,
19+
}
20+
}
21+
}
22+
23+
/// Manages multiple background jobs.
24+
#[derive(Debug, Default)]
25+
pub struct JobManager {
26+
jobs: Vec<Job>,
27+
}
28+
29+
impl JobManager {
30+
#[must_use]
31+
pub fn new() -> Self {
32+
Self { jobs: Vec::new() }
33+
}
34+
35+
pub fn push<N: Into<String>>(&mut self, name: N, handle: JoinHandle<()>) {
36+
self.jobs.push(Job::new(name, handle));
37+
}
38+
39+
/// Waits sequentially for all jobs to complete, with a graceful timeout per
40+
/// job.
41+
pub async fn wait_for_all(mut self, grace_period: Duration) {
42+
for job in self.jobs.drain(..) {
43+
let name = job.name.clone();
44+
45+
info!(job = %name, "Waiting for job to finish (timeout of {} seconds) ...", grace_period.as_secs());
46+
47+
if let Ok(result) = timeout(grace_period, job.handle).await {
48+
if let Err(e) = result {
49+
warn!(job = %name, "Job return an error: {:?}", e);
50+
} else {
51+
info!(job = %name, "Job completed gracefully");
52+
}
53+
} else {
54+
warn!(job = %name, "Job did not complete in time");
55+
}
56+
}
57+
}
58+
}
59+
60+
#[cfg(test)]
61+
mod tests {
62+
use tokio::time::Duration;
63+
64+
use super::*;
65+
66+
#[tokio::test]
67+
async fn it_should_wait_for_all_jobs_to_finish() {
68+
let mut manager = JobManager::new();
69+
70+
manager.push("job1", tokio::spawn(async {}));
71+
manager.push("job2", tokio::spawn(async {}));
72+
73+
manager.wait_for_all(Duration::from_secs(1)).await;
74+
}
75+
76+
#[tokio::test]
77+
async fn it_should_log_when_a_job_panics() {
78+
let mut manager = JobManager::new();
79+
80+
manager.push(
81+
"panic_job",
82+
tokio::spawn(async {
83+
panic!("expected panic");
84+
}),
85+
);
86+
87+
manager.wait_for_all(Duration::from_secs(1)).await;
88+
}
89+
}

src/bootstrap/jobs/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
pub mod health_check_api;
1010
pub mod http_tracker;
1111
pub mod http_tracker_core;
12+
pub mod manager;
1213
pub mod torrent_cleanup;
1314
pub mod tracker_apis;
1415
pub mod udp_tracker;

src/console/profiling.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,7 @@ pub async fn run() {
191191
_ = tokio::signal::ctrl_c() => {
192192
tracing::info!("Torrust tracker shutting down via Ctrl+C ...");
193193

194-
// Await for all jobs to shutdown
195-
futures::future::join_all(jobs).await;
194+
jobs.wait_for_all(Duration::from_secs(10)).await;
196195
}
197196
}
198197

src/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
1+
use std::time::Duration;
2+
13
use torrust_tracker_lib::app;
24

35
#[tokio::main]
46
async fn main() {
57
let (_app_container, jobs) = app::run().await;
68

7-
// handle the signals
89
tokio::select! {
910
_ = tokio::signal::ctrl_c() => {
1011
tracing::info!("Torrust tracker shutting down ...");
1112

12-
// Await for all jobs to shutdown
13-
futures::future::join_all(jobs).await;
13+
jobs.wait_for_all(Duration::from_secs(10)).await;
1414

1515
tracing::info!("Torrust tracker successfully shutdown.");
1616
}

0 commit comments

Comments
 (0)