Skip to content

Commit 5566d67

Browse files
downloader: Add better progress updates with rate limiting
1 parent 3a14a0d commit 5566d67

File tree

4 files changed

+143
-19
lines changed

4 files changed

+143
-19
lines changed

src/downloader/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
mod handle;
22
mod manager;
3+
mod progress;
34
mod types;
45
mod worker;
56

67
pub use handle::DownloadHandle;
78
pub use manager::*;
9+
pub use progress::DownloadProgress;
810
pub use types::*;
911
pub(self) use worker::download_thread;

src/downloader/progress.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
use std::time::{Duration, Instant};
2+
3+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
4+
pub struct DownloadProgress {
5+
bytes_downloaded: u64,
6+
total_bytes: Option<u64>,
7+
speed_bps: Option<u64>,
8+
eta: Option<Duration>,
9+
10+
// For calculations
11+
start_time: Instant,
12+
last_update: Instant,
13+
last_speed_update: Instant,
14+
last_bytes_for_speed: u64,
15+
update_interval: Duration,
16+
}
17+
18+
impl std::fmt::Display for DownloadProgress {
19+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20+
let percent = self.percent().unwrap_or(0.0);
21+
let speed = self
22+
.speed()
23+
.map(|s| format!("{:.2} B/s", s))
24+
.unwrap_or("N/A".to_string());
25+
let eta = self
26+
.eta
27+
.map(|d| format!("{:.2?}", d))
28+
.unwrap_or("N/A".to_string());
29+
let elapsed = self.elapsed();
30+
31+
write!(
32+
f,
33+
"Downloaded: {} bytes, Total: {:?}, Speed: {}, ETA: {}, Elapsed: {:.2?}, Progress: {:.2}%",
34+
self.bytes_downloaded,
35+
self.total_bytes,
36+
speed,
37+
eta,
38+
elapsed,
39+
percent
40+
)
41+
}
42+
}
43+
44+
impl DownloadProgress {
45+
pub fn new(bytes_downloaded: u64, total_bytes: Option<u64>, update_interval: Duration) -> Self {
46+
let now = Instant::now();
47+
Self {
48+
bytes_downloaded,
49+
total_bytes,
50+
speed_bps: None,
51+
eta: None,
52+
53+
start_time: now,
54+
last_update: now,
55+
last_speed_update: now,
56+
last_bytes_for_speed: bytes_downloaded,
57+
update_interval,
58+
}
59+
}
60+
61+
pub fn update(&self, bytes_downloaded: u64) -> Option<Self> {
62+
fn new_update(
63+
progress: &DownloadProgress,
64+
bytes_downloaded: u64,
65+
instant: Instant,
66+
) -> DownloadProgress {
67+
DownloadProgress {
68+
eta: None,
69+
last_update: instant,
70+
bytes_downloaded,
71+
total_bytes: progress.total_bytes,
72+
speed_bps: progress.speed_bps,
73+
start_time: progress.start_time,
74+
last_speed_update: progress.last_speed_update,
75+
last_bytes_for_speed: progress.last_bytes_for_speed,
76+
update_interval: progress.update_interval,
77+
}
78+
}
79+
80+
let now = Instant::now();
81+
82+
if now.duration_since(self.last_update) < self.update_interval {
83+
return None;
84+
}
85+
let mut new_update = new_update(self, bytes_downloaded, now);
86+
87+
if now.duration_since(self.last_speed_update) >= Duration::from_secs(1) {
88+
let byte_diff = (bytes_downloaded - self.last_bytes_for_speed) as f64;
89+
let time_diff = now.duration_since(self.last_speed_update).as_secs_f64();
90+
91+
new_update.last_speed_update = now;
92+
new_update.last_bytes_for_speed = bytes_downloaded;
93+
new_update.speed_bps = Some((byte_diff / time_diff) as u64);
94+
};
95+
96+
if let (Some(speed), Some(total)) = (new_update.speed_bps, self.total_bytes) {
97+
if speed > 0 {
98+
let remaining = total.saturating_sub(bytes_downloaded);
99+
new_update.eta = Some(Duration::from_secs(remaining / speed));
100+
}
101+
};
102+
103+
Some(new_update)
104+
}
105+
106+
pub fn percent(&self) -> Option<f64> {
107+
self.total_bytes.map(|total| {
108+
if total == 0 {
109+
0.0
110+
} else {
111+
(self.bytes_downloaded as f64 / total as f64) * 100.0
112+
}
113+
})
114+
}
115+
116+
pub fn total_bytes(&self) -> Option<u64> {
117+
self.total_bytes
118+
}
119+
120+
pub fn bytes_downloaded(&self) -> u64 {
121+
self.bytes_downloaded
122+
}
123+
124+
pub fn speed(&self) -> Option<u64> {
125+
self.speed_bps
126+
}
127+
128+
pub fn elapsed(&self) -> Duration {
129+
self.start_time.elapsed()
130+
}
131+
}

