Skip to content

Commit ce05710

Browse files
downloader: Use TaskTracker to track thread lifecycles
1 parent 0670468 commit ce05710

File tree

3 files changed

+9
-12
lines changed

3 files changed

+9
-12
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ tonic = { workspace = true }
1212
tokio = { workspace = true }
1313
prost = { workspace = true }
1414
reqwest = "0.12"
15-
tokio-util = "0.7"
15+
tokio-util = { version = "0.7", features = ["rt"] }
1616

1717
[build-dependencies]
1818
tonic-build = "0.12.3"

src/downloader/manager.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,15 @@ use crate::{error::DownloadError, Error};
33
use reqwest::{Client, Url};
44
use std::{path::Path, sync::Arc};
55
use tokio::sync::{mpsc, Semaphore};
6-
use tokio_util::sync::CancellationToken;
6+
use tokio_util::{sync::CancellationToken, task::TaskTracker};
77

88
#[derive(Debug)]
99
pub struct DownloadManager {
1010
queue: mpsc::Sender<DownloadRequest>,
1111
semaphore: Arc<Semaphore>,
1212
cancel: CancellationToken,
1313
config: DownloadManagerConfig,
14-
}
15-
16-
impl Drop for DownloadManager {
17-
fn drop(&mut self) {
18-
// Need to manually close the semaphore to make sure dispatcher_thread stops waiting for permits
19-
self.semaphore.close();
20-
}
14+
tracker: TaskTracker,
2115
}
2216

2317
impl Default for DownloadManager {
@@ -31,14 +25,16 @@ impl DownloadManager {
3125
let (tx, rx) = mpsc::channel(config.queue_size());
3226
let client = Client::new();
3327
let semaphore = Arc::new(Semaphore::new(config.max_concurrent()));
28+
let tracker = TaskTracker::new();
3429
let manager = Self {
3530
queue: tx,
3631
semaphore: semaphore.clone(),
3732
cancel: CancellationToken::new(),
3833
config,
34+
tracker: tracker.clone(),
3935
};
4036
// Spawn the dispatcher thread to handle download requests
41-
tokio::spawn(async move { dispatcher_thread(client, rx, semaphore).await });
37+
tracker.spawn(dispatcher_thread(client, rx, semaphore, tracker.clone()));
4238
manager
4339
}
4440

@@ -98,14 +94,15 @@ async fn dispatcher_thread(
9894
client: Client,
9995
mut rx: mpsc::Receiver<DownloadRequest>,
10096
sem: Arc<Semaphore>,
97+
tracker: TaskTracker,
10198
) {
10299
while let Some(request) = rx.recv().await {
103100
let permit = match sem.clone().acquire_owned().await {
104101
Ok(permit) => permit,
105102
Err(_) => break,
106103
};
107104
let client = client.clone();
108-
tokio::spawn(async move {
105+
tracker.spawn(async move {
109106
// Move the permit into the worker thread so it's automatically released when the thread finishes
110107
let _permit = permit;
111108
download_thread(client.clone(), request).await;

src/downloader/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ mod worker;
77

88
pub use config::DownloadManagerConfig;
99
pub use handle::DownloadHandle;
10-
pub use manager::*;
10+
pub use manager::DownloadManager;
1111
pub use progress::DownloadProgress;
1212
pub use types::*;
1313
pub(self) use worker::download_thread;

0 commit comments

Comments
 (0)