Skip to content

Commit 4803c51

Browse files
downloader: Use builder pattern to build download requests
1 parent 09b47b6 commit 4803c51

File tree

7 files changed

+193
-110
lines changed

7 files changed

+193
-110
lines changed

src/downloader/config.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,21 @@ impl Default for DownloadConfig {
5353
}
5454

5555
impl DownloadConfig {
56+
pub fn with_max_retries(mut self, retries: usize) -> Self {
57+
self.max_retries = retries;
58+
self
59+
}
60+
61+
pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
62+
self.user_agent = Some(user_agent.into());
63+
self
64+
}
65+
66+
pub fn with_progress_interval(mut self, interval: Duration) -> Self {
67+
self.progress_update_interval = interval;
68+
self
69+
}
70+
5671
pub fn max_retries(&self) -> usize {
5772
self.max_retries
5873
}

src/downloader/manager.rs

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::{
2-
config::DownloadManagerConfig, download_thread, DownloadConfig, DownloadHandle, DownloadRequest,
2+
download_thread, DownloadBuilder, DownloadConfig, DownloadManagerConfig, DownloadRequest,
33
};
44
use crate::{error::DownloadError, Error};
55
use reqwest::{Client, Url};
@@ -40,40 +40,25 @@ impl DownloadManager {
4040
manager
4141
}
4242

