Concurrently download components of a toolchain #4436

Open: wants to merge 1 commit into base: master
83 changes: 60 additions & 23 deletions src/cli/download_tracker.rs
@@ -1,5 +1,5 @@
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};
use std::time::Duration;
use std::collections::HashMap;

use crate::dist::Notification as In;
use crate::notifications::Notification;
@@ -13,8 +13,8 @@ use crate::utils::Notification as Un;
pub(crate) struct DownloadTracker {
/// MultiProgress bar for the downloads.
multi_progress_bars: MultiProgress,
/// ProgressBar for the current download.
progress_bar: ProgressBar,
/// Mapping of files to their corresponding progress bars.
file_progress_bars: HashMap<String, ProgressBar>,
}

impl DownloadTracker {
@@ -28,22 +28,29 @@ impl DownloadTracker {

Self {
multi_progress_bars,
progress_bar: ProgressBar::hidden(),
file_progress_bars: HashMap::new(),
}
}

pub(crate) fn handle_notification(&mut self, n: &Notification<'_>) -> bool {
match *n {
Notification::Install(In::Utils(Un::DownloadContentLengthReceived(content_len))) => {
self.content_length_received(content_len);
Notification::Install(In::Utils(Un::DownloadContentLengthReceived(
content_len,
file,
))) => {
self.content_length_received(content_len, file);
true
}
Notification::Install(In::Utils(Un::DownloadDataReceived(data))) => {
self.data_received(data.len());
Notification::Install(In::Utils(Un::DownloadDataReceived(data, file))) => {
self.data_received(data.len(), file);
true
}
Notification::Install(In::Utils(Un::DownloadFinished)) => {
self.download_finished();
Notification::Install(In::Utils(Un::DownloadFinished(file))) => {
self.download_finished(file);
true
}
Notification::Install(In::DownloadingComponent(component, _, _)) => {
self.create_progress_bar(component);
true
}
Notification::Install(In::Utils(Un::DownloadPushUnit(_))) => true,
@@ -53,30 +60,60 @@ impl DownloadTracker {
}
}

/// Sets the length for a new ProgressBar and gives it a style.
pub(crate) fn content_length_received(&mut self, content_len: u64) {
self.progress_bar.set_length(content_len);
self.progress_bar.set_style(
/// Helper function to find the progress bar for a given file.
fn progress_bar(&mut self, file: &str) -> Option<&ProgressBar> {
// During the installation this function can be called with an empty file/URL.
if file.is_empty() {
return None;
}
Comment on lines +65 to +68
Contributor:

Why does that happen? Seems pretty iffy.

let component = self
.file_progress_bars
.keys()
.find(|comp| file.contains(*comp))?;

self.file_progress_bars.get(component)
Comment on lines +69 to +74
Contributor:

Why this elaborate setup? Are file values not the same as the components?

}
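
The two questions above concern how `progress_bar()` maps a URL back to a component: it returns the first key of `file_progress_bars` that occurs as a substring of `file`. A minimal sketch of that lookup follows, using only the standard library, next to one possible alternative that prefers the longest matching key. The helper names, the `rustc`/`rustc-dev` keys and the `example.invalid` URL are assumptions made for illustration; they are not taken from the PR.

```rust
use std::collections::HashMap;

/// Mirrors the lookup in the diff: return the value of the first key
/// that appears as a substring of `file`.
fn find_by_substring<'a, V>(bars: &'a HashMap<String, V>, file: &str) -> Option<&'a V> {
    // An empty file/URL never matches anything.
    if file.is_empty() {
        return None;
    }
    let key = bars.keys().find(|name| file.contains(name.as_str()))?;
    bars.get(key)
}

/// Hypothetical alternative: among all matching keys, pick the longest one,
/// so the result does not depend on the map's iteration order.
fn find_longest_match<'a, V>(bars: &'a HashMap<String, V>, file: &str) -> Option<&'a V> {
    bars.iter()
        .filter(|(name, _)| !name.is_empty() && file.contains(name.as_str()))
        .max_by_key(|(name, _)| name.len())
        .map(|(_, bar)| bar)
}
```

With substring matching, a URL can contain more than one key, and `HashMap` iteration order is unspecified, so the first-match variant may return either bar for such a URL. Preferring the longest key is only one way to make the choice deterministic; keying the map by URL would be another.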

/// Creates a new ProgressBar for the given component.
pub(crate) fn create_progress_bar(&mut self, component: &str) {
let pb = ProgressBar::hidden();
pb.set_style(
ProgressStyle::with_template(
"[{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec}, ETA: {eta})",
"{msg:>12.bold} [{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec}, ETA: {eta})",
)
.unwrap()
.progress_chars("## "),
);
pb.set_message(component.to_string());
Member:

Nit: .to_string() doesn't seem necessary here...

Contributor Author:

I may be misreading the situation here, but I think we actually need the .to_string() here.

This is because set_message() expects Into<Cow<'static, str>>, and we would be passing a &str with a non-'static lifetime.

Contributor:

IMO to_owned() more clearly expresses what you're trying to do (and might be slightly more efficient). In this case since create_progress_bar() needs two copies, suggest taking the argument as String and cloning it for use in set_message().

self.multi_progress_bars.add(pb.clone());
self.file_progress_bars.insert(component.to_string(), pb);
}
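
The thread above turns on the `Into<Cow<'static, str>>` bound: a string literal can be borrowed as-is, while a `&str` with a shorter lifetime has to become an owned `String` first. The sketch below illustrates that, together with the reviewer's suggestion of taking the argument as `String` and cloning it once. `Bar` and `register` are hypothetical stand-ins written with the standard library only; they are not `indicatif` types.

```rust
use std::borrow::Cow;
use std::collections::HashMap;

/// Hypothetical stand-in for a progress bar whose message must be 'static.
#[derive(Default)]
struct Bar {
    message: Cow<'static, str>,
}

impl Bar {
    /// Same bound as discussed in the review thread.
    fn set_message(&mut self, msg: impl Into<Cow<'static, str>>) {
        self.message = msg.into();
    }
}

/// Takes the component name by value: one clone for the message,
/// and the original `String` moves into the map as the key.
fn register(bars: &mut HashMap<String, Bar>, component: String) {
    let mut bar = Bar::default();
    bar.set_message(component.clone());
    bars.insert(component, bar);
}
```

A caller holding a borrowed name would write `register(&mut bars, name.to_owned())`, which makes the single allocation visible at the call site.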

/// Sets the length for a new ProgressBar and gives it a style.
pub(crate) fn content_length_received(&mut self, content_len: u64, file: &str) {
if let Some(pb) = self.progress_bar(file) {
pb.set_length(content_len);
}
}

/// Notifies self that data of size `len` has been received.
pub(crate) fn data_received(&mut self, len: usize) {
if self.progress_bar.is_hidden() && self.progress_bar.elapsed() >= Duration::from_secs(1) {
self.multi_progress_bars.add(self.progress_bar.clone());
pub(crate) fn data_received(&mut self, len: usize, file: &str) {
if let Some(pb) = self.progress_bar(file) {
pb.inc(len as u64);
}
self.progress_bar.inc(len as u64);
}

/// Notifies self that the download has finished.
pub(crate) fn download_finished(&mut self) {
self.progress_bar.finish_and_clear();
self.multi_progress_bars.remove(&self.progress_bar);
self.progress_bar = ProgressBar::hidden();
pub(crate) fn download_finished(&mut self, file: &str) {
if let Some(pb) = self.progress_bar(file) {
Contributor:

Nit: let Some(pb) = .. else { return };.

pb.set_style(
ProgressStyle::with_template(
"{msg:>12.bold} downloaded {total_bytes} in {elapsed}",
)
.unwrap(),
);
let msg = pb.message();
pb.finish_with_message(msg);
Comment on lines +115 to +116
Contributor:

This seems weird? Could instead probably use one of the other ProgressFinish modes.

}
}
}
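
A small sketch of the `let ... else` form suggested in the nit above, applied to a stripped-down tracker. `Tracker` and `Bar` are hypothetical stand-ins that use plain counters instead of `indicatif` progress bars, and the map is keyed directly by component name, which is an assumption rather than what the PR does.

```rust
use std::collections::HashMap;

/// Hypothetical per-download state in place of a real progress bar.
#[derive(Debug, Default, PartialEq)]
struct Bar {
    received: u64,
    finished: bool,
}

#[derive(Default)]
struct Tracker {
    bars: HashMap<String, Bar>,
}

impl Tracker {
    /// Adds `len` bytes to the matching bar; unknown components are ignored.
    fn data_received(&mut self, len: usize, component: &str) {
        let Some(bar) = self.bars.get_mut(component) else { return };
        bar.received += len as u64;
    }

    /// Marks the matching bar as finished; unknown components are ignored.
    fn download_finished(&mut self, component: &str) {
        let Some(bar) = self.bars.get_mut(component) else { return };
        bar.finished = true;
    }
}
```

The early return keeps the main path unindented, which is the readability gain the nit is after.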
5 changes: 4 additions & 1 deletion src/cli/self_update/windows.rs
@@ -274,7 +274,10 @@ pub(crate) async fn try_install_msvc(
let download_tracker = Arc::new(Mutex::new(DownloadTracker::new_with_display_progress(
true, process,
)));
download_tracker.lock().unwrap().download_finished();
download_tracker
.lock()
.unwrap()
.download_finished(visual_studio_url.as_str());

info!("downloading Visual Studio installer");
download_file(
7 changes: 4 additions & 3 deletions src/diskio/threaded.rs
@@ -263,10 +263,11 @@ impl Executor for Threaded<'_> {
// pretend to have bytes to deliver.
let mut prev_files = self.n_files.load(Ordering::Relaxed);
if let Some(handler) = self.notify_handler {
handler(Notification::DownloadFinished);
handler(Notification::DownloadFinished(""));
Contributor:

This seems strange. Why pass in an empty string?

handler(Notification::DownloadPushUnit(Unit::IO));
handler(Notification::DownloadContentLengthReceived(
prev_files as u64,
"",
Contributor:

Same here. What does the empty string mean? Should the type be Option<_> instead?

));
}
if prev_files > 50 {
@@ -284,12 +285,12 @@ impl Executor for Threaded<'_> {
current_files = self.n_files.load(Ordering::Relaxed);
let step_count = prev_files - current_files;
if let Some(handler) = self.notify_handler {
handler(Notification::DownloadDataReceived(&buf[0..step_count]));
handler(Notification::DownloadDataReceived(&buf[0..step_count], ""));
}
}
self.pool.join();
if let Some(handler) = self.notify_handler {
handler(Notification::DownloadFinished);
handler(Notification::DownloadFinished(""));
handler(Notification::DownloadPopUnit);
}
// close the feedback channel so that blocking reads on it can
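
Both review comments in this file ask what the empty string means in these calls. One way to say "this notification does not belong to a tracked download" in the type itself, along the lines of the `Option<_>` suggestion, is sketched below. The enum is a cut-down, hypothetical stand-in for the real `Notification` type, and `source_url` is an invented helper.

```rust
/// Hypothetical, reduced version of the notification enum in which the
/// URL is optional instead of possibly being an empty string.
#[derive(Debug, PartialEq)]
enum Notification<'a> {
    DownloadContentLengthReceived(u64, Option<&'a str>),
    DownloadDataReceived(&'a [u8], Option<&'a str>),
    DownloadFinished(Option<&'a str>),
}

/// Returns the URL a notification refers to, if it refers to one at all.
fn source_url<'a>(n: &Notification<'a>) -> Option<&'a str> {
    match *n {
        Notification::DownloadContentLengthReceived(_, url)
        | Notification::DownloadDataReceived(_, url)
        | Notification::DownloadFinished(url) => url,
    }
}
```

Call sites such as the disk I/O executor would then pass `None`, and a tracker would no longer need a special case for `""`.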
82 changes: 52 additions & 30 deletions src/dist/manifestation.rs
@@ -6,8 +6,10 @@ mod tests;

use std::path::Path;

use anyhow::{Context, Result, anyhow, bail};
use anyhow::{Context, Error, Result, anyhow, bail};
use futures_util::stream::StreamExt;
use tokio_retry::{RetryIf, strategy::FixedInterval};
use tracing::info;

use crate::dist::component::{
Components, Package, TarGzPackage, TarXzPackage, TarZStdPackage, Transaction,
@@ -153,6 +155,7 @@ impl Manifestation {
let mut things_to_install: Vec<(Component, CompressionKind, File)> = Vec::new();
let mut things_downloaded: Vec<String> = Vec::new();
let components = update.components_urls_and_hashes(new_manifest)?;
let components_len = components.len();

const DEFAULT_MAX_RETRIES: usize = 3;
let max_retries: usize = download_cfg
@@ -162,41 +165,60 @@ impl Manifestation {
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_MAX_RETRIES);
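
`max_retries` feeds the `RetryIf::spawn` call later in this hunk, which retries a download only for errors that its predicate accepts. The sketch below shows the same retry-if shape synchronously, with the standard library only. It is not `tokio_retry`'s implementation; `retry_if`, `DownloadError` and its variants are hypothetical names, and it assumes that `max_retries` counts retries after the first attempt.

```rust
/// Hypothetical error type; only one variant is treated as retriable below.
#[derive(Debug, PartialEq)]
enum DownloadError {
    BrokenPartialFile,
    NotFound,
}

/// Runs `op`, retrying up to `max_retries` more times while
/// `should_retry` accepts the error. Other errors are returned at once.
fn retry_if<T, E>(
    max_retries: usize,
    mut op: impl FnMut() -> Result<T, E>,
    mut should_retry: impl FnMut(&E) -> bool,
) -> Result<T, E> {
    let mut retries_left = max_retries;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Retriable error with budget left: consume one retry and loop.
            Err(err) if retries_left > 0 && should_retry(&err) => retries_left -= 1,
            // Non-retriable error, or budget exhausted.
            Err(err) => return Err(err),
        }
    }
}
```

Under that assumption, a budget of 3 allows at most four calls to `op` in total.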

for (component, format, url, hash) in components {
info!("downloading component(s)");
for (component, _, _, _) in components.clone() {
(download_cfg.notify_handler)(Notification::DownloadingComponent(
&component.short_name(new_manifest),
&self.target_triple,
component.target.as_ref(),
));
let url = if altered {
url.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str())
} else {
url
};

let url_url = utils::parse_url(&url)?;

let downloaded_file = RetryIf::spawn(
FixedInterval::from_millis(0).take(max_retries),
|| download_cfg.download(&url_url, &hash),
|e: &anyhow::Error| {
// retry only known retriable cases
match e.downcast_ref::<RustupError>() {
Some(RustupError::BrokenPartialFile)
| Some(RustupError::DownloadingFile { .. }) => {
(download_cfg.notify_handler)(Notification::RetryingDownload(&url));
true
}
_ => false,
}
},
)
.await
.with_context(|| RustupError::ComponentDownloadFailed(component.name(new_manifest)))?;

things_downloaded.push(hash);
}

things_to_install.push((component, format, downloaded_file));
let component_stream =
tokio_stream::iter(components.into_iter()).map(|(component, format, url, hash)| {
Member:

We also have to consider the restriction of concurrent downloads similar to what we have done with threadpooled diskio.

Contributor:

This closure is complicated enough that I think it should be extracted into a separate function or struct.

async move {
let url = if altered {
url.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str())
} else {
url
};

let url_url = utils::parse_url(&url)?;

let downloaded_file = RetryIf::spawn(
FixedInterval::from_millis(0).take(max_retries),
|| download_cfg.download(&url_url, &hash),
|e: &anyhow::Error| {
// retry only known retriable cases
match e.downcast_ref::<RustupError>() {
Some(RustupError::BrokenPartialFile)
| Some(RustupError::DownloadingFile { .. }) => {
(download_cfg.notify_handler)(Notification::RetryingDownload(
&url,
));
true
}
_ => false,
}
},
)
.await
.with_context(|| {
RustupError::ComponentDownloadFailed(component.name(new_manifest))
})?;
Ok::<_, Error>((component, format, downloaded_file, hash))
}
});
if components_len > 0 {
Contributor:

If this is the only use of components_len, suggest storing a bool instead.

let results = component_stream
.buffered(components_len)
.collect::<Vec<_>>()
.await;
for result in results {
let (component, format, downloaded_file, hash) = result?;
things_downloaded.push(hash);
things_to_install.push((component, format, downloaded_file));
}
}
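
`buffered(components_len)` lets every download run at once while still yielding results in input order; the review comment above asks for a cap on that concurrency. With the stream API this would presumably mean passing a smaller number to `buffered`. As a standard-library-only illustration of the same idea (bounded concurrency, results in input order), the sketch below uses scoped OS threads instead of futures, so it is a swapped-in technique rather than what the PR does. `run_bounded` and `limit` are hypothetical names.

```rust
use std::sync::Mutex;
use std::thread;

/// Runs `work` over `jobs` on at most `limit` threads and returns the
/// results in the same order as the input.
fn run_bounded<T, R, F>(jobs: Vec<T>, limit: usize, work: F) -> Vec<R>
where
    T: Send,
    R: Send,
    F: Fn(T) -> R + Sync,
{
    let n = jobs.len();
    // Shared queue of (index, job) pairs that the workers pull from.
    let queue = Mutex::new(jobs.into_iter().enumerate());
    // One slot per job so each result lands at its input position.
    let results: Mutex<Vec<Option<R>>> = Mutex::new((0..n).map(|_| None).collect());
    // At least one worker, and never more workers than jobs.
    let workers = limit.clamp(1, n.max(1));

    thread::scope(|s| {
        for _ in 0..workers {
            s.spawn(|| loop {
                // Hold the queue lock only while taking the next job.
                let next = queue.lock().unwrap().next();
                let Some((idx, job)) = next else { break };
                let out = work(job);
                results.lock().unwrap()[idx] = Some(out);
            });
        }
    });

    results
        .into_inner()
        .unwrap()
        .into_iter()
        .map(|slot| slot.expect("every job produced a result"))
        .collect()
}
```

For example, `run_bounded(urls, 4, download)` would keep at most four hypothetical `download` calls in flight, whatever the number of components.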

// Begin transaction
9 changes: 6 additions & 3 deletions src/download/mod.rs
@@ -108,10 +108,13 @@ async fn download_file_(

match msg {
Event::DownloadContentLengthReceived(len) => {
notify_handler(Notification::DownloadContentLengthReceived(len));
notify_handler(Notification::DownloadContentLengthReceived(
len,
url.as_str(),
));
}
Event::DownloadDataReceived(data) => {
notify_handler(Notification::DownloadDataReceived(data));
notify_handler(Notification::DownloadDataReceived(data, url.as_str()));
}
Event::ResumingPartialDownload => {
notify_handler(Notification::ResumingPartialDownload);
@@ -205,7 +208,7 @@ async fn download_file_(
.download_to_path(url, path, resume_from_partial, Some(callback))
.await;

notify_handler(Notification::DownloadFinished);
notify_handler(Notification::DownloadFinished(url.as_str()));

res
}
6 changes: 3 additions & 3 deletions src/utils/mod.rs
@@ -506,7 +506,7 @@ impl<'a> FileReaderWithProgress<'a> {

// Inform the tracker of the file size
let flen = fh.metadata()?.len();
(notify_handler)(Notification::DownloadContentLengthReceived(flen));
(notify_handler)(Notification::DownloadContentLengthReceived(flen, ""));

let fh = BufReader::with_capacity(8 * 1024 * 1024, fh);

@@ -525,10 +525,10 @@ impl io::Read for FileReaderWithProgress<'_> {
Ok(nbytes) => {
self.nbytes += nbytes as u64;
if nbytes != 0 {
(self.notify_handler)(Notification::DownloadDataReceived(&buf[0..nbytes]));
(self.notify_handler)(Notification::DownloadDataReceived(&buf[0..nbytes], ""));
}
if (nbytes == 0) || (self.flen == self.nbytes) {
(self.notify_handler)(Notification::DownloadFinished);
(self.notify_handler)(Notification::DownloadFinished(""));
}
Ok(nbytes)
}
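
`FileReaderWithProgress` reports progress from inside `io::Read::read`: a data notification for every non-empty read, and a finished notification once the byte count reaches the file length or a read returns zero. The visible hunks do not show the whole type, so the sketch below is a hypothetical reconstruction of that pattern, with a closure in place of the notification handler; `ProgressReader` and `Event` are invented names.

```rust
use std::io::{self, Read};

/// Hypothetical progress events, standing in for the notifications.
#[derive(Debug, PartialEq)]
enum Event {
    Data(usize),
    Finished,
}

/// Wraps a reader and reports each read to `on_event`.
struct ProgressReader<R, F> {
    inner: R,
    total: u64,
    seen: u64,
    on_event: F,
}

impl<R: Read, F: FnMut(Event)> Read for ProgressReader<R, F> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.seen += n as u64;
        if n != 0 {
            (self.on_event)(Event::Data(n));
        }
        // Same condition as in the diff: end of input or all bytes seen.
        if n == 0 || self.seen == self.total {
            (self.on_event)(Event::Finished);
        }
        Ok(n)
    }
}
```

With this condition, a caller that keeps reading until `Ok(0)` sees `Finished` twice: once when the last byte arrives and once more on the zero-length read. A handler therefore has to tolerate repeated finish events.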
21 changes: 11 additions & 10 deletions src/utils/notifications.rs
@@ -13,12 +13,13 @@ pub enum Notification<'a> {
CopyingDirectory(&'a Path, &'a Path),
RemovingDirectory(&'a str, &'a Path),
DownloadingFile(&'a Url, &'a Path),
/// Received the Content-Length of the to-be downloaded data.
DownloadContentLengthReceived(u64),
/// Received the Content-Length of the to-be downloaded data with
/// the respective URL of the download (for tracking concurrent downloads).
DownloadContentLengthReceived(u64, &'a str),
Member:

Would you mind explaining the meaning of "" here and why they are required? It might be an interesting addition to the doc comments as well.

Contributor Author:

Since we are downloading the components concurrently, we can’t know which download the Content-Length refers to. Therefore, in addition to receiving the length itself, we also need the component’s URL so we can match it to the corresponding progress bar.

Note: I've already added a doc comment. Thank you!

Contributor Author:

Before resolving this conversation, I will investigate the correct use of this notification and the addition of this extra parameter to ensure that, even though the installations are not concurrent, it did not have any negative effect on them (which, unfortunately, cannot be guaranteed by the test suite).

/// Received some data.
DownloadDataReceived(&'a [u8]),
DownloadDataReceived(&'a [u8], &'a str),
/// Download has finished.
DownloadFinished,
DownloadFinished(&'a str),
/// The things we're tracking that are not counted in bytes.
/// Must be paired with a pop-units; our other calls are not
/// setup to guarantee this any better.
@@ -52,11 +53,11 @@ impl Notification<'_> {
| LinkingDirectory(_, _)
| CopyingDirectory(_, _)
| DownloadingFile(_, _)
| DownloadContentLengthReceived(_)
| DownloadDataReceived(_)
| DownloadContentLengthReceived(_, _)
| DownloadDataReceived(_, _)
| DownloadPushUnit(_)
| DownloadPopUnit
| DownloadFinished
| DownloadFinished(_)
| ResumingPartialDownload
| UsingCurl
| UsingReqwest => NotificationLevel::Debug,
@@ -92,11 +93,11 @@ impl Display for Notification<'_> {
units::Size::new(*size, units::Unit::B)
),
DownloadingFile(url, _) => write!(f, "downloading file from: '{url}'"),
DownloadContentLengthReceived(len) => write!(f, "download size is: '{len}'"),
DownloadDataReceived(data) => write!(f, "received some data of size {}", data.len()),
DownloadContentLengthReceived(len, _) => write!(f, "download size is: '{len}'"),
DownloadDataReceived(data, _) => write!(f, "received some data of size {}", data.len()),
DownloadPushUnit(_) => Ok(()),
DownloadPopUnit => Ok(()),
DownloadFinished => write!(f, "download finished"),
DownloadFinished(_) => write!(f, "download finished"),
NoCanonicalPath(path) => write!(f, "could not canonicalize path: '{}'", path.display()),
ResumingPartialDownload => write!(f, "resuming partial download"),
UsingCurl => write!(f, "downloading with curl"),