Skip to content

Commit 837bf58

Browse files
feat(downloads): concurrently download components
Some notifications needed to be updated to include the download URL, enabling the identification of the component being downloaded. This was necessary for accurate progress reporting of each component.
1 parent e43263a commit 837bf58

File tree

7 files changed

+138
-72
lines changed

7 files changed

+138
-72
lines changed

src/cli/download_tracker.rs

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
2-
use std::time::Duration;
2+
use std::collections::HashMap;
33

44
use crate::dist::Notification as In;
55
use crate::notifications::Notification;
@@ -13,8 +13,8 @@ use crate::utils::Notification as Un;
1313
pub(crate) struct DownloadTracker {
1414
/// MultiProgress bar for the downloads.
1515
multi_progress_bars: MultiProgress,
16-
/// ProgressBar for the current download.
17-
progress_bar: ProgressBar,
16+
/// Mapping of files to their corresponding progress bars.
17+
file_progress_bars: HashMap<String, ProgressBar>,
1818
}
1919

2020
impl DownloadTracker {
@@ -28,22 +28,29 @@ impl DownloadTracker {
2828

2929
Self {
3030
multi_progress_bars,
31-
progress_bar: ProgressBar::hidden(),
31+
file_progress_bars: HashMap::new(),
3232
}
3333
}
3434

3535
pub(crate) fn handle_notification(&mut self, n: &Notification<'_>) -> bool {
3636
match *n {
37-
Notification::Install(In::Utils(Un::DownloadContentLengthReceived(content_len))) => {
38-
self.content_length_received(content_len);
37+
Notification::Install(In::Utils(Un::DownloadContentLengthReceived(
38+
content_len,
39+
file,
40+
))) => {
41+
self.content_length_received(content_len, file);
3942
true
4043
}
41-
Notification::Install(In::Utils(Un::DownloadDataReceived(data))) => {
42-
self.data_received(data.len());
44+
Notification::Install(In::Utils(Un::DownloadDataReceived(data, file))) => {
45+
self.data_received(data.len(), file);
4346
true
4447
}
45-
Notification::Install(In::Utils(Un::DownloadFinished)) => {
46-
self.download_finished();
48+
Notification::Install(In::Utils(Un::DownloadFinished(file))) => {
49+
self.download_finished(file);
50+
true
51+
}
52+
Notification::Install(In::DownloadingComponent(component, _, _)) => {
53+
self.create_progress_bar(component);
4754
true
4855
}
4956
Notification::Install(In::Utils(Un::DownloadPushUnit(_))) => true,
@@ -53,30 +60,60 @@ impl DownloadTracker {
5360
}
5461
}
5562

56-
/// Sets the length for a new ProgressBar and gives it a style.
57-
pub(crate) fn content_length_received(&mut self, content_len: u64) {
58-
self.progress_bar.set_length(content_len);
59-
self.progress_bar.set_style(
63+
/// Helper function to find the progress bar for a given file.
64+
fn progress_bar(&mut self, file: &str) -> Option<&ProgressBar> {
65+
// During the installation this function can be called with an empty file/URL.
66+
if file.is_empty() {
67+
return None;
68+
}
69+
let component = self
70+
.file_progress_bars
71+
.keys()
72+
.find(|comp| file.contains(*comp))?;
73+
74+
self.file_progress_bars.get(component)
75+
}
76+
77+
/// Creates a new ProgressBar for the given component.
78+
pub(crate) fn create_progress_bar(&mut self, component: &str) {
79+
let pb = ProgressBar::hidden();
80+
pb.set_style(
6081
ProgressStyle::with_template(
61-
"[{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec}, ETA: {eta})",
82+
"{msg:>12.bold} [{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec}, ETA: {eta})",
6283
)
6384
.unwrap()
6485
.progress_chars("## "),
6586
);
87+
pb.set_message(component.to_string());
88+
self.multi_progress_bars.add(pb.clone());
89+
self.file_progress_bars.insert(component.to_string(), pb);
90+
}
91+
92+
/// Sets the length for a new ProgressBar and gives it a style.
93+
pub(crate) fn content_length_received(&mut self, content_len: u64, file: &str) {
94+
if let Some(pb) = self.progress_bar(file) {
95+
pb.set_length(content_len);
96+
}
6697
}
6798

6899
/// Notifies self that data of size `len` has been received.
69-
pub(crate) fn data_received(&mut self, len: usize) {
70-
if self.progress_bar.is_hidden() && self.progress_bar.elapsed() >= Duration::from_secs(1) {
71-
self.multi_progress_bars.add(self.progress_bar.clone());
100+
pub(crate) fn data_received(&mut self, len: usize, file: &str) {
101+
if let Some(pb) = self.progress_bar(file) {
102+
pb.inc(len as u64);
72103
}
73-
self.progress_bar.inc(len as u64);
74104
}
75105

76106
/// Notifies self that the download has finished.
77-
pub(crate) fn download_finished(&mut self) {
78-
self.progress_bar.finish_and_clear();
79-
self.multi_progress_bars.remove(&self.progress_bar);
80-
self.progress_bar = ProgressBar::hidden();
107+
pub(crate) fn download_finished(&mut self, file: &str) {
108+
if let Some(pb) = self.progress_bar(file) {
109+
pb.set_style(
110+
ProgressStyle::with_template(
111+
"{msg:>12.bold} downloaded {total_bytes} in {elapsed}",
112+
)
113+
.unwrap(),
114+
);
115+
let msg = pb.message();
116+
pb.finish_with_message(msg);
117+
}
81118
}
82119
}

src/cli/self_update/windows.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,10 @@ pub(crate) async fn try_install_msvc(
274274
let download_tracker = Arc::new(Mutex::new(DownloadTracker::new_with_display_progress(
275275
true, process,
276276
)));
277-
download_tracker.lock().unwrap().download_finished();
277+
download_tracker
278+
.lock()
279+
.unwrap()
280+
.download_finished(visual_studio_url.as_str());
278281

279282
info!("downloading Visual Studio installer");
280283
download_file(

src/diskio/threaded.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -263,10 +263,11 @@ impl Executor for Threaded<'_> {
263263
// pretend to have bytes to deliver.
264264
let mut prev_files = self.n_files.load(Ordering::Relaxed);
265265
if let Some(handler) = self.notify_handler {
266-
handler(Notification::DownloadFinished);
266+
handler(Notification::DownloadFinished(""));
267267
handler(Notification::DownloadPushUnit(Unit::IO));
268268
handler(Notification::DownloadContentLengthReceived(
269269
prev_files as u64,
270+
"",
270271
));
271272
}
272273
if prev_files > 50 {
@@ -284,12 +285,12 @@ impl Executor for Threaded<'_> {
284285
current_files = self.n_files.load(Ordering::Relaxed);
285286
let step_count = prev_files - current_files;
286287
if let Some(handler) = self.notify_handler {
287-
handler(Notification::DownloadDataReceived(&buf[0..step_count]));
288+
handler(Notification::DownloadDataReceived(&buf[0..step_count], ""));
288289
}
289290
}
290291
self.pool.join();
291292
if let Some(handler) = self.notify_handler {
292-
handler(Notification::DownloadFinished);
293+
handler(Notification::DownloadFinished(""));
293294
handler(Notification::DownloadPopUnit);
294295
}
295296
// close the feedback channel so that blocking reads on it can

src/dist/manifestation.rs

Lines changed: 52 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ mod tests;
66

77
use std::path::Path;
88

9-
use anyhow::{Context, Result, anyhow, bail};
9+
use anyhow::{Context, Error, Result, anyhow, bail};
10+
use futures_util::stream::StreamExt;
1011
use tokio_retry::{RetryIf, strategy::FixedInterval};
12+
use tracing::info;
1113

1214
use crate::dist::component::{
1315
Components, Package, TarGzPackage, TarXzPackage, TarZStdPackage, Transaction,
@@ -153,6 +155,7 @@ impl Manifestation {
153155
let mut things_to_install: Vec<(Component, CompressionKind, File)> = Vec::new();
154156
let mut things_downloaded: Vec<String> = Vec::new();
155157
let components = update.components_urls_and_hashes(new_manifest)?;
158+
let components_len = components.len();
156159

157160
const DEFAULT_MAX_RETRIES: usize = 3;
158161
let max_retries: usize = download_cfg
@@ -162,41 +165,60 @@ impl Manifestation {
162165
.and_then(|s| s.parse().ok())
163166
.unwrap_or(DEFAULT_MAX_RETRIES);
164167

165-
for (component, format, url, hash) in components {
168+
info!("downloading component(s)");
169+
for (component, _, _, _) in components.clone() {
166170
(download_cfg.notify_handler)(Notification::DownloadingComponent(
167171
&component.short_name(new_manifest),
168172
&self.target_triple,
169173
component.target.as_ref(),
170174
));
171-
let url = if altered {
172-
url.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str())
173-
} else {
174-
url
175-
};
176-
177-
let url_url = utils::parse_url(&url)?;
178-
179-
let downloaded_file = RetryIf::spawn(
180-
FixedInterval::from_millis(0).take(max_retries),
181-
|| download_cfg.download(&url_url, &hash),
182-
|e: &anyhow::Error| {
183-
// retry only known retriable cases
184-
match e.downcast_ref::<RustupError>() {
185-
Some(RustupError::BrokenPartialFile)
186-
| Some(RustupError::DownloadingFile { .. }) => {
187-
(download_cfg.notify_handler)(Notification::RetryingDownload(&url));
188-
true
189-
}
190-
_ => false,
191-
}
192-
},
193-
)
194-
.await
195-
.with_context(|| RustupError::ComponentDownloadFailed(component.name(new_manifest)))?;
196-
197-
things_downloaded.push(hash);
175+
}
198176

199-
things_to_install.push((component, format, downloaded_file));
177+
let component_stream =
178+
tokio_stream::iter(components.into_iter()).map(|(component, format, url, hash)| {
179+
async move {
180+
let url = if altered {
181+
url.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str())
182+
} else {
183+
url
184+
};
185+
186+
let url_url = utils::parse_url(&url)?;
187+
188+
let downloaded_file = RetryIf::spawn(
189+
FixedInterval::from_millis(0).take(max_retries),
190+
|| download_cfg.download(&url_url, &hash),
191+
|e: &anyhow::Error| {
192+
// retry only known retriable cases
193+
match e.downcast_ref::<RustupError>() {
194+
Some(RustupError::BrokenPartialFile)
195+
| Some(RustupError::DownloadingFile { .. }) => {
196+
(download_cfg.notify_handler)(Notification::RetryingDownload(
197+
&url,
198+
));
199+
true
200+
}
201+
_ => false,
202+
}
203+
},
204+
)
205+
.await
206+
.with_context(|| {
207+
RustupError::ComponentDownloadFailed(component.name(new_manifest))
208+
})?;
209+
Ok::<_, Error>((component, format, downloaded_file, hash))
210+
}
211+
});
212+
if components_len > 0 {
213+
let results = component_stream
214+
.buffered(components_len)
215+
.collect::<Vec<_>>()
216+
.await;
217+
for result in results {
218+
let (component, format, downloaded_file, hash) = result?;
219+
things_downloaded.push(hash);
220+
things_to_install.push((component, format, downloaded_file));
221+
}
200222
}
201223

202224
// Begin transaction

src/download/mod.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -108,10 +108,13 @@ async fn download_file_(
108108

109109
match msg {
110110
Event::DownloadContentLengthReceived(len) => {
111-
notify_handler(Notification::DownloadContentLengthReceived(len));
111+
notify_handler(Notification::DownloadContentLengthReceived(
112+
len,
113+
url.as_str(),
114+
));
112115
}
113116
Event::DownloadDataReceived(data) => {
114-
notify_handler(Notification::DownloadDataReceived(data));
117+
notify_handler(Notification::DownloadDataReceived(data, url.as_str()));
115118
}
116119
Event::ResumingPartialDownload => {
117120
notify_handler(Notification::ResumingPartialDownload);
@@ -205,7 +208,7 @@ async fn download_file_(
205208
.download_to_path(url, path, resume_from_partial, Some(callback))
206209
.await;
207210

208-
notify_handler(Notification::DownloadFinished);
211+
notify_handler(Notification::DownloadFinished(url.as_str()));
209212

210213
res
211214
}

src/utils/mod.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -506,7 +506,7 @@ impl<'a> FileReaderWithProgress<'a> {
506506

507507
// Inform the tracker of the file size
508508
let flen = fh.metadata()?.len();
509-
(notify_handler)(Notification::DownloadContentLengthReceived(flen));
509+
(notify_handler)(Notification::DownloadContentLengthReceived(flen, ""));
510510

511511
let fh = BufReader::with_capacity(8 * 1024 * 1024, fh);
512512

@@ -525,10 +525,10 @@ impl io::Read for FileReaderWithProgress<'_> {
525525
Ok(nbytes) => {
526526
self.nbytes += nbytes as u64;
527527
if nbytes != 0 {
528-
(self.notify_handler)(Notification::DownloadDataReceived(&buf[0..nbytes]));
528+
(self.notify_handler)(Notification::DownloadDataReceived(&buf[0..nbytes], ""));
529529
}
530530
if (nbytes == 0) || (self.flen == self.nbytes) {
531-
(self.notify_handler)(Notification::DownloadFinished);
531+
(self.notify_handler)(Notification::DownloadFinished(""));
532532
}
533533
Ok(nbytes)
534534
}

src/utils/notifications.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ pub enum Notification<'a> {
1414
RemovingDirectory(&'a str, &'a Path),
1515
DownloadingFile(&'a Url, &'a Path),
1616
/// Received the Content-Length of the to-be downloaded data.
17-
DownloadContentLengthReceived(u64),
17+
DownloadContentLengthReceived(u64, &'a str),
1818
/// Received some data.
19-
DownloadDataReceived(&'a [u8]),
19+
DownloadDataReceived(&'a [u8], &'a str),
2020
/// Download has finished.
21-
DownloadFinished,
21+
DownloadFinished(&'a str),
2222
/// The things we're tracking that are not counted in bytes.
2323
/// Must be paired with a pop-units; our other calls are not
2424
/// setup to guarantee this any better.
@@ -52,11 +52,11 @@ impl Notification<'_> {
5252
| LinkingDirectory(_, _)
5353
| CopyingDirectory(_, _)
5454
| DownloadingFile(_, _)
55-
| DownloadContentLengthReceived(_)
56-
| DownloadDataReceived(_)
55+
| DownloadContentLengthReceived(_, _)
56+
| DownloadDataReceived(_, _)
5757
| DownloadPushUnit(_)
5858
| DownloadPopUnit
59-
| DownloadFinished
59+
| DownloadFinished(_)
6060
| ResumingPartialDownload
6161
| UsingCurl
6262
| UsingReqwest => NotificationLevel::Debug,
@@ -92,11 +92,11 @@ impl Display for Notification<'_> {
9292
units::Size::new(*size, units::Unit::B)
9393
),
9494
DownloadingFile(url, _) => write!(f, "downloading file from: '{url}'"),
95-
DownloadContentLengthReceived(len) => write!(f, "download size is: '{len}'"),
96-
DownloadDataReceived(data) => write!(f, "received some data of size {}", data.len()),
95+
DownloadContentLengthReceived(len, _) => write!(f, "download size is: '{len}'"),
96+
DownloadDataReceived(data, _) => write!(f, "received some data of size {}", data.len()),
9797
DownloadPushUnit(_) => Ok(()),
9898
DownloadPopUnit => Ok(()),
99-
DownloadFinished => write!(f, "download finished"),
99+
DownloadFinished(_) => write!(f, "download finished"),
100100
NoCanonicalPath(path) => write!(f, "could not canonicalize path: '{}'", path.display()),
101101
ResumingPartialDownload => write!(f, "resuming partial download"),
102102
UsingCurl => write!(f, "downloading with curl"),

0 commit comments

Comments
 (0)