43-
pub fn download_with_config(
43+
pub fn download(
4444
&self,
45-
url: Url,
45+
url: impl TryInto<Url>,
4646
destination: impl AsRef<Path>,
47-
config: DownloadConfig,
48-
) -> Result<DownloadHandle, Error> {
49-
if self.cancel.is_cancelled() {
50-
return Err(Error::Download(DownloadError::ManagerShutdown));
51-
}
52-
53-
let destination = destination.as_ref();
54-
if destination.exists() {
55-
return Err(Error::Download(DownloadError::FileExists {
56-
path: destination.to_path_buf(),
57-
}));
58-
}
59-
60-
let cancel = self.cancel.child_token();
61-
let (req, handle) = DownloadRequest::new(url, destination, cancel, config);
62-
63-
self.queue.try_send(req).map_err(|e| match e {
64-
mpsc::error::TrySendError::Full(_) => Error::Download(DownloadError::QueueFull),
65-
mpsc::error::TrySendError::Closed(_) => Error::Download(DownloadError::ManagerShutdown),
66-
})?;
67-
68-
Ok(handle)
47+
) -> Result<DownloadBuilder, Error> {
48+
let url = url
49+
.try_into()
50+
.map_err(|_| Error::Download(DownloadError::InvalidUrl))?;
51+
Ok(DownloadBuilder::new(self, url, destination))
6952
}
7053

71-
pub fn download(
54+
pub fn download_with_config(
7255
&self,
7356
url: Url,
7457
destination: impl AsRef<Path>,
75-
) -> Result<DownloadHandle, Error> {
76-
self.download_with_config(url, destination, DownloadConfig::default())
58+
config: DownloadConfig,
59+
) -> Result<DownloadBuilder, Error> {
60+
self.download(url, destination)
61+
.map(|builder| builder.with_config(config))
7762
}
7863

7964
pub async fn set_max_parallel_downloads(&self, limit: usize) -> Result<(), Error> {
@@ -116,6 +101,21 @@ impl DownloadManager {
116101
drop(self.queue);
117102
Ok(())
118103
}
104+
105+
pub fn is_cancelled(&self) -> bool {
106+
self.cancel.is_cancelled()
107+
}
108+
109+
pub fn child_token(&self) -> CancellationToken {
110+
self.cancel.child_token()
111+
}
112+
113+
pub fn queue_request(&self, req: DownloadRequest) -> Result<(), Error> {
114+
self.queue.try_send(req).map_err(|e| match e {
115+
mpsc::error::TrySendError::Full(_) => Error::Download(DownloadError::QueueFull),
116+
mpsc::error::TrySendError::Closed(_) => Error::Download(DownloadError::ManagerShutdown),
117+
})
118+
}
119119
}
120120

121121
async fn dispatcher_thread(

src/downloader/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@ mod config;
22
mod handle;
33
mod manager;
44
mod progress;
5-
mod types;
5+
mod request;
66
mod worker;
77

88
pub use config::{DownloadConfig, DownloadManagerConfig};
99
pub use handle::DownloadHandle;
1010
pub use manager::DownloadManager;
11-
pub use progress::DownloadProgress;
12-
pub(self) use types::DownloadRequest;
13-
pub use types::Status;
11+
pub use progress::{DownloadProgress, Status};
12+
pub use request::{DownloadBuilder, DownloadRequest};
1413
pub(self) use worker::download_thread;

src/downloader/progress.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,13 @@ impl DownloadProgress {
112112
self.start_time.elapsed()
113113
}
114114
}
115+
116+
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
117+
pub enum Status {
118+
Queued,
119+
InProgress(DownloadProgress),
120+
Retrying,
121+
Completed,
122+
Failed,
123+
Cancelled,
124+
}

src/downloader/request.rs

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
use super::{DownloadConfig, DownloadHandle, DownloadManager, DownloadProgress, Status};
2+
use crate::{error::DownloadError, Error};
3+
use reqwest::Url;
4+
use std::{
5+
path::{Path, PathBuf},
6+
time::{Duration, Instant},
7+
};
8+
use tokio::{
9+
fs::File,
10+
sync::{oneshot, watch},
11+
};
12+
use tokio_util::sync::CancellationToken;
13+
14+
#[derive(Debug)]
15+
pub struct DownloadRequest {
16+
url: Url,
17+
destination: PathBuf,
18+
pub result: oneshot::Sender<Result<File, Error>>,
19+
pub status: watch::Sender<Status>,
20+
pub cancel: CancellationToken,
21+
config: DownloadConfig,
22+
23+
// For rate limiting
24+
last_progress_update: Instant,
25+
}
26+
27+
impl DownloadRequest {
28+
pub fn new(
29+
url: Url,
30+
destination: impl AsRef<Path>,
31+
cancel: CancellationToken,
32+
config: DownloadConfig,
33+
) -> (Self, DownloadHandle) {
34+
let (result_tx, result_rx) = oneshot::channel();
35+
let (status_tx, status_rx) = watch::channel(Status::Queued);
36+
(
37+
Self {
38+
url,
39+
destination: destination.as_ref().to_path_buf(),
40+
result: result_tx,
41+
status: status_tx,
42+
cancel: cancel.clone(),
43+
config,
44+
last_progress_update: Instant::now(),
45+
},
46+
DownloadHandle::new(result_rx, status_rx, cancel),
47+
)
48+
}
49+
50+
pub fn url(&self) -> &Url {
51+
&self.url
52+
}
53+
54+
pub fn destination(&self) -> &Path {
55+
&self.destination
56+
}
57+
58+
pub fn config(&self) -> &DownloadConfig {
59+
&self.config
60+
}
61+
62+
pub fn send_progress(&mut self, progress: DownloadProgress) {
63+
if self.last_progress_update.elapsed() >= self.config.progress_update_interval() {
64+
self.last_progress_update = Instant::now();
65+
self.status.send(Status::InProgress(progress)).ok();
66+
}
67+
}
68+
}
69+
70+
#[derive(Debug)]
71+
pub struct DownloadBuilder<'a> {
72+
manager: &'a DownloadManager,
73+
url: Url,
74+
destination: PathBuf,
75+
config: DownloadConfig,
76+
}
77+
78+
impl<'a> DownloadBuilder<'a> {
79+
pub fn new(manager: &'a DownloadManager, url: Url, destination: impl AsRef<Path>) -> Self {
80+
Self {
81+
manager,
82+
url,
83+
destination: destination.as_ref().to_path_buf(),
84+
config: DownloadConfig::default(),
85+
}
86+
}
87+
88+
pub fn with_retries(mut self, retries: usize) -> Self {
89+
self.config = self.config.with_max_retries(retries);
90+
self
91+
}
92+
93+
pub fn with_user_agent(mut self, user_agent: impl Into<String>) -> Self {
94+
self.config = self.config.with_user_agent(user_agent);
95+
self
96+
}
97+
98+
/// Set how often progress updates are sent
99+
pub fn with_progress_interval(mut self, interval: Duration) -> Self {
100+
self.config = self.config.with_progress_interval(interval);
101+
self
102+
}
103+
104+
pub fn with_config(mut self, config: DownloadConfig) -> Self {
105+
self.config = config;
106+
self
107+
}
108+
109+
pub fn url(&self) -> &Url {
110+
&self.url
111+
}
112+
113+
pub fn config(&self) -> &DownloadConfig {
114+
&self.config
115+
}
116+
117+
pub fn start(self) -> Result<DownloadHandle, Error> {
118+
if self.manager.is_cancelled() {
119+
return Err(Error::Download(DownloadError::ManagerShutdown));
120+
}
121+
122+
if self.destination.exists() {
123+
return Err(Error::Download(DownloadError::FileExists {
124+
path: self.destination,
125+
}));
126+
}
127+
128+
let cancel = self.manager.child_token();
129+
let (req, handle) = DownloadRequest::new(self.url, self.destination, cancel, self.config);
130+
131+
self.manager.queue_request(req)?;
132+
133+
Ok(handle)
134+
}
135+
}

src/downloader/types.rs

Lines changed: 0 additions & 78 deletions
This file was deleted.

src/error.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,6 @@ pub enum DownloadError {
2727
ManagerShutdown,
2828
#[error("File already exists: {path}")]
2929
FileExists { path: PathBuf },
30+
#[error("Invalid URL")]
31+
InvalidUrl,
3032
}

0 commit comments

Comments
 (0)