Skip to content

Commit 7c2d4cd

Browse files
downloader: Add event based updates
1 parent 735a4ef commit 7c2d4cd

File tree

6 files changed

+125
-53
lines changed

6 files changed

+125
-53
lines changed

crates/download-manager/src/context.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ use std::sync::{
33
Arc,
44
atomic::{AtomicU64, AtomicUsize, Ordering},
55
};
6-
use tokio::sync::Semaphore;
6+
use tokio::sync::{Semaphore, broadcast};
77
use tokio_util::sync::CancellationToken;
88

9+
use crate::DownloadEvent;
10+
911
pub type DownloadID = u64;
1012

1113
#[derive(Debug)]
@@ -18,17 +20,21 @@ pub(crate) struct Context {
1820
pub id_counter: AtomicU64,
1921
pub active: AtomicUsize,
2022
pub max_concurrent: AtomicUsize,
23+
24+
pub events: broadcast::Sender<DownloadEvent>,
2125
}
2226

2327
impl Context {
2428
pub fn new(max_concurrent: usize) -> Arc<Self> {
29+
let (events, _) = broadcast::channel(1024);
2530
Arc::new(Self {
2631
semaphore: Arc::new(Semaphore::new(max_concurrent)),
2732
max_concurrent: AtomicUsize::new(max_concurrent),
2833
cancel_root: CancellationToken::new(),
2934
active: AtomicUsize::new(0),
3035
id_counter: AtomicU64::new(1),
3136
client: Client::new(),
37+
events,
3238
})
3339
}
3440

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
1-
use crate::{DownloadError, DownloadID};
2-
use anyhow::anyhow;
1+
use crate::{DownloadError, DownloadEvent, DownloadID};
32
use std::path::PathBuf;
4-
use tokio::sync::{oneshot, watch};
3+
use tokio::sync::{broadcast, oneshot};
54
use tokio_util::sync::CancellationToken;
65

76
pub struct Download {
87
id: DownloadID,
9-
status: watch::Receiver<Status>,
8+
events: broadcast::Receiver<DownloadEvent>,
109
result: oneshot::Receiver<Result<DownloadResult, DownloadError>>,
1110

1211
cancel_token: CancellationToken,
@@ -15,13 +14,13 @@ pub struct Download {
1514
impl Download {
1615
pub fn new(
1716
id: DownloadID,
18-
status: watch::Receiver<Status>,
17+
events: broadcast::Receiver<DownloadEvent>,
1918
result: oneshot::Receiver<Result<DownloadResult, DownloadError>>,
2019
cancel_token: CancellationToken,
2120
) -> Self {
2221
Download {
2322
id,
24-
status,
23+
events,
2524
result,
2625
cancel_token,
2726
}
@@ -34,10 +33,6 @@ impl Download {
3433
pub fn cancel(&self) {
3534
self.cancel_token.cancel();
3635
}
37-
38-
pub fn status(&self) -> Status {
39-
*self.status.borrow()
40-
}
4136
}
4237

4338
impl std::future::Future for Download {
@@ -58,17 +53,8 @@ impl std::future::Future for Download {
5853
}
5954
}
6055

61-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
62-
pub enum Status {
63-
Queued,
64-
Running,
65-
Retrying(u32),
66-
Completed,
67-
Cancelled,
68-
Failed,
69-
}
70-
7156
#[derive(Debug)]
7257
pub struct DownloadResult {
7358
pub path: PathBuf,
59+
pub bytes_downloaded: u64,
7460
}

crates/download-manager/src/download_manager.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
mod context;
22
mod download;
33
mod error;
4+
mod events;
45
mod request;
56
mod worker;
67

78
use crate::{context::Context, request::RequestBuilder, worker::download_thread};
89
pub use crate::{
910
context::DownloadID,
10-
download::{Download, DownloadResult, Status},
11+
download::{Download, DownloadResult},
1112
error::DownloadError,
13+
events::{DownloadEvent, Progress},
1214
request::Request,
1315
};
1416
use reqwest::Url;

crates/download-manager/src/events.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use crate::DownloadID;
2+
use reqwest::Url;
3+
use std::path::PathBuf;
4+
use std::time::Instant;
5+
6+
#[derive(Debug, Clone)]
7+
pub enum DownloadEvent {
8+
Queued {
9+
id: DownloadID,
10+
url: Url,
11+
destination: PathBuf,
12+
},
13+
Started {
14+
id: DownloadID,
15+
url: Url,
16+
destination: PathBuf,
17+
total_bytes: Option<u64>,
18+
},
19+
Retrying {
20+
id: DownloadID,
21+
attempt: u32,
22+
next_delay_ms: u64,
23+
},
24+
Completed {
25+
id: DownloadID,
26+
path: PathBuf,
27+
bytes_downloaded: u64,
28+
},
29+
Failed {
30+
id: DownloadID,
31+
error: String,
32+
},
33+
Cancelled {
34+
id: DownloadID,
35+
},
36+
}
37+
38+
#[derive(Debug, Clone, Copy)]
39+
pub struct Progress {
40+
pub bytes_downloaded: u64,
41+
pub total_bytes: Option<u64>,
42+
pub instantaneous_bps: f64,
43+
pub avg_bps: f64,
44+
45+
started_at: Instant,
46+
updated_at: Instant,
47+
}
48+
49+
impl Progress {
50+
pub fn new() -> Self {
51+
let now = Instant::now();
52+
Progress {
53+
bytes_downloaded: 0,
54+
total_bytes: None,
55+
started_at: now,
56+
updated_at: now,
57+
instantaneous_bps: 0.0,
58+
avg_bps: 0.0,
59+
}
60+
}
61+
}

crates/download-manager/src/request.rs

Lines changed: 39 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
use crate::{Download, DownloadError, DownloadID, DownloadManager, DownloadResult, Status};
1+
use crate::{Download, DownloadError, DownloadEvent, DownloadID, DownloadManager, DownloadResult};
22
use derive_builder::Builder;
33
use reqwest::Url;
4-
use std::path::{Path, PathBuf};
5-
use tokio::sync::{oneshot, watch};
4+
use std::{
5+
path::{Path, PathBuf},
6+
time::Duration,
7+
};
8+
use tokio::sync::{broadcast, oneshot, watch};
69
use tokio_util::sync::CancellationToken;
710

811
pub struct Request {
@@ -11,7 +14,7 @@ pub struct Request {
1114
destination: PathBuf,
1215
config: DownloadConfig,
1316

14-
status: watch::Sender<Status>,
17+
events: broadcast::Sender<DownloadEvent>,
1518
result: oneshot::Sender<Result<DownloadResult, DownloadError>>,
1619

1720
pub cancel_token: CancellationToken,
@@ -62,6 +65,10 @@ impl Request {
6265
}
6366
}
6467

68+
pub fn id(&self) -> DownloadID {
69+
self.id
70+
}
71+
6572
/* TODO:
6673
* Add callbacks like `on_update`, `on_progress`, `on_complete`, etc.
6774
*/
@@ -81,39 +88,48 @@ impl Request {
8188
self.cancel_token.is_cancelled()
8289
}
8390

84-
fn mark_status(&self, status: Status) {
91+
fn emit(&self, event: DownloadEvent) {
8592
// TODO: Log the error
86-
let _ = self.status.send(status);
93+
let _ = self.events.send(event);
8794
}
8895

8996
fn send_result(self, result: Result<DownloadResult, DownloadError>) {
9097
// TODO: Log the error
9198
let _ = self.result.send(result);
9299
}
93100

94-
pub fn mark_running(&self) {
95-
self.mark_status(Status::Running)
101+
pub fn start(&self) {
102+
self.emit(DownloadEvent::Started {
103+
id: self.id(),
104+
url: self.url().clone(),
105+
destination: self.destination.clone(),
106+
total_bytes: None,
107+
});
96108
}
97109

98-
pub fn mark_failed(self, error: DownloadError) {
99-
match error {
100-
DownloadError::Cancelled => self.mark_status(Status::Cancelled),
101-
_ => self.mark_status(Status::Failed),
102-
}
110+
pub fn fail(self, error: DownloadError) {
103111
self.send_result(Err(error));
104112
}
105113

106-
pub fn mark_completed(self, result: DownloadResult) {
107-
self.mark_status(Status::Completed);
114+
pub fn finish(self, result: DownloadResult) {
115+
self.emit(DownloadEvent::Completed {
116+
id: self.id(),
117+
path: result.path.clone(),
118+
bytes_downloaded: result.bytes_downloaded,
119+
});
108120
self.send_result(Ok(result))
109121
}
110122

111-
pub fn mark_retrying(&self, retry_count: u32) {
112-
self.mark_status(Status::Retrying(retry_count))
123+
pub fn retry(&self, attempt: u32, delay: Duration) {
124+
self.emit(DownloadEvent::Retrying {
125+
id: self.id(),
126+
attempt,
127+
next_delay_ms: delay.as_millis() as u64,
128+
});
113129
}
114130

115-
pub fn mark_cancelled(self) {
116-
self.mark_status(Status::Cancelled);
131+
pub fn cancel(self) {
132+
self.emit(DownloadEvent::Cancelled { id: self.id() });
117133
self.send_result(Err(DownloadError::Cancelled))
118134
}
119135
}
@@ -159,23 +175,24 @@ impl RequestBuilder<'_> {
159175
.ok_or_else(|| anyhow::anyhow!("Destination must be set"))?;
160176
let config = self.config.build()?;
161177

162-
let (status_tx, status_rx) = watch::channel(Status::Queued);
163178
let (result_tx, result_rx) = oneshot::channel();
164179
let cancel_token = self.manager.child_token();
165180

181+
let event_tx = self.manager.ctx.events.clone();
182+
let event_rx = event_tx.subscribe();
166183
let id = self.manager.ctx.next_id();
167184
let request = Request {
168185
id,
169186
url,
170187
destination,
171188
config,
172-
status: status_tx,
189+
events: event_tx,
173190
result: result_tx,
174191
cancel_token: cancel_token.clone(),
175192
};
176193

177194
self.manager.queue_request(request)?;
178195

179-
Ok(Download::new(id, status_rx, result_rx, cancel_token))
196+
Ok(Download::new(id, event_rx, result_rx, cancel_token))
180197
}
181198
}

crates/download-manager/src/worker.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,45 +21,43 @@ const BACKOFF_STRATEGY: ExponentialBackoff = ExponentialBackoff {
2121
};
2222

2323
pub(super) async fn download_thread(client: Client, mut request: Request) {
24-
request.mark_running();
24+
request.start();
2525
let mut last_retryable_error: DownloadError =
2626
DownloadError::Unknown("Unknown Error".to_string());
2727

2828
let retries = request.config().retries();
2929
for attempt in 0..=retries {
3030
if attempt > 0 {
31-
request.mark_retrying(attempt);
32-
33-
//TODO: Add proper backoff
3431
let delay = BACKOFF_STRATEGY.next_delay(attempt);
3532
let mut interval = tokio::time::interval(delay);
3633

34+
request.retry(attempt, delay);
3735
tokio::select! {
3836
_ = interval.tick() => {},
3937
_ = request.cancel_token.cancelled() => {
40-
request.mark_cancelled();
38+
request.cancel();
4139
return;
4240
}
4341
}
4442
}
4543

4644
match attempt_download(client.clone(), &mut request).await {
4745
Ok(result) => {
48-
request.mark_completed(result);
46+
request.finish(result);
4947
return;
5048
}
5149
Err(error) if error.is_retryable() => {
5250
last_retryable_error = error;
5351
continue;
5452
}
5553
Err(error) => {
56-
request.mark_failed(error);
54+
request.fail(error);
5755
return;
5856
}
5957
};
6058
}
6159

62-
request.mark_failed(DownloadError::RetriesExhausted {
60+
request.fail(DownloadError::RetriesExhausted {
6361
last_error: Box::new(last_retryable_error),
6462
});
6563
}
@@ -86,7 +84,7 @@ async fn attempt_download(
8684
}
8785

8886
let mut file = File::create(&request.destination()).await?;
89-
87+
let mut bytes_downloaded = 0;
9088
loop {
9189
tokio::select! {
9290
_ = request.cancel_token.cancelled() => {
@@ -98,6 +96,7 @@ async fn attempt_download(
9896
match chunk {
9997
Ok(Some(chunk)) => {
10098
file.write_all(&chunk).await?;
99+
bytes_downloaded += chunk.len() as u64;
101100
}
102101
Ok(None) => break,
103102
Err(e) => {
@@ -115,5 +114,6 @@ async fn attempt_download(
115114

116115
Ok(DownloadResult {
117116
path: request.destination().to_path_buf(),
117+
bytes_downloaded,
118118
})
119119
}

0 commit comments

Comments
 (0)