Skip to content

Commit ad41c23

Browse files
downloader: Implement cancelling downloads
1 parent 9c29890 commit ad41c23

File tree

1 file changed

+37
-4
lines changed

1 file changed

+37
-4
lines changed

src/downloader.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ struct DownloadRequest {
1717
destination: PathBuf,
1818
result: oneshot::Sender<Result<File, Error>>,
1919
status: watch::Sender<Status>,
20+
cancel: oneshot::Receiver<()>,
2021
}
2122

2223
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -29,6 +30,7 @@ pub struct DownloadProgress {
2930
pub struct DownloadHandle {
3031
result: oneshot::Receiver<Result<File, Error>>,
3132
status: watch::Receiver<Status>,
33+
cancel: oneshot::Sender<()>,
3234
}
3335

3436
impl DownloadHandle {
@@ -42,13 +44,18 @@ impl DownloadHandle {
4244
pub fn status(&self) -> Status {
4345
self.status.borrow().clone()
4446
}
47+
48+
pub fn cancel(self) {
49+
self.cancel.send(()).ok();
50+
}
4551
}
4652

4753
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
4854
pub enum Status {
4955
Pending,
5056
InProgress(DownloadProgress),
5157
Completed,
58+
Cancelled,
5259
Failed,
5360
}
5461

@@ -94,19 +101,22 @@ impl DownloadManager {
94101
pub fn add_request(&self, url: Url, destination: PathBuf) -> DownloadHandle {
95102
let (result_tx, result_rx) = oneshot::channel();
96103
let (status_tx, status_rx) = watch::channel(Status::Pending);
104+
let (cancel_tx, cancel_rx) = oneshot::channel();
97105

98106
let req = DownloadRequest {
99107
url,
100108
destination,
101109
result: result_tx,
102110
status: status_tx,
111+
cancel: cancel_rx,
103112
};
104113

105114
let _ = self.queue.try_send(req);
106115

107116
DownloadHandle {
108117
result: result_rx,
109118
status: status_rx,
119+
cancel: cancel_tx,
110120
}
111121
}
112122
}
@@ -131,7 +141,17 @@ async fn dispatcher_thread(
131141
let _ = request.result.send(Ok(file));
132142
}
133143
Err(e) => {
134-
let _ = request.status.send(Status::Failed);
144+
let status = match e {
145+
Error::Io(ref io_err) => {
146+
if io_err.kind() == std::io::ErrorKind::Interrupted {
147+
Status::Cancelled
148+
} else {
149+
Status::Failed
150+
}
151+
}
152+
_ => Status::Failed,
153+
};
154+
let _ = request.status.send(status);
135155
let _ = request.result.send(Err(e));
136156
}
137157
}
@@ -143,13 +163,18 @@ async fn download_thread(client: Client, req: &DownloadRequest) -> Result<File,
143163
let mut resp = client.get(req.url.as_ref()).send().await?;
144164
let total_bytes = resp.content_length();
145165
let mut bytes_downloaded = 0u64;
166+
let mut cancelled = false;
146167
let mut file = File::options()
147168
.read(true)
148169
.write(true)
149170
.create(true)
150171
.open(&req.destination)?;
151172

152173
while let Some(chunk) = resp.chunk().await.transpose() {
174+
cancelled = !req.cancel.is_empty();
175+
if cancelled {
176+
break;
177+
}
153178
let chunk = chunk?;
154179
file.write_all(&chunk)?;
155180
bytes_downloaded += chunk.len() as u64;
@@ -159,7 +184,15 @@ async fn download_thread(client: Client, req: &DownloadRequest) -> Result<File,
159184
}));
160185
}
161186

162-
// Reset the cursor to the beginning of the file
163-
file.seek(SeekFrom::Start(0))?;
164-
Ok(file)
187+
if cancelled {
188+
std::fs::remove_file(&req.destination)?;
189+
Err(Error::Io(std::io::Error::new(
190+
std::io::ErrorKind::Interrupted,
191+
"Download cancelled",
192+
)))
193+
} else {
194+
// Reset the cursor to the beginning of the file
195+
file.seek(SeekFrom::Start(0))?;
196+
Ok(file)
197+
}
165198
}

0 commit comments

Comments
 (0)