src/downloader/types.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
1+
use super::DownloadProgress;
12
use crate::Error;
23
use reqwest::Url;
3-
use std::path::{Path, PathBuf};
4+
use std::path::PathBuf;
45
use tokio::{
56
fs::File,
67
sync::{oneshot, watch},
@@ -16,12 +17,6 @@ pub(crate) struct DownloadRequest {
1617
pub cancel: CancellationToken,
1718
}
1819

19-
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
20-
pub struct DownloadProgress {
21-
pub bytes_downloaded: u64,
22-
pub total_bytes: Option<u64>,
23-
}
24-
2520
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2621
pub enum Status {
2722
Queued,

src/downloader/worker.rs

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -80,15 +80,6 @@ pub(super) async fn download_thread(client: Client, mut req: DownloadRequest) {
8080
}
8181

8282
async fn download(client: Client, req: &mut DownloadRequest) -> Result<File, Error> {
83-
let update_progress = |bytes_downloaded: u64, total_bytes: Option<u64>| {
84-
req.status
85-
.send(Status::InProgress(DownloadProgress {
86-
bytes_downloaded,
87-
total_bytes,
88-
}))
89-
.ok();
90-
};
91-
9283
let mut response = client
9384
.get(req.url.as_ref())
9485
.send()
@@ -103,7 +94,10 @@ async fn download(client: Client, req: &mut DownloadRequest) -> Result<File, Err
10394
}
10495
let mut file = File::create(&req.destination).await?;
10596

106-
update_progress(bytes_downloaded, total_bytes);
97+
let update_interval = Duration::from_millis(250);
98+
let mut progress = DownloadProgress::new(bytes_downloaded, total_bytes, update_interval);
99+
req.status.send(Status::InProgress(progress)).ok();
100+
107101
loop {
108102
tokio::select! {
109103
_ = req.cancel.cancelled() => {
@@ -116,7 +110,10 @@ async fn download(client: Client, req: &mut DownloadRequest) -> Result<File, Err
116110
Ok(Some(chunk)) => {
117111
file.write_all(&chunk).await?;
118112
bytes_downloaded += chunk.len() as u64;
119-
update_progress(bytes_downloaded, total_bytes);
113+
if let Some(new_progress) = progress.update(bytes_downloaded) {
114+
progress = new_progress;
115+
}
116+
req.status.send(Status::InProgress(progress)).ok();
120117
}
121118
Ok(None) => break,
122119
Err(e) => {
@@ -128,7 +125,6 @@ async fn download(client: Client, req: &mut DownloadRequest) -> Result<File, Err
128125
}
129126
}
130127
}
131-
update_progress(bytes_downloaded, total_bytes);
132128

133129
// Ensure the data is written to disk
134130
file.sync_all().await?;

0 commit comments

Comments
 (0)