diff --git a/src/cli/common.rs b/src/cli/common.rs index 7d9908cb0e..79025d62b6 100644 --- a/src/cli/common.rs +++ b/src/cli/common.rs @@ -1,11 +1,10 @@ //! Just a dumping ground for cli stuff -use std::cell::RefCell; use std::fmt::Display; use std::fs; use std::io::{BufRead, Write}; use std::path::{Path, PathBuf}; -use std::sync::{Arc, LazyLock, Mutex}; +use std::sync::{Arc, LazyLock, Mutex, RwLock}; use std::{cmp, env}; use anyhow::{Context, Result, anyhow}; @@ -123,14 +122,14 @@ pub(crate) fn read_line(process: &Process) -> Result { pub(super) struct Notifier { tracker: Mutex, - ram_notice_shown: RefCell, + ram_notice_shown: RwLock, } impl Notifier { pub(super) fn new(quiet: bool, process: &Process) -> Self { Self { tracker: Mutex::new(DownloadTracker::new_with_display_progress(!quiet, process)), - ram_notice_shown: RefCell::new(false), + ram_notice_shown: RwLock::new(false), } } @@ -140,10 +139,10 @@ impl Notifier { } if let Notification::SetDefaultBufferSize(_) = &n { - if *self.ram_notice_shown.borrow() { + if *self.ram_notice_shown.read().unwrap() { return; } else { - *self.ram_notice_shown.borrow_mut() = true; + *self.ram_notice_shown.write().unwrap() = true; } }; let level = n.level(); diff --git a/src/cli/download_tracker.rs b/src/cli/download_tracker.rs index a2105dd048..5a816a4506 100644 --- a/src/cli/download_tracker.rs +++ b/src/cli/download_tracker.rs @@ -68,6 +68,14 @@ impl DownloadTracker { self.retrying_download(url); true } + Notification::InstallingComponent(component, _, _) => { + self.installing_component(component); + true + } + Notification::ComponentInstalled(component, _, _) => { + self.component_installed(component); + true + } _ => false, } } @@ -120,10 +128,13 @@ impl DownloadTracker { return; }; pb.set_style( - ProgressStyle::with_template("{msg:>12.bold} downloaded {total_bytes} in {elapsed}") - .unwrap(), + ProgressStyle::with_template(if pb.position() != 0 { + "{msg:>12.bold} downloaded {total_bytes} in {elapsed}" + } else { + "{msg:>12.bold} component already downloaded" + }) + .unwrap(), ); - pb.finish(); } /// Notifies self that the download has failed. @@ -146,4 +157,50 @@ impl DownloadTracker { *retry_time = Some(Instant::now()); pb.set_style(ProgressStyle::with_template("{msg:>12.bold} retrying download").unwrap()); } + + /// Notifies self that the component is being installed. + pub(crate) fn installing_component(&mut self, component: &str) { + let key = self + .file_progress_bars + .keys() + .find(|comp| comp.contains(component)) + .cloned(); + if let Some(key) = key + && let Some((pb, _)) = self.file_progress_bars.get(&key) + { + pb.set_style( + ProgressStyle::with_template( if pb.position() != 0 { + "{msg:>12.bold} downloaded {total_bytes} in {elapsed} and installing {spinner:.green}" + } else { + "{msg:>12.bold} component already downloaded and installing {spinner:.green}" + } + ) + .unwrap() + .tick_chars(r"|/-\ "), + ); + pb.enable_steady_tick(Duration::from_millis(100)); + } + } + + /// Notifies self that the component has been installed. + pub(crate) fn component_installed(&mut self, component: &str) { + let key = self + .file_progress_bars + .keys() + .find(|comp| comp.contains(component)) + .cloned(); + if let Some(key) = key + && let Some((pb, _)) = self.file_progress_bars.get(&key) + { + pb.set_style( + ProgressStyle::with_template(if pb.position() != 0 { + "{msg:>12.bold} downloaded {total_bytes} and installed" + } else { + "{msg:>12.bold} component already downloaded and installed" + }) + .unwrap(), + ); + pb.finish(); + } + } } diff --git a/src/config.rs b/src/config.rs index fbb173d0cb..f667ccbfcc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -229,11 +229,11 @@ pub(crate) struct Cfg<'a> { pub toolchains_dir: PathBuf, pub update_hash_dir: PathBuf, pub download_dir: PathBuf, - pub tmp_cx: temp::Context, + pub tmp_cx: Arc, pub toolchain_override: Option, pub env_override: Option, pub dist_root_url: String, - pub notify_handler: Arc)>, + pub notify_handler: Arc, pub current_dir: PathBuf, pub process: &'a Process, } @@ -241,7 +241,7 @@ pub(crate) struct Cfg<'a> { impl<'a> Cfg<'a> { pub(crate) fn from_env( current_dir: PathBuf, - notify_handler: Arc)>, + notify_handler: Arc, process: &'a Process, ) -> Result { // Set up the rustup home directory @@ -292,11 +292,11 @@ impl<'a> Cfg<'a> { let dist_root_server = dist_root_server(process)?; let notify_clone = notify_handler.clone(); - let tmp_cx = temp::Context::new( + let tmp_cx = Arc::new(temp::Context::new( rustup_dir.join("tmp"), dist_root_server.as_str(), - Box::new(move |n| (notify_clone)(n)), - ); + Arc::new(move |n| (notify_clone)(n)), + )); let dist_root = dist_root_server + "/dist"; let cfg = Self { @@ -328,16 +328,13 @@ impl<'a> Cfg<'a> { } /// construct a download configuration - pub(crate) fn download_cfg( - &'a self, - notify_handler: &'a dyn Fn(Notification<'_>), - ) -> DownloadCfg<'a> { + pub(crate) fn download_cfg(&self, notify_handler: Arc) -> DownloadCfg { DownloadCfg { - dist_root: &self.dist_root_url, - tmp_cx: &self.tmp_cx, - download_dir: &self.download_dir, + dist_root: Arc::from(self.dist_root_url.clone()), + tmp_cx: Arc::clone(&self.tmp_cx), + download_dir: Arc::new(self.download_dir.clone()), notify_handler, - process: self.process, + process: Arc::new(self.process.clone()), } } diff --git a/src/dist/component/components.rs b/src/dist/component/components.rs index db9a7805c1..88e8491cec 100644 --- a/src/dist/component/components.rs +++ b/src/dist/component/components.rs @@ -55,7 +55,7 @@ impl Components { Ok(None) } } - fn write_version(&self, tx: &mut Transaction<'_>) -> Result<()> { + fn write_version(&self, tx: &mut Transaction) -> Result<()> { tx.modify_file(self.prefix.rel_manifest_file(VERSION_FILE))?; utils::write_file( VERSION_FILE, @@ -79,7 +79,7 @@ impl Components { }) .collect()) } - pub(crate) fn add<'a>(&self, name: &str, tx: Transaction<'a>) -> ComponentBuilder<'a> { + pub(crate) fn add(&self, name: &str, tx: Transaction) -> ComponentBuilder { ComponentBuilder { components: self.clone(), name: name.to_owned(), @@ -96,14 +96,14 @@ impl Components { } } -pub(crate) struct ComponentBuilder<'a> { +pub(crate) struct ComponentBuilder { components: Components, name: String, parts: Vec, - tx: Transaction<'a>, + tx: Transaction, } -impl<'a> ComponentBuilder<'a> { +impl ComponentBuilder { pub(crate) fn copy_file(&mut self, path: PathBuf, src: &Path) -> Result<()> { self.parts.push(ComponentPart { kind: ComponentPartKind::File, @@ -132,7 +132,7 @@ impl<'a> ComponentBuilder<'a> { }); self.tx.move_dir(&self.name, path, src) } - pub(crate) fn finish(mut self) -> Result> { + pub(crate) fn finish(mut self) -> Result { // Write component manifest let path = self.components.rel_component_manifest(&self.name); let abs_path = self.components.prefix.abs_path(&path); @@ -255,18 +255,20 @@ impl Component { } Ok(result) } - pub fn uninstall<'a>( - &self, - mut tx: Transaction<'a>, - process: &Process, - ) -> Result> { + pub fn uninstall(&self, mut tx: Transaction, process: &Process) -> Result { // Update components file let path = self.components.rel_components_file(); let abs_path = self.components.prefix.abs_path(&path); let temp = tx.temp().new_file()?; utils::filter_file("components", &abs_path, &temp, |l| l != self.name)?; tx.modify_file(path)?; - utils::rename("components", &temp, &abs_path, tx.notify_handler(), process)?; + utils::rename( + "components", + &temp, + &abs_path, + &*tx.notify_handler(), + process, + )?; // TODO: If this is the last component remove the components file // and the version file. diff --git a/src/dist/component/package.rs b/src/dist/component/package.rs index c89dbe1ccc..2edf096b63 100644 --- a/src/dist/component/package.rs +++ b/src/dist/component/package.rs @@ -7,6 +7,7 @@ use std::fmt; use std::io::{self, ErrorKind as IOErrorKind, Read}; use std::mem; use std::path::{Path, PathBuf}; +use std::sync::Arc; use anyhow::{Context, Result, anyhow, bail}; use tar::EntryType; @@ -27,13 +28,13 @@ pub(crate) const VERSION_FILE: &str = "rust-installer-version"; pub trait Package: fmt::Debug { fn contains(&self, component: &str, short_name: Option<&str>) -> bool; - fn install<'a>( + fn install( &self, target: &Components, component: &str, short_name: Option<&str>, - tx: Transaction<'a>, - ) -> Result>; + tx: Transaction, + ) -> Result; fn components(&self) -> Vec; } @@ -80,13 +81,13 @@ impl Package for DirectoryPackage { false } } - fn install<'a>( + fn install( &self, target: &Components, name: &str, short_name: Option<&str>, - tx: Transaction<'a>, - ) -> Result> { + tx: Transaction, + ) -> Result { let actual_name = if self.components.contains(name) { name } else if let Some(n) = short_name { @@ -138,11 +139,12 @@ impl Package for DirectoryPackage { #[derive(Debug)] #[allow(dead_code)] // temp::Dir is held for drop. -pub(crate) struct TarPackage<'a>(DirectoryPackage, temp::Dir<'a>); +pub(crate) struct TarPackage(DirectoryPackage, temp::Dir); -impl<'a> TarPackage<'a> { - pub(crate) fn new(stream: R, cx: &PackageContext<'a>) -> Result { - let temp_dir = cx.tmp_cx.new_directory()?; +impl TarPackage { + pub(crate) fn new(stream: R, cx: &PackageContext) -> Result { + let ctx = cx.tmp_cx.clone(); + let temp_dir = ctx.new_directory()?; let mut archive = tar::Archive::new(stream); // The rust-installer packages unpack to a directory called // $pkgname-$version-$target. Skip that directory when @@ -161,7 +163,7 @@ impl<'a> TarPackage<'a> { fn unpack_ram( io_chunk_size: usize, effective_max_ram: Option, - cx: &PackageContext<'_>, + cx: &PackageContext, ) -> usize { const RAM_ALLOWANCE_FOR_RUSTUP_AND_BUFFERS: usize = 200 * 1024 * 1024; let minimum_ram = io_chunk_size * 2; @@ -199,7 +201,7 @@ fn unpack_ram( } } None => { - if let Some(h) = cx.notify_handler { + if let Some(h) = &cx.notify_handler { h(Notification::SetDefaultBufferSize(default_max_unpack_ram)) } default_max_unpack_ram @@ -285,21 +287,21 @@ enum DirStatus { fn unpack_without_first_dir( archive: &mut tar::Archive, path: &Path, - cx: &PackageContext<'_>, + cx: &PackageContext, ) -> Result<()> { let entries = archive.entries()?; let effective_max_ram = match effective_limits::memory_limit() { Ok(ram) => Some(ram as usize), Err(e) => { - if let Some(h) = cx.notify_handler { + if let Some(h) = &cx.notify_handler { h(Notification::Error(e.to_string())) } None } }; let unpack_ram = unpack_ram(IO_CHUNK_SIZE, effective_max_ram, cx); - let mut io_executor: Box = - get_executor(cx.notify_handler, unpack_ram, cx.process)?; + let handler_ref = cx.notify_handler.as_ref().map(|h| h.as_ref()); + let mut io_executor: Box = get_executor(handler_ref, unpack_ram, &cx.process)?; let mut directories: HashMap = HashMap::new(); // Path is presumed to exist. Call it a precondition. @@ -528,17 +530,17 @@ fn unpack_without_first_dir( Ok(()) } -impl Package for TarPackage<'_> { +impl Package for TarPackage { fn contains(&self, component: &str, short_name: Option<&str>) -> bool { self.0.contains(component, short_name) } - fn install<'b>( + fn install( &self, target: &Components, component: &str, short_name: Option<&str>, - tx: Transaction<'b>, - ) -> Result> { + tx: Transaction, + ) -> Result { self.0.install(target, component, short_name, tx) } fn components(&self) -> Vec { @@ -547,26 +549,26 @@ impl Package for TarPackage<'_> { } #[derive(Debug)] -pub(crate) struct TarGzPackage<'a>(TarPackage<'a>); +pub(crate) struct TarGzPackage(TarPackage); -impl<'a> TarGzPackage<'a> { - pub(crate) fn new(stream: R, cx: &PackageContext<'a>) -> Result { +impl TarGzPackage { + pub(crate) fn new(stream: R, cx: &PackageContext) -> Result { let stream = flate2::read::GzDecoder::new(stream); Ok(TarGzPackage(TarPackage::new(stream, cx)?)) } } -impl Package for TarGzPackage<'_> { +impl Package for TarGzPackage { fn contains(&self, component: &str, short_name: Option<&str>) -> bool { self.0.contains(component, short_name) } - fn install<'b>( + fn install( &self, target: &Components, component: &str, short_name: Option<&str>, - tx: Transaction<'b>, - ) -> Result> { + tx: Transaction, + ) -> Result { self.0.install(target, component, short_name, tx) } fn components(&self) -> Vec { @@ -575,26 +577,26 @@ impl Package for TarGzPackage<'_> { } #[derive(Debug)] -pub(crate) struct TarXzPackage<'a>(TarPackage<'a>); +pub(crate) struct TarXzPackage(TarPackage); -impl<'a> TarXzPackage<'a> { - pub(crate) fn new(stream: R, cx: &PackageContext<'a>) -> Result { +impl TarXzPackage { + pub(crate) fn new(stream: R, cx: &PackageContext) -> Result { let stream = xz2::read::XzDecoder::new(stream); Ok(TarXzPackage(TarPackage::new(stream, cx)?)) } } -impl Package for TarXzPackage<'_> { +impl Package for TarXzPackage { fn contains(&self, component: &str, short_name: Option<&str>) -> bool { self.0.contains(component, short_name) } - fn install<'b>( + fn install( &self, target: &Components, component: &str, short_name: Option<&str>, - tx: Transaction<'b>, - ) -> Result> { + tx: Transaction, + ) -> Result { self.0.install(target, component, short_name, tx) } fn components(&self) -> Vec { @@ -603,26 +605,26 @@ impl Package for TarXzPackage<'_> { } #[derive(Debug)] -pub(crate) struct TarZStdPackage<'a>(TarPackage<'a>); +pub(crate) struct TarZStdPackage(TarPackage); -impl<'a> TarZStdPackage<'a> { - pub(crate) fn new(stream: R, cx: &PackageContext<'a>) -> Result { +impl TarZStdPackage { + pub(crate) fn new(stream: R, cx: &PackageContext) -> Result { let stream = zstd::stream::read::Decoder::new(stream)?; Ok(TarZStdPackage(TarPackage::new(stream, cx)?)) } } -impl Package for TarZStdPackage<'_> { +impl Package for TarZStdPackage { fn contains(&self, component: &str, short_name: Option<&str>) -> bool { self.0.contains(component, short_name) } - fn install<'b>( + fn install( &self, target: &Components, component: &str, short_name: Option<&str>, - tx: Transaction<'b>, - ) -> Result> { + tx: Transaction, + ) -> Result { self.0.install(target, component, short_name, tx) } fn components(&self) -> Vec { @@ -630,8 +632,8 @@ impl Package for TarZStdPackage<'_> { } } -pub(crate) struct PackageContext<'a> { - pub(crate) tmp_cx: &'a temp::Context, - pub(crate) notify_handler: Option<&'a dyn Fn(Notification<'_>)>, - pub(crate) process: &'a Process, +pub(crate) struct PackageContext { + pub(crate) tmp_cx: Arc, + pub(crate) notify_handler: Option)>>, + pub(crate) process: Arc, } diff --git a/src/dist/component/transaction.rs b/src/dist/component/transaction.rs index 4252de8a02..b96e31b717 100644 --- a/src/dist/component/transaction.rs +++ b/src/dist/component/transaction.rs @@ -9,15 +9,15 @@ //! FIXME: This uses ensure_dir_exists in some places but rollback //! does not remove any dirs created by it. -use std::fs::File; use std::path::{Path, PathBuf}; +use std::{fs::File, sync::Arc}; use anyhow::{Context, Result, anyhow}; use crate::dist::prefix::InstallPrefix; use crate::dist::temp; use crate::errors::*; -use crate::notifications::Notification; +use crate::notifications::{Notification, NotifyHandler}; use crate::process::Process; use crate::utils; @@ -34,21 +34,21 @@ use crate::utils; /// /// All operations that create files will fail if the destination /// already exists. -pub struct Transaction<'a> { +pub struct Transaction { prefix: InstallPrefix, - changes: Vec>, - tmp_cx: &'a temp::Context, - notify_handler: &'a dyn Fn(Notification<'_>), + changes: Vec, + tmp_cx: Arc, + notify_handler: Arc, committed: bool, - process: &'a Process, + process: Arc, } -impl<'a> Transaction<'a> { +impl Transaction { pub fn new( prefix: InstallPrefix, - tmp_cx: &'a temp::Context, - notify_handler: &'a dyn Fn(Notification<'_>), - process: &'a Process, + tmp_cx: Arc, + notify_handler: Arc, + process: Arc, ) -> Self { Transaction { prefix, @@ -66,7 +66,7 @@ impl<'a> Transaction<'a> { self.committed = true; } - fn change(&mut self, item: ChangedItem<'a>) { + fn change(&mut self, item: ChangedItem) { self.changes.push(item); } @@ -103,9 +103,9 @@ impl<'a> Transaction<'a> { &self.prefix, component, relpath, - self.tmp_cx, - self.notify_handler(), - self.process, + Arc::clone(&self.tmp_cx), + &*self.notify_handler, + &self.process, )?; self.change(item); Ok(()) @@ -119,9 +119,9 @@ impl<'a> Transaction<'a> { &self.prefix, component, relpath, - self.tmp_cx, - self.notify_handler(), - self.process, + Arc::clone(&self.tmp_cx), + &*self.notify_handler, + &self.process, )?; self.change(item); Ok(()) @@ -148,7 +148,7 @@ impl<'a> Transaction<'a> { /// This is used for arbitrarily manipulating a file. pub fn modify_file(&mut self, relpath: PathBuf) -> Result<()> { assert!(relpath.is_relative()); - let item = ChangedItem::modify_file(&self.prefix, relpath, self.tmp_cx)?; + let item = ChangedItem::modify_file(&self.prefix, relpath, Arc::clone(&self.tmp_cx))?; self.change(item); Ok(()) } @@ -166,8 +166,8 @@ impl<'a> Transaction<'a> { component, relpath, src, - self.notify_handler(), - self.process, + &*self.notify_handler, + &self.process, )?; self.change(item); Ok(()) @@ -181,31 +181,32 @@ impl<'a> Transaction<'a> { component, relpath, src, - self.notify_handler(), - self.process, + &*self.notify_handler, + &self.process, )?; self.change(item); Ok(()) } - pub(crate) fn temp(&self) -> &'a temp::Context { - self.tmp_cx + pub(crate) fn temp(&self) -> Arc { + Arc::clone(&self.tmp_cx) } - pub(crate) fn notify_handler(&self) -> &'a dyn Fn(Notification<'_>) { - self.notify_handler + + pub(crate) fn notify_handler(&self) -> Arc { + Arc::clone(&self.notify_handler) } } /// If a Transaction is dropped without being committed, the changes /// are automatically rolled back. -impl Drop for Transaction<'_> { +impl Drop for Transaction { fn drop(&mut self) { if !self.committed { (self.notify_handler)(Notification::RollingBack); for item in self.changes.iter().rev() { // ok_ntfy!(self.notify_handler, // Notification::NonFatalError, - match item.roll_back(&self.prefix, self.notify_handler(), self.process) { + match item.roll_back(&self.prefix, &*self.notify_handler(), &self.process) { Ok(()) => {} Err(e) => { (self.notify_handler)(Notification::NonFatalError(&e)); @@ -221,19 +222,19 @@ impl Drop for Transaction<'_> { /// package, or updating a component, distill down into a series of /// these primitives. #[derive(Debug)] -enum ChangedItem<'a> { +enum ChangedItem { AddedFile(PathBuf), AddedDir(PathBuf), - RemovedFile(PathBuf, temp::File<'a>), - RemovedDir(PathBuf, temp::Dir<'a>), - ModifiedFile(PathBuf, Option>), + RemovedFile(PathBuf, Arc), + RemovedDir(PathBuf, Arc), + ModifiedFile(PathBuf, Option>), } -impl<'a> ChangedItem<'a> { +impl ChangedItem { fn roll_back( &self, prefix: &InstallPrefix, - notify: &'a dyn Fn(Notification<'_>), + notify: &NotifyHandler, process: &Process, ) -> Result<()> { use self::ChangedItem::*; @@ -259,6 +260,7 @@ impl<'a> ChangedItem<'a> { } Ok(()) } + fn dest_abs_path(prefix: &InstallPrefix, component: &str, relpath: &Path) -> Result { let abs_path = prefix.abs_path(relpath); if utils::path_exists(&abs_path) { @@ -273,12 +275,14 @@ impl<'a> ChangedItem<'a> { Ok(abs_path) } } + fn add_file(prefix: &InstallPrefix, component: &str, relpath: PathBuf) -> Result<(Self, File)> { let abs_path = ChangedItem::dest_abs_path(prefix, component, &relpath)?; let file = File::create(&abs_path) .with_context(|| format!("error creating file '{}'", abs_path.display()))?; Ok((ChangedItem::AddedFile(relpath), file)) } + fn copy_file( prefix: &InstallPrefix, component: &str, @@ -289,6 +293,7 @@ impl<'a> ChangedItem<'a> { utils::copy_file(src, &abs_path)?; Ok(ChangedItem::AddedFile(relpath)) } + fn copy_dir( prefix: &InstallPrefix, component: &str, @@ -299,12 +304,13 @@ impl<'a> ChangedItem<'a> { utils::copy_dir(src, &abs_path, &|_: Notification<'_>| ())?; Ok(ChangedItem::AddedDir(relpath)) } + fn remove_file( prefix: &InstallPrefix, component: &str, relpath: PathBuf, - tmp_cx: &'a temp::Context, - notify: &'a dyn Fn(Notification<'_>), + tmp_cx: Arc, + notify: &NotifyHandler, process: &Process, ) -> Result { let abs_path = prefix.abs_path(&relpath); @@ -317,15 +323,16 @@ impl<'a> ChangedItem<'a> { .into()) } else { utils::rename("component", &abs_path, &backup, notify, process)?; - Ok(ChangedItem::RemovedFile(relpath, backup)) + Ok(ChangedItem::RemovedFile(relpath, Arc::new(backup))) } } + fn remove_dir( prefix: &InstallPrefix, component: &str, relpath: PathBuf, - tmp_cx: &'a temp::Context, - notify: &'a dyn Fn(Notification<'_>), + tmp_cx: Arc, + notify: &NotifyHandler, process: &Process, ) -> Result { let abs_path = prefix.abs_path(&relpath); @@ -338,20 +345,21 @@ impl<'a> ChangedItem<'a> { .into()) } else { utils::rename("component", &abs_path, &backup.join("bk"), notify, process)?; - Ok(ChangedItem::RemovedDir(relpath, backup)) + Ok(ChangedItem::RemovedDir(relpath, Arc::new(backup))) } } + fn modify_file( prefix: &InstallPrefix, relpath: PathBuf, - tmp_cx: &'a temp::Context, + tmp_cx: Arc, ) -> Result { let abs_path = prefix.abs_path(&relpath); if utils::is_file(&abs_path) { let backup = tmp_cx.new_file()?; utils::copy_file(&abs_path, &backup)?; - Ok(ChangedItem::ModifiedFile(relpath, Some(backup))) + Ok(ChangedItem::ModifiedFile(relpath, Some(Arc::new(backup)))) } else { if let Some(p) = abs_path.parent() { utils::ensure_dir_exists("component", p, &|_: Notification<'_>| {})?; @@ -359,24 +367,26 @@ impl<'a> ChangedItem<'a> { Ok(ChangedItem::ModifiedFile(relpath, None)) } } + fn move_file( prefix: &InstallPrefix, component: &str, relpath: PathBuf, src: &Path, - notify: &'a dyn Fn(Notification<'_>), + notify: &NotifyHandler, process: &Process, ) -> Result { let abs_path = ChangedItem::dest_abs_path(prefix, component, &relpath)?; utils::rename("component", src, &abs_path, notify, process)?; Ok(ChangedItem::AddedFile(relpath)) } + fn move_dir( prefix: &InstallPrefix, component: &str, relpath: PathBuf, src: &Path, - notify: &'a dyn Fn(Notification<'_>), + notify: &NotifyHandler, process: &Process, ) -> Result { let abs_path = ChangedItem::dest_abs_path(prefix, component, &relpath)?; diff --git a/src/dist/download.rs b/src/dist/download.rs index d7755a57f5..8670b8f070 100644 --- a/src/dist/download.rs +++ b/src/dist/download.rs @@ -1,6 +1,7 @@ use std::fs; use std::ops; use std::path::{Path, PathBuf}; +use std::sync::Arc; use anyhow::{Context, Result, anyhow}; use sha2::{Digest, Sha256}; @@ -10,19 +11,19 @@ use crate::dist::temp; use crate::download::download_file; use crate::download::download_file_with_resume; use crate::errors::*; -use crate::notifications::Notification; +use crate::notifications::{Notification, NotifyHandler}; use crate::process::Process; use crate::utils; const UPDATE_HASH_LEN: usize = 20; -#[derive(Copy, Clone)] -pub struct DownloadCfg<'a> { - pub dist_root: &'a str, - pub tmp_cx: &'a temp::Context, - pub download_dir: &'a PathBuf, - pub notify_handler: &'a dyn Fn(Notification<'_>), - pub process: &'a Process, +#[derive(Clone)] +pub struct DownloadCfg { + pub dist_root: Arc, + pub tmp_cx: Arc, + pub download_dir: Arc, + pub notify_handler: Arc, + pub process: Arc, } pub(crate) struct File { @@ -37,7 +38,7 @@ impl ops::Deref for File { } } -impl<'a> DownloadCfg<'a> { +impl DownloadCfg { /// Downloads a file and validates its hash. Resumes interrupted downloads. /// Partial downloads are stored in `self.download_dir`, keyed by hash. If the /// target file already exists, then the hash is checked and it is returned @@ -45,13 +46,13 @@ impl<'a> DownloadCfg<'a> { pub(crate) async fn download(&self, url: &Url, hash: &str) -> Result { utils::ensure_dir_exists( "Download Directory", - self.download_dir, - &self.notify_handler, + &self.download_dir, + &*self.notify_handler, )?; let target_file = self.download_dir.join(Path::new(hash)); if target_file.exists() { - let cached_result = file_hash(&target_file, self.notify_handler)?; + let cached_result = file_hash(&target_file, &*self.notify_handler)?; if hash == cached_result { (self.notify_handler)(Notification::FileAlreadyDownloaded); (self.notify_handler)(Notification::ChecksumValid(url.as_ref())); @@ -81,7 +82,7 @@ impl<'a> DownloadCfg<'a> { Some(&mut hasher), true, &|n| (self.notify_handler)(n), - self.process, + &self.process, ) .await { @@ -115,8 +116,8 @@ impl<'a> DownloadCfg<'a> { "downloaded", &partial_file_path, &target_file, - self.notify_handler, - self.process, + &*self.notify_handler, + &self.process, )?; Ok(File { path: target_file }) } @@ -134,14 +135,14 @@ impl<'a> DownloadCfg<'a> { async fn download_hash(&self, url: &str) -> Result { let hash_url = utils::parse_url(&(url.to_owned() + ".sha256"))?; - let hash_file = self.tmp_cx.new_file()?; + let hash_file = Arc::clone(&self.tmp_cx).new_file()?; download_file( &hash_url, &hash_file, None, &|n| (self.notify_handler)(n), - self.process, + &self.process, ) .await?; @@ -158,7 +159,7 @@ impl<'a> DownloadCfg<'a> { url_str: &str, update_hash: Option<&Path>, ext: &str, - ) -> Result, String)>> { + ) -> Result> { let hash = self.download_hash(url_str).await?; let partial_hash: String = hash.chars().take(UPDATE_HASH_LEN).collect(); @@ -178,7 +179,7 @@ impl<'a> DownloadCfg<'a> { } let url = utils::parse_url(url_str)?; - let file = self.tmp_cx.new_file_with_ext("", ext)?; + let file = Arc::clone(&self.tmp_cx).new_file_with_ext("", ext)?; let mut hasher = Sha256::new(); download_file( @@ -186,7 +187,7 @@ impl<'a> DownloadCfg<'a> { &file, Some(&mut hasher), &|n| (self.notify_handler)(n), - self.process, + &self.process, ) .await?; let actual_hash = format!("{:x}", hasher.finalize()); diff --git a/src/dist/manifestation.rs b/src/dist/manifestation.rs index e5a23cb8cb..8e9af4cf4d 100644 --- a/src/dist/manifestation.rs +++ b/src/dist/manifestation.rs @@ -6,10 +6,10 @@ mod tests; use std::path::Path; -use anyhow::{Context, Result, anyhow, bail}; +use anyhow::{Context, Error, Result, anyhow, bail}; use futures_util::stream::StreamExt; use std::sync::Arc; -use tokio::sync::Semaphore; +use tokio::sync::{Semaphore, mpsc}; use tracing::info; use url::Url; @@ -23,7 +23,7 @@ use crate::dist::prefix::InstallPrefix; use crate::dist::temp; use crate::dist::{DEFAULT_DIST_SERVER, Profile, TargetTriple}; use crate::errors::RustupError; -use crate::notifications::Notification; +use crate::notifications::{Notification, NotifyHandler}; use crate::process::Process; use crate::utils; @@ -106,16 +106,17 @@ impl Manifestation { /// It is *not* safe to run two updates concurrently. See /// https://github.com/rust-lang/rustup/issues/988 for the details. pub async fn update( - &self, - new_manifest: &Manifest, + self: Arc, + new_manifest: Arc, changes: Changes, force_update: bool, - download_cfg: &DownloadCfg<'_>, + download_cfg: DownloadCfg, toolchain_str: &str, implicit_modify: bool, ) -> Result { // Some vars we're going to need a few times - let tmp_cx = download_cfg.tmp_cx; + let download_cfg = Arc::new(download_cfg); + let tmp_cx = Arc::clone(&download_cfg.tmp_cx); let prefix = self.installation.prefix(); let rel_installed_manifest_path = prefix.rel_manifest_file(DIST_MANIFEST); let installed_manifest_path = prefix.path().join(&rel_installed_manifest_path); @@ -123,11 +124,11 @@ impl Manifestation { // Create the lists of components needed for installation let config = self.read_config()?; let mut update = Update::build_update( - self, - new_manifest, + &self, + &new_manifest, &changes, &config, - &download_cfg.notify_handler, + &*download_cfg.notify_handler, )?; if update.nothing_changes() { @@ -135,7 +136,7 @@ impl Manifestation { } // Validate that the requested components are available - if let Err(e) = update.unavailable_components(new_manifest, toolchain_str) { + if let Err(e) = update.unavailable_components(&new_manifest, toolchain_str) { if !force_update { return Err(e); } @@ -144,7 +145,7 @@ impl Manifestation { { for component in &components { (download_cfg.notify_handler)(Notification::ForcingUnavailableComponent( - &component.name(new_manifest), + &component.name(&new_manifest), )); } update.drop_components_to_install(&components); @@ -154,9 +155,8 @@ impl Manifestation { let altered = tmp_cx.dist_server != DEFAULT_DIST_SERVER; // Download component packages and validate hashes - let mut things_to_install = Vec::new(); - let mut things_downloaded = Vec::new(); - let components = update.components_urls_and_hashes(new_manifest)?; + let mut things_downloaded: Vec = Vec::new(); + let components = update.components_urls_and_hashes(&new_manifest)?; let components_len = components.len(); const DEFAULT_CONCURRENT_DOWNLOADS: usize = 2; @@ -173,60 +173,28 @@ impl Manifestation { .and_then(|s| s.parse().ok()) .unwrap_or(DEFAULT_MAX_RETRIES); + // Begin transaction before the downloads, as installations are interleaved with those + let mut tx = Transaction::new( + prefix.clone(), + Arc::clone(&download_cfg.tmp_cx), + Arc::clone(&download_cfg.notify_handler), + Arc::clone(&download_cfg.process), + ); + + // If the previous installation was from a v1 manifest we need + // to uninstall it first. + tx = self.maybe_handle_v2_upgrade(&config, tx, &download_cfg.process)?; + info!("downloading component(s)"); for bin in &components { (download_cfg.notify_handler)(Notification::DownloadingComponent( - &bin.component.short_name(new_manifest), + &bin.component.short_name(&new_manifest), &self.target_triple, bin.component.target.as_ref(), &bin.binary.url, )); } - let semaphore = Arc::new(Semaphore::new(concurrent_downloads)); - let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| { - let sem = semaphore.clone(); - async move { - let _permit = sem.acquire().await.unwrap(); - let url = if altered { - utils::parse_url( - &bin.binary - .url - .replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()), - )? - } else { - utils::parse_url(&bin.binary.url)? - }; - - bin.download(&url, download_cfg, max_retries, new_manifest) - .await - .map(|downloaded| (bin, downloaded)) - } - }); - if components_len > 0 { - let results = component_stream - .buffered(components_len) - .collect::>() - .await; - for result in results { - let (bin, downloaded_file) = result?; - things_downloaded.push(bin.binary.hash.clone()); - things_to_install.push((bin.component, bin.binary.compression, downloaded_file)); - } - } - - // Begin transaction - let mut tx = Transaction::new( - prefix.clone(), - tmp_cx, - download_cfg.notify_handler, - download_cfg.process, - ); - - // If the previous installation was from a v1 manifest we need - // to uninstall it first. - tx = self.maybe_handle_v2_upgrade(&config, tx, download_cfg.process)?; - // Uninstall components for component in &update.components_to_uninstall { let notification = if implicit_modify { @@ -235,63 +203,125 @@ impl Manifestation { Notification::RemovingComponent }; (download_cfg.notify_handler)(notification( - &component.short_name(new_manifest), + &component.short_name(&new_manifest), &self.target_triple, component.target.as_ref(), )); tx = self.uninstall_component( component, - new_manifest, + &new_manifest, tx, - &download_cfg.notify_handler, - download_cfg.process, + &*download_cfg.notify_handler, + &download_cfg.process, )?; } - // Install components - for (component, format, installer_file) in things_to_install { - // For historical reasons, the rust-installer component - // names are not the same as the dist manifest component - // names. Some are just the component name some are the - // component name plus the target triple. - let pkg_name = component.name_in_manifest(); - let short_pkg_name = component.short_name_in_manifest(); - let short_name = component.short_name(new_manifest); - - (download_cfg.notify_handler)(Notification::InstallingComponent( - &short_name, - &self.target_triple, - component.target.as_ref(), - )); + if components_len > 0 { + // Create a channel to communicate whenever a download is done and the component can be installed + // The `mpsc` channel was used as we need to send many messages from one producer (download's thread) to one consumer (install's thread) + // This is recommended in the official docs: https://docs.rs/tokio/latest/tokio/sync/index.html#mpsc-channel + let total_components = components.len(); + let (download_tx, mut download_rx) = + mpsc::channel::>(total_components); + + let semaphore = Arc::new(Semaphore::new(concurrent_downloads)); + let component_stream = tokio_stream::iter(components.into_iter()).map({ + let download_tx = download_tx.clone(); + { + let new_manifest = Arc::clone(&new_manifest); + let download_cfg = download_cfg.clone(); + move |bin| { + let sem = semaphore.clone(); + let download_tx = download_tx.clone(); + let tmp_cx = Arc::clone(&tmp_cx); + let new_manifest = Arc::clone(&new_manifest); + let download_cfg = download_cfg.clone(); + async move { + let _permit = sem.acquire().await.unwrap(); + let url = if altered { + utils::parse_url( + &bin.binary + .url + .replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()), + )? + } else { + utils::parse_url(&bin.binary.url)? + }; + + let installer_file = bin + .download(&url, &download_cfg, max_retries, &new_manifest) + .await?; + let hash = bin.binary.hash.clone(); + let _ = download_tx.send(Ok((bin, installer_file))).await; + Ok(hash) + } + } + } + }); - let cx = PackageContext { - tmp_cx, - notify_handler: Some(download_cfg.notify_handler), - process: download_cfg.process, + let mut stream = component_stream.buffered(components_len); + let download_handle = async move { + let mut hashes = Vec::new(); + while let Some(result) = stream.next().await { + match result { + Ok(hash) => { + hashes.push(hash); + } + Err(e) => { + let _ = download_tx.send(Err(e)).await; + } + } + } + hashes }; - - let reader = utils::FileReaderWithProgress::new_file( - &installer_file, - download_cfg.notify_handler, - )?; - let package = match format { - CompressionKind::GZip => &TarGzPackage::new(reader, &cx)? as &dyn Package, - CompressionKind::XZ => &TarXzPackage::new(reader, &cx)?, - CompressionKind::ZStd => &TarZStdPackage::new(reader, &cx)?, + let install_handle = { + let new_manifest = Arc::clone(&new_manifest); + let download_cfg = Arc::clone(&download_cfg); + async move { + let mut current_tx = tx; + let mut counter = 0; + while counter < total_components + && let Some(message) = download_rx.recv().await + { + let (component_bin, installer_file) = message?; + let component_name = component_bin.component.short_name(&new_manifest); + let notify_handler = Arc::clone(&download_cfg.notify_handler); + current_tx = tokio::task::spawn_blocking({ + let this = Arc::clone(&self); + let new_manifest = Arc::clone(&new_manifest); + let tmp_cx = Arc::clone(&download_cfg.tmp_cx); + let download_cfg = Arc::clone(&download_cfg); + move || { + this.install_component( + component_bin, + installer_file, + tmp_cx, + download_cfg, + new_manifest, + current_tx, + ) + } + }) + .await??; + (notify_handler)(Notification::ComponentInstalled( + &component_name, + &self.target_triple, + Some(&self.target_triple), + )); + counter += 1; + } + Ok::<_, Error>(current_tx) + } }; - // If the package doesn't contain the component that the - // manifest says it does then somebody must be playing a joke on us. - if !package.contains(&pkg_name, Some(short_pkg_name)) { - return Err(RustupError::CorruptComponent(short_name).into()); - } - - tx = package.install(&self.installation, &pkg_name, Some(short_pkg_name), tx)?; + let (download_results, install_result) = tokio::join!(download_handle, install_handle); + things_downloaded = download_results; + tx = install_result?; } // Install new distribution manifest - let new_manifest_str = new_manifest.clone().stringify()?; + let new_manifest_str = (*new_manifest).clone().stringify()?; tx.modify_file(rel_installed_manifest_path)?; utils::write_file("manifest", &installed_manifest_path, &new_manifest_str)?; @@ -302,7 +332,7 @@ impl Manifestation { // `Components` *also* tracks what is installed, but it only tracks names, not // name/target. Needs to be fixed in rust-installer. let new_config = Config { - components: update.final_component_list, + components: update.final_component_list.clone(), ..Config::default() }; let config_str = new_config.stringify()?; @@ -322,14 +352,19 @@ impl Manifestation { #[cfg(test)] pub fn uninstall( &self, - manifest: &Manifest, - tmp_cx: &temp::Context, - notify_handler: &dyn Fn(Notification<'_>), - process: &Process, + manifest: Arc, + tmp_cx: Arc, + notify_handler: Arc, + process: Arc, ) -> Result<()> { let prefix = self.installation.prefix(); - let mut tx = Transaction::new(prefix.clone(), tmp_cx, notify_handler, process); + let mut tx = Transaction::new( + prefix.clone(), + Arc::clone(&tmp_cx), + Arc::clone(¬ify_handler), + Arc::clone(&process), + ); // Read configuration and delete it let rel_config_path = prefix.rel_manifest_file(CONFIG_FILE); @@ -342,21 +377,21 @@ impl Manifestation { tx.remove_file("dist config", rel_config_path)?; for component in config.components { - tx = self.uninstall_component(&component, manifest, tx, notify_handler, process)?; + tx = self.uninstall_component(&component, &manifest, tx, &*notify_handler, &process)?; } tx.commit(); Ok(()) } - fn uninstall_component<'a>( + fn uninstall_component( &self, component: &Component, manifest: &Manifest, - mut tx: Transaction<'a>, - notify_handler: &dyn Fn(Notification<'_>), + mut tx: Transaction, + notify_handler: &NotifyHandler, process: &Process, - ) -> Result> { + ) -> Result { // For historical reasons, the rust-installer component // names are not the same as the dist manifest component // names. Some are just the component name some are the @@ -417,9 +452,9 @@ impl Manifestation { &self, new_manifest: &[String], update_hash: Option<&Path>, - tmp_cx: &temp::Context, - notify_handler: &dyn Fn(Notification<'_>), - process: &Process, + tmp_cx: Arc, + notify_handler: Arc, + process: Arc, ) -> Result> { // If there's already a v2 installation then something has gone wrong if self.read_config()?.is_some() { @@ -452,11 +487,11 @@ impl Manifestation { use std::path::PathBuf; let dld_dir = PathBuf::from("bogus"); let dlcfg = DownloadCfg { - dist_root: "bogus", - download_dir: &dld_dir, - tmp_cx, - notify_handler, - process, + dist_root: Arc::from("bogus".to_string()), + download_dir: Arc::new(dld_dir), + tmp_cx: Arc::clone(&tmp_cx), + notify_handler: Arc::clone(¬ify_handler), + process: Arc::clone(&process), }; let dl = dlcfg @@ -476,19 +511,24 @@ impl Manifestation { )); // Begin transaction - let mut tx = Transaction::new(prefix, tmp_cx, notify_handler, process); + let mut tx = Transaction::new( + prefix, + Arc::clone(&tmp_cx), + Arc::clone(¬ify_handler), + Arc::clone(&process), + ); // Uninstall components let components = self.installation.list()?; for component in components { - tx = component.uninstall(tx, process)?; + tx = component.uninstall(tx, &process)?; } // Install all the components in the installer - let reader = utils::FileReaderWithProgress::new_file(&installer_file, notify_handler)?; + let reader = utils::FileReaderWithProgress::new_file(&installer_file, &*notify_handler)?; let cx = PackageContext { tmp_cx, - notify_handler: Some(notify_handler), + notify_handler: Some(notify_handler.clone()), process, }; @@ -507,12 +547,12 @@ impl Manifestation { // doesn't have a configuration or manifest-derived list of // component/target pairs. Uninstall it using the installer's // component list before upgrading. - fn maybe_handle_v2_upgrade<'a>( + fn maybe_handle_v2_upgrade( &self, config: &Option, - mut tx: Transaction<'a>, + mut tx: Transaction, process: &Process, - ) -> Result> { + ) -> Result { let installed_components = self.installation.list()?; let looks_like_v1 = config.is_none() && !installed_components.is_empty(); @@ -526,6 +566,54 @@ impl Manifestation { Ok(tx) } + + fn install_component( + &self, + component_bin: ComponentBinary, + installer_file: File, + tmp_cx: Arc, + download_cfg: Arc, + new_manifest: Arc, + tx: Transaction, + ) -> Result { + // For historical reasons, the rust-installer component + // names are not the same as the dist manifest component + // names. Some are just the component name some are the + // component name plus the target triple. + let pkg_name = component_bin.component.name_in_manifest(); + let short_pkg_name = component_bin.component.short_name_in_manifest(); + let short_name = component_bin.component.short_name(&new_manifest); + + (download_cfg.notify_handler)(Notification::InstallingComponent( + &short_name, + &self.target_triple, + component_bin.component.target.as_ref(), + )); + + let cx = PackageContext { + tmp_cx, + notify_handler: Some(download_cfg.notify_handler.clone()), + process: download_cfg.process.clone(), + }; + + let reader = utils::FileReaderWithProgress::new_file( + &installer_file, + &*download_cfg.notify_handler, + )?; + let package = match component_bin.binary.compression { + CompressionKind::GZip => &TarGzPackage::new(reader, &cx)? as &dyn Package, + CompressionKind::XZ => &TarXzPackage::new(reader, &cx)?, + CompressionKind::ZStd => &TarZStdPackage::new(reader, &cx)?, + }; + + // If the package doesn't contain the component that the + // manifest says it does then somebody must be playing a joke on us. + if !package.contains(&pkg_name, Some(short_pkg_name)) { + return Err(RustupError::CorruptComponent(short_name).into()); + } + + package.install(&self.installation, &pkg_name, Some(short_pkg_name), tx) + } } #[derive(Debug)] @@ -720,7 +808,7 @@ impl Update { fn components_urls_and_hashes<'a>( &'a self, new_manifest: &'a Manifest, - ) -> Result>> { + ) -> Result> { let mut components_urls_and_hashes = Vec::new(); for component in &self.components_to_install { let package = new_manifest.get_package(component.short_name_in_manifest())?; @@ -733,8 +821,8 @@ impl Update { // We prefer the first format in the list, since the parsing of the // manifest leaves us with the files/hash pairs in preference order. components_urls_and_hashes.push(ComponentBinary { - component, - binary: &target_package.bins[0], + component: Arc::new(component.clone()), + binary: Arc::new(target_package.bins[0].clone()), }); } @@ -742,16 +830,16 @@ impl Update { } } -struct ComponentBinary<'a> { - component: &'a Component, - binary: &'a HashedBinary, +struct ComponentBinary { + component: Arc, + binary: Arc, } -impl<'a> ComponentBinary<'a> { +impl ComponentBinary { async fn download( &self, url: &Url, - download_cfg: &DownloadCfg<'_>, + download_cfg: &DownloadCfg, max_retries: usize, new_manifest: &Manifest, ) -> Result { diff --git a/src/dist/manifestation/tests.rs b/src/dist/manifestation/tests.rs index c8564f6a39..15497065a6 100644 --- a/src/dist/manifestation/tests.rs +++ b/src/dist/manifestation/tests.rs @@ -3,12 +3,11 @@ #![allow(clippy::type_complexity)] use std::{ - cell::Cell, collections::HashMap, env, fs, path::{Path, PathBuf}, str::FromStr, - sync::Arc, + sync::{Arc, Mutex}, }; use anyhow::{Result, anyhow}; @@ -457,7 +456,7 @@ impl TestContext { let tmp_cx = temp::Context::new( work_tempdir.path().to_owned(), DEFAULT_DIST_SERVER, - Box::new(|_| ()), + Arc::new(|_| ()), ); let toolchain = ToolchainDesc::from_str("nightly-x86_64-apple-darwin").unwrap(); @@ -475,13 +474,13 @@ impl TestContext { } } - fn default_dl_cfg(&self) -> DownloadCfg<'_> { + fn default_dl_cfg(&self) -> DownloadCfg { DownloadCfg { - dist_root: "phony", - tmp_cx: &self.tmp_cx, - download_dir: &self.download_dir, - notify_handler: &|event| println!("{event}"), - process: &self.tp.process, + dist_root: Arc::from("phony".to_string()), + tmp_cx: Arc::new(self.tmp_cx.clone()), + download_dir: Arc::new(self.download_dir.clone()), + notify_handler: Arc::new(|event| println!("{event}")), + process: Arc::new(self.tp.process.clone()), } } @@ -495,7 +494,7 @@ impl TestContext { remove: &[Component], force: bool, ) -> Result { - self.update_from_dist_with_dl_cfg(add, remove, force, &self.default_dl_cfg()) + self.update_from_dist_with_dl_cfg(add, remove, force, self.default_dl_cfg()) .await } @@ -504,12 +503,19 @@ impl TestContext { add: &[Component], remove: &[Component], force: bool, - dl_cfg: &DownloadCfg<'_>, + dl_cfg: DownloadCfg, ) -> Result { // Download the dist manifest and place it into the installation prefix let manifest_url = make_manifest_url(&self.url, &self.toolchain)?; - let manifest_file = self.tmp_cx.new_file()?; - download_file(&manifest_url, &manifest_file, None, &|_| {}, dl_cfg.process).await?; + let manifest_file = Arc::new(self.tmp_cx.clone()).new_file()?; + download_file( + &manifest_url, + &manifest_file, + None, + &|_| {}, + &dl_cfg.process, + ) + .await?; let manifest_str = utils::read_file("manifest", &manifest_file)?; let manifest = Manifest::parse(&manifest_str)?; @@ -527,9 +533,9 @@ impl TestContext { remove_components: remove.to_owned(), }; - manifestation + Arc::new(manifestation) .update( - &manifest, + Arc::new(manifest), changes, force, dl_cfg, @@ -544,7 +550,12 @@ impl TestContext { let manifestation = Manifestation::open(self.prefix.clone(), trip)?; let manifest = manifestation.load_manifest()?.unwrap(); - manifestation.uninstall(&manifest, &self.tmp_cx, &|_| (), &self.tp.process)?; + manifestation.uninstall( + Arc::new(manifest), + Arc::new(self.tmp_cx.clone()), + Arc::new(|_| ()), + Arc::new(self.tp.process.clone()), + )?; Ok(()) } @@ -1491,27 +1502,30 @@ async fn reuse_downloaded_file() { let cx = TestContext::new(None, GZOnly); prevent_installation(&cx.prefix); - let reuse_notification_fired = Arc::new(Cell::new(false)); + let reuse_notification_fired = Arc::new(Mutex::new(false)); let dl_cfg = DownloadCfg { - notify_handler: &|n| { - if let Notification::FileAlreadyDownloaded = n { - reuse_notification_fired.set(true); + notify_handler: Arc::new({ + let reuse_notification_fired = Arc::clone(&reuse_notification_fired); + move |n| { + if let Notification::FileAlreadyDownloaded = n { + *reuse_notification_fired.lock().unwrap() = true; + } } - }, + }), ..cx.default_dl_cfg() }; - cx.update_from_dist_with_dl_cfg(&[], &[], false, &dl_cfg) + cx.update_from_dist_with_dl_cfg(&[], &[], false, dl_cfg.clone()) .await .unwrap_err(); - assert!(!reuse_notification_fired.get()); + assert!(!*reuse_notification_fired.lock().unwrap()); allow_installation(&cx.prefix); - cx.update_from_dist_with_dl_cfg(&[], &[], false, &dl_cfg) + cx.update_from_dist_with_dl_cfg(&[], &[], false, dl_cfg) .await .unwrap(); - assert!(reuse_notification_fired.get()); + assert!(*reuse_notification_fired.lock().unwrap()); } #[tokio::test] @@ -1530,21 +1544,24 @@ async fn checks_files_hashes_before_reuse() { utils::write_file("bad previous download", &prev_download, "bad content").unwrap(); println!("wrote previous download to {}", prev_download.display()); - let noticed_bad_checksum = Arc::new(Cell::new(false)); + let noticed_bad_checksum = Arc::new(Mutex::new(false)); let dl_cfg = DownloadCfg { - notify_handler: &|n| { - if let Notification::CachedFileChecksumFailed = n { - noticed_bad_checksum.set(true); + notify_handler: Arc::new({ + let noticed_bad_checksum = Arc::clone(¬iced_bad_checksum); + move |n| { + if let Notification::CachedFileChecksumFailed = n { + *noticed_bad_checksum.lock().unwrap() = true; + } } - }, + }), ..cx.default_dl_cfg() }; - cx.update_from_dist_with_dl_cfg(&[], &[], false, &dl_cfg) + cx.update_from_dist_with_dl_cfg(&[], &[], false, dl_cfg) .await .unwrap(); - assert!(noticed_bad_checksum.get()); + assert!(*noticed_bad_checksum.lock().unwrap()); } #[tokio::test] diff --git a/src/dist/mod.rs b/src/dist/mod.rs index faef0a0360..0b05d36125 100644 --- a/src/dist/mod.rs +++ b/src/dist/mod.rs @@ -1,7 +1,13 @@ //! Installation from a Rust distribution server use std::{ - collections::HashSet, env, fmt, io::Write, ops::Deref, path::Path, str::FromStr, sync::LazyLock, + collections::HashSet, + env, fmt, + io::Write, + ops::Deref, + path::Path, + str::FromStr, + sync::{Arc, LazyLock}, }; use anyhow::{Context, Result, anyhow, bail}; @@ -883,7 +889,7 @@ pub(crate) struct DistOptions<'a> { pub(crate) toolchain: &'a ToolchainDesc, pub(crate) profile: Profile, pub(crate) update_hash: Option<&'a Path>, - pub(crate) dl_cfg: DownloadCfg<'a>, + pub(crate) dl_cfg: DownloadCfg, /// --force bool is whether to force an update/install pub(crate) force: bool, /// --allow-downgrade @@ -966,7 +972,7 @@ pub(crate) async fn update_from_dist( let mut toolchain = opts.toolchain.clone(); let res = loop { let result = try_update_from_dist_( - opts.dl_cfg, + opts.dl_cfg.clone(), opts.update_hash, &toolchain, match opts.exists { @@ -1050,7 +1056,7 @@ pub(crate) async fn update_from_dist( // Don't leave behind an empty / broken installation directory if res.is_err() && fresh_install { // FIXME Ignoring cascading errors - let _ = utils::remove_dir("toolchain", prefix.path(), opts.dl_cfg.notify_handler); + let _ = utils::remove_dir("toolchain", prefix.path(), &*opts.dl_cfg.notify_handler); } res @@ -1058,7 +1064,7 @@ pub(crate) async fn update_from_dist( #[allow(clippy::too_many_arguments)] async fn try_update_from_dist_( - download: DownloadCfg<'_>, + download: DownloadCfg, update_hash: Option<&Path>, toolchain: &ToolchainDesc, profile: Option, @@ -1070,11 +1076,12 @@ async fn try_update_from_dist_( ) -> Result> { let toolchain_str = toolchain.to_string(); let manifestation = Manifestation::open(prefix.clone(), toolchain.target.clone())?; + let notify_handler = Arc::clone(&download.notify_handler); // TODO: Add a notification about which manifest version is going to be used - (download.notify_handler)(Notification::DownloadingManifest(&toolchain_str)); + notify_handler(Notification::DownloadingManifest(&toolchain_str)); match dl_v2_manifest( - download, + download.clone(), // Even if manifest has not changed, we must continue to install requested components. // So if components or targets is not empty, we skip passing `update_hash` so that // we essentially degenerate to `rustup component add` / `rustup target add` @@ -1088,7 +1095,7 @@ async fn try_update_from_dist_( .await { Ok(Some((m, hash))) => { - (download.notify_handler)(Notification::DownloadedManifest( + notify_handler(Notification::DownloadedManifest( &m.date, m.get_rust_version().ok(), )); @@ -1138,12 +1145,12 @@ async fn try_update_from_dist_( fetched.clone_from(&m.date); - return match manifestation + return match Arc::new(manifestation) .update( - &m, + Arc::new(m), changes, force_update, - &download, + download, &toolchain.manifest_name(), true, ) @@ -1173,7 +1180,7 @@ async fn try_update_from_dist_( Some(RustupError::ChecksumFailed { .. }) => return Ok(None), Some(RustupError::DownloadNotExists { .. }) => { // Proceed to try v1 as a fallback - (download.notify_handler)(Notification::DownloadingLegacyManifest) + notify_handler(Notification::DownloadingLegacyManifest) } _ => return Err(err), } @@ -1181,7 +1188,7 @@ async fn try_update_from_dist_( } // If the v2 manifest is not found then try v1 - let manifest = match dl_v1_manifest(download, toolchain).await { + let manifest = match dl_v1_manifest(download.clone(), toolchain).await { Ok(m) => m, Err(err) => match err.downcast_ref::() { Some(RustupError::ChecksumFailed { .. }) => return Err(err), @@ -1201,12 +1208,13 @@ async fn try_update_from_dist_( }, }; + let download = download.to_owned(); let result = manifestation .update_v1( &manifest, update_hash, download.tmp_cx, - &download.notify_handler, + download.notify_handler, download.process, ) .await; @@ -1224,11 +1232,11 @@ async fn try_update_from_dist_( } pub(crate) async fn dl_v2_manifest( - download: DownloadCfg<'_>, + download: DownloadCfg, update_hash: Option<&Path>, toolchain: &ToolchainDesc, ) -> Result> { - let manifest_url = toolchain.manifest_v2_url(download.dist_root, download.process); + let manifest_url = toolchain.manifest_v2_url(&download.dist_root, &download.process); match download .download_and_check(&manifest_url, update_hash, ".toml") .await @@ -1254,7 +1262,7 @@ pub(crate) async fn dl_v2_manifest( // Manifest checksum mismatched. warn!("{err}"); - let server = dist_root_server(download.process)?; + let server = dist_root_server(&download.process)?; if server == DEFAULT_DIST_SERVER { info!( "this is likely due to an ongoing update of the official release server, please try again later" @@ -1272,11 +1280,8 @@ pub(crate) async fn dl_v2_manifest( } } -async fn dl_v1_manifest( - download: DownloadCfg<'_>, - toolchain: &ToolchainDesc, -) -> Result> { - let root_url = toolchain.package_dir(download.dist_root); +async fn dl_v1_manifest(download: DownloadCfg, toolchain: &ToolchainDesc) -> Result> { + let root_url = toolchain.package_dir(&download.dist_root); if let Channel::Version(ver) = &toolchain.channel { // This is an explicit version. In v1 there was no manifest, @@ -1285,7 +1290,7 @@ async fn dl_v1_manifest( return Ok(vec![installer_name]); } - let manifest_url = toolchain.manifest_v1_url(download.dist_root, download.process); + let manifest_url = toolchain.manifest_v1_url(&download.dist_root, &download.process); let manifest_dl = download.download_and_check(&manifest_url, None, "").await?; let (manifest_file, _) = manifest_dl.unwrap(); let manifest_str = utils::read_file("manifest", &manifest_file)?; diff --git a/src/dist/temp.rs b/src/dist/temp.rs index d18f6d3005..d1b602f04c 100644 --- a/src/dist/temp.rs +++ b/src/dist/temp.rs @@ -1,4 +1,5 @@ use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::{fmt, fs, ops}; pub(crate) use anyhow::{Context as _, Result}; @@ -18,12 +19,12 @@ pub(crate) enum CreatingError { } #[derive(Debug)] -pub(crate) struct Dir<'a> { - cfg: &'a Context, +pub(crate) struct Dir { + cfg: Arc, path: PathBuf, } -impl ops::Deref for Dir<'_> { +impl ops::Deref for Dir { type Target = Path; fn deref(&self) -> &Path { @@ -31,7 +32,7 @@ impl ops::Deref for Dir<'_> { } } -impl Drop for Dir<'_> { +impl Drop for Dir { fn drop(&mut self) { if raw::is_directory(&self.path) { let n = Notification::DirectoryDeletion( @@ -44,12 +45,12 @@ impl Drop for Dir<'_> { } #[derive(Debug)] -pub struct File<'a> { - cfg: &'a Context, +pub struct File { + cfg: Arc, path: PathBuf, } -impl ops::Deref for File<'_> { +impl ops::Deref for File { type Target = Path; fn deref(&self) -> &Path { @@ -57,7 +58,7 @@ impl ops::Deref for File<'_> { } } -impl Drop for File<'_> { +impl Drop for File { fn drop(&mut self) { if raw::is_file(&self.path) { let n = Notification::FileDeletion(&self.path, fs::remove_file(&self.path)); @@ -66,17 +67,20 @@ impl Drop for File<'_> { } } +pub type NotifyHandler = dyn for<'a> Fn(Notification<'a>) + Sync + Send; + +#[derive(Clone)] pub struct Context { root_directory: PathBuf, pub dist_server: String, - notify_handler: Box)>, + notify_handler: Arc, } impl Context { pub fn new( root_directory: PathBuf, dist_server: &str, - notify_handler: Box)>, + notify_handler: Arc, ) -> Self { Self { root_directory, @@ -92,7 +96,7 @@ impl Context { .with_context(|| CreatingError::Root(PathBuf::from(&self.root_directory))) } - pub(crate) fn new_directory(&self) -> Result> { + pub(crate) fn new_directory(self: Arc) -> Result { self.create_root()?; loop { @@ -114,11 +118,11 @@ impl Context { } } - pub fn new_file(&self) -> Result> { + pub fn new_file(self: Arc) -> Result { self.new_file_with_ext("", "") } - pub(crate) fn new_file_with_ext(&self, prefix: &str, ext: &str) -> Result> { + pub(crate) fn new_file_with_ext(self: Arc, prefix: &str, ext: &str) -> Result { self.create_root()?; loop { diff --git a/src/notifications.rs b/src/notifications.rs index 8669d12663..ccc8614656 100644 --- a/src/notifications.rs +++ b/src/notifications.rs @@ -10,6 +10,8 @@ use crate::settings::MetadataVersion; use crate::utils::units; use crate::{dist::ToolchainDesc, toolchain::ToolchainName, utils::notify::NotificationLevel}; +pub(crate) type NotifyHandler = dyn for<'a> Fn(Notification<'a>) + Sync + Send; + #[derive(Debug)] pub enum Notification<'a> { Extracting(&'a Path, &'a Path), @@ -26,6 +28,7 @@ pub enum Notification<'a> { /// The URL of the download is passed as the last argument, to allow us to track concurrent downloads. DownloadingComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>, &'a str), InstallingComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>), + ComponentInstalled(&'a str, &'a TargetTriple, Option<&'a TargetTriple>), RemovingComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>), RemovingOldComponent(&'a str, &'a TargetTriple, Option<&'a TargetTriple>), DownloadingManifest(&'a str), @@ -106,6 +109,7 @@ impl Notification<'_> { Extracting(_, _) | DownloadingComponent(_, _, _, _) | InstallingComponent(_, _, _) + | ComponentInstalled(_, _, _) | RemovingComponent(_, _, _) | RemovingOldComponent(_, _, _) | ComponentAlreadyInstalled(_) @@ -201,6 +205,10 @@ impl Display for Notification<'_> { write!(f, "installing component '{}' for '{}'", c, t.unwrap()) } } + ComponentInstalled(c, h, t) => match t { + Some(t) if t != h => write!(f, "component '{c}' for '{t}' installed"), + _ => write!(f, "component '{c}' installed"), + }, RemovingComponent(c, h, t) => { if Some(h) == t.as_ref() || t.is_none() { write!(f, "removing component '{c}'") diff --git a/src/settings.rs b/src/settings.rs index 44305d727a..8d6cf1c3c3 100644 --- a/src/settings.rs +++ b/src/settings.rs @@ -1,8 +1,8 @@ -use std::cell::RefCell; use std::collections::BTreeMap; use std::fmt; use std::path::{Path, PathBuf}; use std::str::FromStr; +use std::sync::RwLock; use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; @@ -13,22 +13,22 @@ use crate::errors::*; use crate::notifications::*; use crate::utils; -#[derive(Clone, Debug, Eq, PartialEq)] +#[derive(Debug)] pub struct SettingsFile { path: PathBuf, - cache: RefCell>, + cache: RwLock>, } impl SettingsFile { pub(crate) fn new(path: PathBuf) -> Self { Self { path, - cache: RefCell::new(None), + cache: RwLock::default(), } } fn write_settings(&self) -> Result<()> { - let settings = self.cache.borrow(); + let settings = self.cache.read().unwrap(); utils::write_file( "settings", &self.path, @@ -40,10 +40,10 @@ impl SettingsFile { fn read_settings(&self) -> Result<()> { let mut needs_save = false; { - let b = self.cache.borrow(); + let b = self.cache.read().unwrap(); if b.is_none() { drop(b); - *self.cache.borrow_mut() = Some(if utils::is_file(&self.path) { + *self.cache.write().unwrap() = Some(if utils::is_file(&self.path) { let content = utils::read_file("settings", &self.path)?; Settings::parse(&content).with_context(|| RustupError::ParsingFile { name: "settings", @@ -65,14 +65,17 @@ impl SettingsFile { self.read_settings()?; // Settings can no longer be None so it's OK to unwrap - f(self.cache.borrow().as_ref().unwrap()) + f(self.cache.read().unwrap().as_ref().unwrap()) } pub(crate) fn with_mut Result>(&self, f: F) -> Result { self.read_settings()?; // Settings can no longer be None so it's OK to unwrap - let result = { f(self.cache.borrow_mut().as_mut().unwrap())? }; + let result = { + let mut result = self.cache.write().unwrap(); + f(result.as_mut().unwrap())? + }; self.write_settings()?; Ok(result) } diff --git a/src/test/dist.rs b/src/test/dist.rs index f5164c3d23..e6c5e6b146 100644 --- a/src/test/dist.rs +++ b/src/test/dist.rs @@ -5,7 +5,7 @@ use std::collections::HashMap; use std::fs::{self, File}; use std::io::{self, Read, Write}; use std::path::{Path, PathBuf}; -use std::sync::{LazyLock, Mutex}; +use std::sync::{Arc, LazyLock, Mutex}; use url::Url; @@ -52,26 +52,26 @@ impl DistContext { cx: temp::Context::new( tmp_dir.path().to_owned(), DEFAULT_DIST_SERVER, - Box::new(|_| ()), + Arc::new(|_| ()), ), tp: TestProcess::default(), _tmp_dir: tmp_dir, }) } - pub fn start(&self) -> anyhow::Result<(Transaction<'_>, Components, DirectoryPackage)> { + pub fn start(&self) -> anyhow::Result<(Transaction, Components, DirectoryPackage)> { let tx = self.transaction(); let components = Components::open(self.prefix.clone())?; let pkg = DirectoryPackage::new(self.pkg_dir.path().to_owned(), true)?; Ok((tx, components, pkg)) } - pub fn transaction(&self) -> Transaction<'_> { + pub fn transaction(&self) -> Transaction { Transaction::new( self.prefix.clone(), - &self.cx, - &|_: Notification<'_>| (), - &self.tp.process, + Arc::new(self.cx.clone()), + Arc::new(|_: Notification<'_>| ()), + Arc::new(self.tp.process.clone()), ) } } diff --git a/src/toolchain/distributable.rs b/src/toolchain/distributable.rs index 6d91312e26..6cc27a10d9 100644 --- a/src/toolchain/distributable.rs +++ b/src/toolchain/distributable.rs @@ -1,6 +1,9 @@ #[cfg(windows)] use std::fs; -use std::{convert::Infallible, env::consts::EXE_SUFFIX, ffi::OsStr, path::Path, process::Command}; +use std::{ + convert::Infallible, env::consts::EXE_SUFFIX, ffi::OsStr, path::Path, process::Command, + sync::Arc, +}; #[cfg(windows)] use anyhow::Context; @@ -18,6 +21,7 @@ use crate::{ prefix::InstallPrefix, }, install::{InstallMethod, UpdateStatus}, + notifications::Notification, }; use super::{ @@ -109,16 +113,16 @@ impl<'a> DistributableToolchain<'a> { remove_components: vec![], }; - let download_cfg = self - .toolchain - .cfg - .download_cfg(&*self.toolchain.cfg.notify_handler); - manifestation + let notify_handler = Arc::clone(&self.toolchain.cfg.notify_handler); + let notify_handler = move |n: Notification<'_>| (notify_handler)(n); + let download_cfg = self.toolchain.cfg.download_cfg(Arc::new(notify_handler)); + + Arc::new(manifestation) .update( - &manifest, + Arc::new(manifest), changes, false, - &download_cfg, + download_cfg, &self.desc.manifest_name(), false, ) @@ -350,12 +354,14 @@ impl<'a> DistributableToolchain<'a> { let hash_path = cfg.get_hash_file(toolchain, true)?; let update_hash = Some(&hash_path as &Path); + let notify_handler = Arc::clone(&cfg.notify_handler); + let notify_handler = move |n: Notification<'_>| (notify_handler)(n); let status = InstallMethod::Dist(DistOptions { cfg, toolchain, profile, update_hash, - dl_cfg: cfg.download_cfg(&|n| (cfg.notify_handler)(n)), + dl_cfg: cfg.download_cfg(Arc::new(notify_handler)), force, allow_downgrade: false, exists: false, @@ -408,12 +414,14 @@ impl<'a> DistributableToolchain<'a> { let hash_path = cfg.get_hash_file(&self.desc, true)?; let update_hash = Some(&hash_path as &Path); + let notify_handler = Arc::clone(&self.toolchain.cfg.notify_handler); + let notify_handler = move |n: Notification<'_>| (notify_handler)(n); InstallMethod::Dist(DistOptions { cfg, toolchain: &self.desc, profile, update_hash, - dl_cfg: cfg.download_cfg(&|n| (cfg.notify_handler)(n)), + dl_cfg: self.toolchain.cfg.download_cfg(Arc::new(notify_handler)), force, allow_downgrade, exists: true, @@ -509,16 +517,16 @@ impl<'a> DistributableToolchain<'a> { remove_components: vec![component], }; - let download_cfg = self - .toolchain - .cfg - .download_cfg(&*self.toolchain.cfg.notify_handler); - manifestation + let notify_handler = Arc::clone(&self.toolchain.cfg.notify_handler); + let notify_handler = move |n: Notification<'_>| (notify_handler)(n); + let download_cfg = self.toolchain.cfg.download_cfg(Arc::new(notify_handler)); + + Arc::new(manifestation) .update( - &manifest, + Arc::new(manifest), changes, false, - &download_cfg, + download_cfg, &self.desc.manifest_name(), false, ) @@ -529,10 +537,9 @@ impl<'a> DistributableToolchain<'a> { pub async fn show_dist_version(&self) -> anyhow::Result> { let update_hash = self.toolchain.cfg.get_hash_file(&self.desc, false)?; - let download_cfg = self - .toolchain - .cfg - .download_cfg(&*self.toolchain.cfg.notify_handler); + let notify_handler = Arc::clone(&self.toolchain.cfg.notify_handler); + let notify_handler = move |n: Notification<'_>| (notify_handler)(n); + let download_cfg = self.toolchain.cfg.download_cfg(Arc::new(notify_handler)); match crate::dist::dl_v2_manifest(download_cfg, Some(&update_hash), &self.desc).await? { Some((manifest, _)) => Ok(Some(manifest.get_rust_version()?.to_string())), diff --git a/tests/suite/cli_rustup.rs b/tests/suite/cli_rustup.rs index 407585a1d1..25e94493fe 100644 --- a/tests/suite/cli_rustup.rs +++ b/tests/suite/cli_rustup.rs @@ -35,7 +35,6 @@ async fn rustup_stable() { .with_stderr(snapbox::str![[r#" info: syncing channel updates for 'stable-[HOST_TRIPLE]' info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0) -info: downloading component[..] ... info: cleaning up downloads & tmp directories @@ -131,15 +130,12 @@ async fn rustup_all_channels() { .with_stderr(snapbox::str![[r#" info: syncing channel updates for 'stable-[HOST_TRIPLE]' info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0) -info: downloading component[..] ... info: syncing channel updates for 'beta-[HOST_TRIPLE]' info: latest update on 2015-01-02, rust version 1.2.0 (hash-beta-1.2.0) -info: downloading component[..] ... info: syncing channel updates for 'nightly-[HOST_TRIPLE]' info: latest update on 2015-01-02, rust version 1.3.0 (hash-nightly-2) -info: downloading component[..] ... info: cleaning up downloads & tmp directories @@ -208,12 +204,10 @@ async fn rustup_some_channels_up_to_date() { .with_stderr(snapbox::str![[r#" info: syncing channel updates for 'stable-[HOST_TRIPLE]' info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0) -info: downloading component[..] ... info: syncing channel updates for 'beta-[HOST_TRIPLE]' info: syncing channel updates for 'nightly-[HOST_TRIPLE]' info: latest update on 2015-01-02, rust version 1.3.0 (hash-nightly-2) -info: downloading component[..] ... info: cleaning up downloads & tmp directories diff --git a/tests/suite/dist_install.rs b/tests/suite/dist_install.rs index 00ff32165f..26381a27da 100644 --- a/tests/suite/dist_install.rs +++ b/tests/suite/dist_install.rs @@ -1,5 +1,6 @@ use std::fs::File; use std::io::Write; +use std::sync::Arc; use rustup::dist::component::Components; use rustup::dist::component::Transaction; @@ -171,7 +172,12 @@ fn uninstall() { // Now uninstall let notify = |_: Notification<'_>| (); - let mut tx = Transaction::new(cx.prefix.clone(), &cx.cx, ¬ify, &cx.tp.process); + let mut tx = Transaction::new( + cx.prefix.clone(), + Arc::new(cx.cx), + Arc::new(notify), + Arc::new(cx.tp.process.clone()), + ); for component in components.list().unwrap() { tx = component.uninstall(tx, &cx.tp.process).unwrap(); }