Skip to content

Commit 31f5218

Browse files
downloader: Use tokio_util::CancellationToken to handle cancellations
This fixes a bug where if we didn't store the handle to the download somewhere it would automatically cancel the download
1 parent 7973137 commit 31f5218

File tree

2 files changed

+11
-9
lines changed

2 files changed

+11
-9
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ tonic = { workspace = true }
1212
tokio = { workspace = true }
1313
prost = { workspace = true }
1414
reqwest = "0.12"
15+
tokio-util = "0.7"
1516

1617
[build-dependencies]
1718
tonic-build = "0.12.3"

src/downloader.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use tokio::{
66
io::AsyncWriteExt,
77
sync::{mpsc, oneshot, watch, Semaphore},
88
};
9+
use tokio_util::sync::CancellationToken;
910

1011
const QUEUE_SIZE: usize = 100;
1112
const MAX_RETRIES: usize = 3;
@@ -16,7 +17,7 @@ struct DownloadRequest {
1617
destination: PathBuf,
1718
result: oneshot::Sender<Result<File, Error>>,
1819
status: watch::Sender<Status>,
19-
cancel: oneshot::Receiver<()>,
20+
cancel: CancellationToken,
2021
}
2122

2223
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -29,7 +30,7 @@ pub struct DownloadProgress {
2930
pub struct DownloadHandle {
3031
result: oneshot::Receiver<Result<File, Error>>,
3132
status: watch::Receiver<Status>,
32-
cancel: oneshot::Sender<()>,
33+
cancel: CancellationToken,
3334
}
3435

3536
impl std::future::Future for DownloadHandle {
@@ -58,8 +59,8 @@ impl DownloadHandle {
5859
self.status.changed().await
5960
}
6061

61-
pub fn cancel(self) {
62-
self.cancel.send(()).ok();
62+
pub fn cancel(&self) {
63+
self.cancel.cancel();
6364
}
6465
}
6566

@@ -114,22 +115,22 @@ impl DownloadManager {
114115
pub fn add_request(&self, url: Url, destination: PathBuf) -> DownloadHandle {
115116
let (result_tx, result_rx) = oneshot::channel();
116117
let (status_tx, status_rx) = watch::channel(Status::Pending);
117-
let (cancel_tx, cancel_rx) = oneshot::channel();
118+
let cancel = CancellationToken::new();
118119

119120
let req = DownloadRequest {
120121
url,
121122
destination,
122123
result: result_tx,
123124
status: status_tx,
124-
cancel: cancel_rx,
125+
cancel: cancel.clone(),
125126
};
126127

127128
let _ = self.queue.try_send(req);
128129

129130
DownloadHandle {
130131
result: result_rx,
131132
status: status_rx,
132-
cancel: cancel_tx,
133+
cancel,
133134
}
134135
}
135136
}
@@ -193,7 +194,7 @@ async fn download_thread(client: Client, mut req: DownloadRequest) {
193194

194195
tokio::select! {
195196
_ = tokio::time::sleep(delay) => {},
196-
_ = &mut req.cancel => {
197+
_ = req.cancel.cancelled() => {
197198
req.status.send(Status::Failed).ok();
198199
req.result.send(Err(Error::Download(DownloadError::Cancelled))).ok();
199200
return;
@@ -247,7 +248,7 @@ async fn download(client: Client, req: &mut DownloadRequest) -> Result<File, Err
247248
update_progress(bytes_downloaded, total_bytes);
248249
loop {
249250
tokio::select! {
250-
_ = &mut req.cancel => {
251+
_ = req.cancel.cancelled() => {
251252
drop(file); // Manually drop the file handle to ensure that deletion doesn't fail
252253
tokio::fs::remove_file(&req.destination).await?;
253254
return Err(Error::Download(DownloadError::Cancelled));

0 commit comments

Comments
 (0)