diff --git a/src/cli/download_tracker.rs b/src/cli/download_tracker.rs index 2ac4cca7e7..b2169f718a 100644 --- a/src/cli/download_tracker.rs +++ b/src/cli/download_tracker.rs @@ -1,5 +1,5 @@ use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle}; -use std::time::Duration; +use std::collections::HashMap; use crate::dist::Notification as In; use crate::notifications::Notification; @@ -13,8 +13,8 @@ use crate::utils::Notification as Un; pub(crate) struct DownloadTracker { /// MultiProgress bar for the downloads. multi_progress_bars: MultiProgress, - /// ProgressBar for the current download. - progress_bar: ProgressBar, + /// Mapping of URLs being downloaded to their corresponding progress bars. + file_progress_bars: HashMap, } impl DownloadTracker { @@ -28,22 +28,29 @@ impl DownloadTracker { Self { multi_progress_bars, - progress_bar: ProgressBar::hidden(), + file_progress_bars: HashMap::new(), } } pub(crate) fn handle_notification(&mut self, n: &Notification<'_>) -> bool { match *n { - Notification::Install(In::Utils(Un::DownloadContentLengthReceived(content_len))) => { - self.content_length_received(content_len); + Notification::Install(In::Utils(Un::DownloadContentLengthReceived( + content_len, + url, + ))) => { + self.content_length_received(content_len, url); true } - Notification::Install(In::Utils(Un::DownloadDataReceived(data))) => { - self.data_received(data.len()); + Notification::Install(In::Utils(Un::DownloadDataReceived(data, url))) => { + self.data_received(data.len(), url); true } - Notification::Install(In::Utils(Un::DownloadFinished)) => { - self.download_finished(); + Notification::Install(In::Utils(Un::DownloadFinished(url))) => { + self.download_finished(url); + true + } + Notification::Install(In::DownloadingComponent(component, _, _, url)) => { + self.create_progress_bar(component, url); true } Notification::Install(In::Utils(Un::DownloadPushUnit(_))) => true, @@ -53,30 +60,44 @@ impl DownloadTracker { } } - /// Sets the length for a new ProgressBar and gives it a style. - pub(crate) fn content_length_received(&mut self, content_len: u64) { - self.progress_bar.set_length(content_len); - self.progress_bar.set_style( + /// Creates a new ProgressBar for the given component. + pub(crate) fn create_progress_bar(&mut self, component: &str, url: &str) { + let pb = ProgressBar::hidden(); + pb.set_style( ProgressStyle::with_template( - "[{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec}, ETA: {eta})", + "{msg:>12.bold} [{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec}, ETA: {eta})", ) .unwrap() .progress_chars("## "), ); + pb.set_message(component.to_string()); + self.multi_progress_bars.add(pb.clone()); + self.file_progress_bars.insert(url.to_string(), pb); + } + + /// Sets the length for a new ProgressBar and gives it a style. + pub(crate) fn content_length_received(&mut self, content_len: u64, url: &str) { + if let Some(pb) = self.file_progress_bars.get(url) { + pb.set_length(content_len); + } } /// Notifies self that data of size `len` has been received. - pub(crate) fn data_received(&mut self, len: usize) { - if self.progress_bar.is_hidden() && self.progress_bar.elapsed() >= Duration::from_secs(1) { - self.multi_progress_bars.add(self.progress_bar.clone()); + pub(crate) fn data_received(&mut self, len: usize, url: &str) { + if let Some(pb) = self.file_progress_bars.get(url) { + pb.inc(len as u64); } - self.progress_bar.inc(len as u64); } /// Notifies self that the download has finished. - pub(crate) fn download_finished(&mut self) { - self.progress_bar.finish_and_clear(); - self.multi_progress_bars.remove(&self.progress_bar); - self.progress_bar = ProgressBar::hidden(); + pub(crate) fn download_finished(&mut self, url: &str) { + let Some(pb) = self.file_progress_bars.get(url) else { + return; + }; + pb.set_style( + ProgressStyle::with_template("{msg:>12.bold} downloaded {total_bytes} in {elapsed}") + .unwrap(), + ); + pb.finish(); } } diff --git a/src/cli/self_update/windows.rs b/src/cli/self_update/windows.rs index 24caab6a38..0812fa3917 100644 --- a/src/cli/self_update/windows.rs +++ b/src/cli/self_update/windows.rs @@ -274,7 +274,10 @@ pub(crate) async fn try_install_msvc( let download_tracker = Arc::new(Mutex::new(DownloadTracker::new_with_display_progress( true, process, ))); - download_tracker.lock().unwrap().download_finished(); + download_tracker + .lock() + .unwrap() + .download_finished(visual_studio_url.as_str()); info!("downloading Visual Studio installer"); download_file( diff --git a/src/diskio/threaded.rs b/src/diskio/threaded.rs index f5096b30bb..f10134df7e 100644 --- a/src/diskio/threaded.rs +++ b/src/diskio/threaded.rs @@ -263,10 +263,11 @@ impl Executor for Threaded<'_> { // pretend to have bytes to deliver. let mut prev_files = self.n_files.load(Ordering::Relaxed); if let Some(handler) = self.notify_handler { - handler(Notification::DownloadFinished); + handler(Notification::DownloadFinished("")); handler(Notification::DownloadPushUnit(Unit::IO)); handler(Notification::DownloadContentLengthReceived( prev_files as u64, + "", )); } if prev_files > 50 { @@ -284,12 +285,12 @@ impl Executor for Threaded<'_> { current_files = self.n_files.load(Ordering::Relaxed); let step_count = prev_files - current_files; if let Some(handler) = self.notify_handler { - handler(Notification::DownloadDataReceived(&buf[0..step_count])); + handler(Notification::DownloadDataReceived(&buf[0..step_count], "")); } } self.pool.join(); if let Some(handler) = self.notify_handler { - handler(Notification::DownloadFinished); + handler(Notification::DownloadFinished("")); handler(Notification::DownloadPopUnit); } // close the feedback channel so that blocking reads on it can diff --git a/src/dist/manifestation.rs b/src/dist/manifestation.rs index 210f8e98ac..e27ef9157f 100644 --- a/src/dist/manifestation.rs +++ b/src/dist/manifestation.rs @@ -6,8 +6,10 @@ mod tests; use std::path::Path; -use anyhow::{Context, Result, anyhow, bail}; +use anyhow::{Context, Error, Result, anyhow, bail}; +use futures_util::stream::StreamExt; use tokio_retry::{RetryIf, strategy::FixedInterval}; +use tracing::info; use crate::dist::component::{ Components, Package, TarGzPackage, TarXzPackage, TarZStdPackage, Transaction, @@ -153,6 +155,7 @@ impl Manifestation { let mut things_to_install: Vec<(Component, CompressionKind, File)> = Vec::new(); let mut things_downloaded: Vec = Vec::new(); let components = update.components_urls_and_hashes(new_manifest)?; + let components_len = components.len(); const DEFAULT_MAX_RETRIES: usize = 3; let max_retries: usize = download_cfg @@ -162,41 +165,61 @@ impl Manifestation { .and_then(|s| s.parse().ok()) .unwrap_or(DEFAULT_MAX_RETRIES); - for (component, format, url, hash) in components { + info!("downloading component(s)"); + for (component, _, url, _) in components.clone() { (download_cfg.notify_handler)(Notification::DownloadingComponent( &component.short_name(new_manifest), &self.target_triple, component.target.as_ref(), + &url, )); - let url = if altered { - url.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()) - } else { - url - }; - - let url_url = utils::parse_url(&url)?; - - let downloaded_file = RetryIf::spawn( - FixedInterval::from_millis(0).take(max_retries), - || download_cfg.download(&url_url, &hash), - |e: &anyhow::Error| { - // retry only known retriable cases - match e.downcast_ref::() { - Some(RustupError::BrokenPartialFile) - | Some(RustupError::DownloadingFile { .. }) => { - (download_cfg.notify_handler)(Notification::RetryingDownload(&url)); - true - } - _ => false, - } - }, - ) - .await - .with_context(|| RustupError::ComponentDownloadFailed(component.name(new_manifest)))?; - - things_downloaded.push(hash); + } - things_to_install.push((component, format, downloaded_file)); + let component_stream = + tokio_stream::iter(components.into_iter()).map(|(component, format, url, hash)| { + async move { + let url = if altered { + url.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()) + } else { + url + }; + + let url_url = utils::parse_url(&url)?; + + let downloaded_file = RetryIf::spawn( + FixedInterval::from_millis(0).take(max_retries), + || download_cfg.download(&url_url, &hash), + |e: &anyhow::Error| { + // retry only known retriable cases + match e.downcast_ref::() { + Some(RustupError::BrokenPartialFile) + | Some(RustupError::DownloadingFile { .. }) => { + (download_cfg.notify_handler)(Notification::RetryingDownload( + &url, + )); + true + } + _ => false, + } + }, + ) + .await + .with_context(|| { + RustupError::ComponentDownloadFailed(component.name(new_manifest)) + })?; + Ok::<_, Error>((component, format, downloaded_file, hash)) + } + }); + if components_len > 0 { + let results = component_stream + .buffered(components_len) + .collect::>() + .await; + for result in results { + let (component, format, downloaded_file, hash) = result?; + things_downloaded.push(hash); + things_to_install.push((component, format, downloaded_file)); + } } // Begin transaction @@ -452,6 +475,7 @@ impl Manifestation { "rust", &self.target_triple, Some(&self.target_triple), + &url, )); use std::path::PathBuf; diff --git a/src/dist/notifications.rs b/src/dist/notifications.rs index bd7ec6e422..3ba0471524 100644 --- a/src/dist/notifications.rs +++ b/src/dist/notifications.rs @@ -23,7 +23,7 @@ pub enum Notification<'a> { ExtensionNotInstalled(&'a str), NonFatalError(&'a anyhow::Error), MissingInstalledComponent(&'a str), - DownloadingComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>), + DownloadingComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>, &'a str), InstallingComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>), RemovingComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>), RemovingOldComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>), @@ -61,7 +61,7 @@ impl Notification<'_> { | FileAlreadyDownloaded | DownloadingLegacyManifest => NotificationLevel::Debug, Extracting(_, _) - | DownloadingComponent(_, _, _) + | DownloadingComponent(_, _, _, _) | InstallingComponent(_, _, _) | RemovingComponent(_, _, _) | RemovingOldComponent(_, _, _) @@ -107,7 +107,7 @@ impl Display for Notification<'_> { MissingInstalledComponent(c) => { write!(f, "during uninstall component {c} was not found") } - DownloadingComponent(c, h, t) => { + DownloadingComponent(c, h, t, _) => { if Some(h) == t.as_ref() || t.is_none() { write!(f, "downloading component '{c}'") } else { diff --git a/src/download/mod.rs b/src/download/mod.rs index 348c8fbc39..a4af18cc6d 100644 --- a/src/download/mod.rs +++ b/src/download/mod.rs @@ -108,10 +108,13 @@ async fn download_file_( match msg { Event::DownloadContentLengthReceived(len) => { - notify_handler(Notification::DownloadContentLengthReceived(len)); + notify_handler(Notification::DownloadContentLengthReceived( + len, + url.as_str(), + )); } Event::DownloadDataReceived(data) => { - notify_handler(Notification::DownloadDataReceived(data)); + notify_handler(Notification::DownloadDataReceived(data, url.as_str())); } Event::ResumingPartialDownload => { notify_handler(Notification::ResumingPartialDownload); @@ -205,7 +208,7 @@ async fn download_file_( .download_to_path(url, path, resume_from_partial, Some(callback)) .await; - notify_handler(Notification::DownloadFinished); + notify_handler(Notification::DownloadFinished(url.as_str())); res } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index b88c5ee49b..11a0ab0cf4 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -506,7 +506,7 @@ impl<'a> FileReaderWithProgress<'a> { // Inform the tracker of the file size let flen = fh.metadata()?.len(); - (notify_handler)(Notification::DownloadContentLengthReceived(flen)); + (notify_handler)(Notification::DownloadContentLengthReceived(flen, "")); let fh = BufReader::with_capacity(8 * 1024 * 1024, fh); @@ -525,10 +525,10 @@ impl io::Read for FileReaderWithProgress<'_> { Ok(nbytes) => { self.nbytes += nbytes as u64; if nbytes != 0 { - (self.notify_handler)(Notification::DownloadDataReceived(&buf[0..nbytes])); + (self.notify_handler)(Notification::DownloadDataReceived(&buf[0..nbytes], "")); } if (nbytes == 0) || (self.flen == self.nbytes) { - (self.notify_handler)(Notification::DownloadFinished); + (self.notify_handler)(Notification::DownloadFinished("")); } Ok(nbytes) } diff --git a/src/utils/notifications.rs b/src/utils/notifications.rs index 9ea4917277..b10cf2a4ff 100644 --- a/src/utils/notifications.rs +++ b/src/utils/notifications.rs @@ -13,12 +13,13 @@ pub enum Notification<'a> { CopyingDirectory(&'a Path, &'a Path), RemovingDirectory(&'a str, &'a Path), DownloadingFile(&'a Url, &'a Path), - /// Received the Content-Length of the to-be downloaded data. - DownloadContentLengthReceived(u64), + /// Received the Content-Length of the to-be downloaded data with + /// the respective URL of the download (for tracking concurrent downloads). + DownloadContentLengthReceived(u64, &'a str), /// Received some data. - DownloadDataReceived(&'a [u8]), + DownloadDataReceived(&'a [u8], &'a str), /// Download has finished. - DownloadFinished, + DownloadFinished(&'a str), /// The things we're tracking that are not counted in bytes. /// Must be paired with a pop-units; our other calls are not /// setup to guarantee this any better. @@ -52,11 +53,11 @@ impl Notification<'_> { | LinkingDirectory(_, _) | CopyingDirectory(_, _) | DownloadingFile(_, _) - | DownloadContentLengthReceived(_) - | DownloadDataReceived(_) + | DownloadContentLengthReceived(_, _) + | DownloadDataReceived(_, _) | DownloadPushUnit(_) | DownloadPopUnit - | DownloadFinished + | DownloadFinished(_) | ResumingPartialDownload | UsingCurl | UsingReqwest => NotificationLevel::Debug, @@ -92,11 +93,11 @@ impl Display for Notification<'_> { units::Size::new(*size, units::Unit::B) ), DownloadingFile(url, _) => write!(f, "downloading file from: '{url}'"), - DownloadContentLengthReceived(len) => write!(f, "download size is: '{len}'"), - DownloadDataReceived(data) => write!(f, "received some data of size {}", data.len()), + DownloadContentLengthReceived(len, _) => write!(f, "download size is: '{len}'"), + DownloadDataReceived(data, _) => write!(f, "received some data of size {}", data.len()), DownloadPushUnit(_) => Ok(()), DownloadPopUnit => Ok(()), - DownloadFinished => write!(f, "download finished"), + DownloadFinished(_) => write!(f, "download finished"), NoCanonicalPath(path) => write!(f, "could not canonicalize path: '{}'", path.display()), ResumingPartialDownload => write!(f, "resuming partial download"), UsingCurl => write!(f, "downloading with curl"),