Skip to content

Commit 3ff5768

Browse files
downloader: Add progress updates
1 parent 572fdd7 commit 3ff5768

File tree

4 files changed

+156
-17
lines changed

4 files changed

+156
-17
lines changed

crates/download-manager/src/download.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
use crate::{DownloadError, DownloadEvent, DownloadID};
1+
use crate::{DownloadError, DownloadEvent, DownloadID, Progress};
22
use futures_core::Stream;
33
use std::path::PathBuf;
4-
use tokio::sync::{broadcast, oneshot};
5-
use tokio_stream::wrappers::BroadcastStream;
4+
use tokio::sync::{broadcast, oneshot, watch};
5+
use tokio_stream::wrappers::{BroadcastStream, WatchStream};
66
use tokio_util::sync::CancellationToken;
77

88
pub struct Download {
99
id: DownloadID,
10+
progress: watch::Receiver<Progress>,
1011
events: broadcast::Receiver<DownloadEvent>,
1112
result: oneshot::Receiver<Result<DownloadResult, DownloadError>>,
1213

@@ -16,12 +17,14 @@ pub struct Download {
1617
impl Download {
1718
pub fn new(
1819
id: DownloadID,
20+
progress: watch::Receiver<Progress>,
1921
events: broadcast::Receiver<DownloadEvent>,
2022
result: oneshot::Receiver<Result<DownloadResult, DownloadError>>,
2123
cancel_token: CancellationToken,
2224
) -> Self {
2325
Download {
2426
id,
27+
progress,
2528
events,
2629
result,
2730
cancel_token,
@@ -36,6 +39,14 @@ impl Download {
3639
self.cancel_token.cancel();
3740
}
3841

42+
pub fn progress_raw(&self) -> watch::Receiver<Progress> {
43+
self.progress.clone()
44+
}
45+
46+
pub fn progress(&self) -> impl Stream<Item = Progress> + 'static {
47+
WatchStream::new(self.progress_raw())
48+
}
49+
3950
pub fn events(&self) -> impl Stream<Item = DownloadEvent> + 'static {
4051
use tokio_stream::StreamExt as _;
4152

crates/download-manager/src/events.rs

Lines changed: 116 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::DownloadID;
22
use reqwest::Url;
33
use std::path::PathBuf;
4-
use std::time::Instant;
4+
use std::time::{Duration, Instant};
55

66
#[derive(Debug, Clone)]
77
pub enum DownloadEvent {
@@ -39,23 +39,131 @@ pub enum DownloadEvent {
3939
pub struct Progress {
4040
pub bytes_downloaded: u64,
4141
pub total_bytes: Option<u64>,
42-
pub instantaneous_bps: f64,
43-
pub avg_bps: f64,
4442

43+
pub instantaneous_bps: f64, // most recent sample
44+
pub ema_bps: f64, // exponential moving average
45+
46+
// Internal timing / sampling
4547
started_at: Instant,
46-
updated_at: Instant,
48+
updated_at: Instant, // last time we saw any bytes
49+
last_sample_at: Instant, // last time we recomputed instantaneous_bps
50+
last_sample_bytes: u64, // bytes_downloaded at last sample
51+
52+
// Sampling thresholds
53+
pub min_sample_interval: Duration,
54+
pub min_sample_bytes: u64,
55+
56+
ema_alpha: f64, // smoothing factor for EMA
4757
}
4858

4959
impl Progress {
50-
pub fn new() -> Self {
60+
pub fn new(total_bytes: Option<u64>) -> Self {
5161
let now = Instant::now();
5262
Progress {
5363
bytes_downloaded: 0,
54-
total_bytes: None,
64+
total_bytes,
65+
instantaneous_bps: 0.0,
66+
ema_bps: 0.0,
5567
started_at: now,
5668
updated_at: now,
57-
instantaneous_bps: 0.0,
58-
avg_bps: 0.0,
69+
last_sample_at: now,
70+
last_sample_bytes: 0,
71+
ema_alpha: 0.2,
72+
min_sample_bytes: 64 * 1024, // 64 KiB
73+
min_sample_interval: Duration::from_millis(200),
74+
}
75+
}
76+
77+
pub fn with_sample_interval(mut self, min_sample_interval: Duration) -> Self {
78+
self.min_sample_interval = min_sample_interval;
79+
self
80+
}
81+
82+
pub fn with_sample_bytes(mut self, min_sample_bytes: u64) -> Self {
83+
self.min_sample_bytes = min_sample_bytes;
84+
self
85+
}
86+
87+
pub fn with_ema_alpha(mut self, ema_alpha: f64) -> Self {
88+
self.ema_alpha = ema_alpha;
89+
self
90+
}
91+
92+
pub fn bytes_downloaded(&self) -> u64 {
93+
self.bytes_downloaded
94+
}
95+
96+
pub fn elapsed(&self) -> Duration {
97+
self.started_at.elapsed()
98+
}
99+
100+
pub fn percent(&self) -> Option<f64> {
101+
self.total_bytes
102+
.filter(|&total| total > 0)
103+
.map(|total| (self.bytes_downloaded as f64 / total as f64) * 100.0)
104+
}
105+
106+
pub fn remaining_bytes(&self) -> Option<u64> {
107+
self.total_bytes
108+
.map(|t| t.saturating_sub(self.bytes_downloaded))
109+
}
110+
111+
pub fn eta(&self) -> Option<Duration> {
112+
let remaining = self.remaining_bytes()?;
113+
if self.ema_bps > 0.0 {
114+
Some(Duration::from_secs_f64(remaining as f64 / self.ema_bps))
115+
} else {
116+
None
117+
}
118+
}
119+
120+
pub fn update(&mut self, chunk_len: u64) -> bool {
121+
let now = Instant::now();
122+
self.bytes_downloaded += chunk_len;
123+
self.updated_at = now;
124+
125+
let dt = now.duration_since(self.last_sample_at);
126+
let byte_delta = self.bytes_downloaded - self.last_sample_bytes;
127+
128+
if dt >= self.min_sample_interval || byte_delta >= self.min_sample_bytes {
129+
let secs = dt.as_secs_f64();
130+
if secs > 0.0 && byte_delta > 0 {
131+
// instantaneous over sample window
132+
let inst = (byte_delta as f64) / secs;
133+
self.instantaneous_bps = inst;
134+
135+
// EMA (if first sample, seed with inst)
136+
if self.ema_bps <= 0.0 {
137+
self.ema_bps = inst;
138+
} else {
139+
self.ema_bps = self.ema_alpha * inst + (1.0 - self.ema_alpha) * self.ema_bps;
140+
}
141+
}
142+
143+
self.last_sample_at = now;
144+
self.last_sample_bytes = self.bytes_downloaded;
145+
return true;
146+
}
147+
148+
false
149+
}
150+
151+
pub fn force_update(&mut self) {
152+
let now = Instant::now();
153+
let dt = now.duration_since(self.last_sample_at);
154+
let byte_delta = self.bytes_downloaded - self.last_sample_bytes;
155+
156+
if dt.as_secs_f64() > 0.0 && byte_delta > 0 {
157+
let inst = (byte_delta as f64) / dt.as_secs_f64();
158+
self.instantaneous_bps = inst;
159+
if self.ema_bps <= 0.0 {
160+
self.ema_bps = inst;
161+
} else {
162+
self.ema_bps = self.ema_alpha * inst + (1.0 - self.ema_alpha) * self.ema_bps;
163+
}
164+
self.last_sample_at = now;
165+
self.last_sample_bytes = self.bytes_downloaded;
59166
}
167+
self.updated_at = now;
60168
}
61169
}

crates/download-manager/src/request.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::{Download, DownloadError, DownloadEvent, DownloadID, DownloadManager, DownloadResult};
1+
use crate::{
2+
Download, DownloadError, DownloadEvent, DownloadID, DownloadManager, DownloadResult, Progress,
3+
};
24
use derive_builder::Builder;
35
use reqwest::Url;
46
use std::{
@@ -14,6 +16,7 @@ pub struct Request {
1416
destination: PathBuf,
1517
config: DownloadConfig,
1618

19+
progress: watch::Sender<Progress>,
1720
events: broadcast::Sender<DownloadEvent>,
1821
result: oneshot::Sender<Result<DownloadResult, DownloadError>>,
1922

@@ -98,6 +101,11 @@ impl Request {
98101
let _ = self.result.send(result);
99102
}
100103

104+
pub fn update_progress(&self, progress: Progress) {
105+
// TODO: Log the error
106+
let _ = self.progress.send(progress);
107+
}
108+
101109
pub fn start(&self) {
102110
self.emit(DownloadEvent::Started {
103111
id: self.id(),
@@ -175,6 +183,7 @@ impl RequestBuilder<'_> {
175183
.ok_or_else(|| anyhow::anyhow!("Destination must be set"))?;
176184
let config = self.config.build()?;
177185

186+
let (progress_tx, progress_rx) = watch::channel(Progress::new(None));
178187
let (result_tx, result_rx) = oneshot::channel();
179188
let cancel_token = self.manager.child_token();
180189

@@ -186,13 +195,20 @@ impl RequestBuilder<'_> {
186195
url,
187196
destination,
188197
config,
198+
progress: progress_tx,
189199
events: event_tx,
190200
result: result_tx,
191201
cancel_token: cancel_token.clone(),
192202
};
193203

194204
self.manager.queue_request(request)?;
195205

196-
Ok(Download::new(id, event_rx, result_rx, cancel_token))
206+
Ok(Download::new(
207+
id,
208+
progress_rx,
209+
event_rx,
210+
result_rx,
211+
cancel_token,
212+
))
197213
}
198214
}

crates/download-manager/src/worker.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::{DownloadError, DownloadResult, Request};
1+
use crate::{DownloadError, DownloadResult, Progress, Request};
22
use reqwest::Client;
33
use std::time::Duration;
44
use tokio::{fs::File, io::AsyncWriteExt};
@@ -84,7 +84,7 @@ async fn attempt_download(
8484
}
8585

8686
let mut file = File::create(&request.destination()).await?;
87-
let mut bytes_downloaded = 0;
87+
let mut progress = Progress::new(response.content_length());
8888
loop {
8989
tokio::select! {
9090
_ = request.cancel_token.cancelled() => {
@@ -96,7 +96,9 @@ async fn attempt_download(
9696
match chunk {
9797
Ok(Some(chunk)) => {
9898
file.write_all(&chunk).await?;
99-
bytes_downloaded += chunk.len() as u64;
99+
if progress.update(chunk.len() as u64) {
100+
request.update_progress(progress);
101+
}
100102
}
101103
Ok(None) => break,
102104
Err(e) => {
@@ -108,12 +110,14 @@ async fn attempt_download(
108110
}
109111
}
110112
}
113+
progress.force_update();
114+
request.update_progress(progress);
111115

112116
// Ensure the data is written to disk
113117
file.sync_all().await?;
114118

115119
Ok(DownloadResult {
116120
path: request.destination().to_path_buf(),
117-
bytes_downloaded,
121+
bytes_downloaded: progress.bytes_downloaded(),
118122
})
119123
}

0 commit comments

Comments
 (0)