Skip to content

Commit 2aac716

Browse files
downloader: Better download interface with errors
1 parent 5566d67 commit 2aac716

File tree

4 files changed

+75
-39
lines changed

4 files changed

+75
-39
lines changed

src/downloader/handle.rs

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,18 @@ impl DownloadHandle {
2525
cancel,
2626
}
2727
}
28+
29+
pub fn status(&self) -> Status {
30+
*self.status.borrow()
31+
}
32+
33+
pub async fn wait_for_status_update(&mut self) -> Result<(), watch::error::RecvError> {
34+
self.status.changed().await
35+
}
36+
37+
pub fn cancel(&self) {
38+
self.cancel.cancel();
39+
}
2840
}
2941

3042
impl std::future::Future for DownloadHandle {
@@ -43,17 +55,3 @@ impl std::future::Future for DownloadHandle {
4355
}
4456
}
4557
}
46-
47-
impl DownloadHandle {
48-
pub fn status(&self) -> Status {
49-
*self.status.borrow()
50-
}
51-
52-
pub async fn wait_for_status_update(&mut self) -> Result<(), watch::error::RecvError> {
53-
self.status.changed().await
54-
}
55-
56-
pub fn cancel(&self) {
57-
self.cancel.cancel();
58-
}
59-
}

src/downloader/manager.rs

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1+
use super::{download_thread, DownloadHandle, DownloadRequest};
2+
use crate::{error::DownloadError, Error};
13
use reqwest::{Client, Url};
2-
use std::{path::PathBuf, sync::Arc};
3-
use tokio::sync::{mpsc, oneshot, watch, Semaphore};
4+
use std::{path::Path, sync::Arc};
5+
use tokio::sync::{mpsc, Semaphore};
46
use tokio_util::sync::CancellationToken;
57

6-
use super::{download_thread, DownloadHandle, DownloadRequest, Status};
7-
88
const QUEUE_SIZE: usize = 100;
99

1010
#[derive(Debug)]
@@ -36,6 +36,33 @@ impl DownloadManager {
3636
manager
3737
}
3838

39+
pub fn download(
40+
&self,
41+
url: Url,
42+
destination: impl AsRef<Path>,
43+
) -> Result<DownloadHandle, Error> {
44+
if self.cancel.is_cancelled() {
45+
return Err(Error::Download(DownloadError::ManagerShutdown));
46+
}
47+
48+
let destination = destination.as_ref();
49+
if destination.exists() {
50+
return Err(Error::Download(DownloadError::FileExists {
51+
path: destination.to_path_buf(),
52+
}));
53+
}
54+
55+
let cancel = self.cancel.child_token();
56+
let (req, handle) = DownloadRequest::new_req_handle_pair(url, destination, cancel);
57+
58+
self.queue.try_send(req).map_err(|e| match e {
59+
mpsc::error::TrySendError::Full(_) => Error::Download(DownloadError::QueueFull),
60+
mpsc::error::TrySendError::Closed(_) => Error::Download(DownloadError::ManagerShutdown),
61+
})?;
62+
63+
Ok(handle)
64+
}
65+
3966
pub fn set_max_parallel_downloads(&self, limit: usize) {
4067
let current = self.semaphore.available_permits();
4168
if limit > current {
@@ -48,24 +75,6 @@ impl DownloadManager {
4875
}
4976
}
5077

51-
pub fn add_request(&self, url: Url, destination: PathBuf) -> DownloadHandle {
52-
let (result_tx, result_rx) = oneshot::channel();
53-
let (status_tx, status_rx) = watch::channel(Status::Queued);
54-
let cancel = self.cancel.child_token();
55-
56-
let req = DownloadRequest {
57-
url,
58-
destination,
59-
result: result_tx,
60-
status: status_tx,
61-
cancel: cancel.clone(),
62-
};
63-
64-
let _ = self.queue.try_send(req);
65-
66-
DownloadHandle::new(result_rx, status_rx, cancel)
67-
}
68-
6978
pub fn cancel_all(&self) {
7079
self.cancel.cancel();
7180
}

src/downloader/types.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use super::DownloadProgress;
1+
use super::{DownloadHandle, DownloadProgress};
22
use crate::Error;
33
use reqwest::Url;
4-
use std::path::PathBuf;
4+
use std::path::{Path, PathBuf};
55
use tokio::{
66
fs::File,
77
sync::{oneshot, watch},
@@ -17,6 +17,28 @@ pub(crate) struct DownloadRequest {
1717
pub cancel: CancellationToken,
1818
}
1919

20+
impl DownloadRequest {
21+
pub fn new_req_handle_pair(
22+
url: Url,
23+
destination: impl AsRef<Path>,
24+
cancel: CancellationToken,
25+
) -> (Self, DownloadHandle) {
26+
let (result_tx, result_rx) = oneshot::channel();
27+
let (status_tx, status_rx) = watch::channel(Status::Queued);
28+
29+
(
30+
Self {
31+
url,
32+
destination: destination.as_ref().to_path_buf(),
33+
result: result_tx,
34+
status: status_tx,
35+
cancel: cancel.clone(),
36+
},
37+
DownloadHandle::new(result_rx, status_rx, cancel),
38+
)
39+
}
40+
}
41+
2042
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2143
pub enum Status {
2244
Queued,

src/error.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::path::PathBuf;
12
use thiserror::Error;
23

34
#[derive(Error, Debug)]
@@ -18,6 +19,12 @@ pub enum Error {
1819
pub enum DownloadError {
1920
#[error("Download was cancelled")]
2021
Cancelled,
21-
#[error("Retry limit exceeded")]
22+
#[error("Retry limit exceeded: {last_error_msg}")]
2223
RetriesExhausted { last_error_msg: String },
24+
#[error("Download queue is full")]
25+
QueueFull,
26+
#[error("Download manager has been shut down")]
27+
ManagerShutdown,
28+
#[error("File already exists: {path}")]
29+
FileExists { path: PathBuf },
2330
}

0 commit comments

Comments
 (0)