Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 128 additions & 83 deletions src/dist/manifestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ mod tests;

use std::path::Path;

use anyhow::{Context, Result, anyhow, bail};
use anyhow::{Context, Error, Result, anyhow, bail};
use futures_util::stream::StreamExt;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::sync::{Semaphore, mpsc};
use tracing::info;
use url::Url;

Expand Down Expand Up @@ -154,8 +154,7 @@ impl Manifestation {
let altered = tmp_cx.dist_server != DEFAULT_DIST_SERVER;

// Download component packages and validate hashes
let mut things_to_install = Vec::new();
let mut things_downloaded = Vec::new();
let mut things_downloaded: Vec<String> = Vec::new();
let components = update.components_urls_and_hashes(new_manifest)?;
let components_len = components.len();

Expand All @@ -173,49 +172,7 @@ impl Manifestation {
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_MAX_RETRIES);

info!("downloading component(s)");
for bin in &components {
(download_cfg.notify_handler)(Notification::DownloadingComponent(
&bin.component.short_name(new_manifest),
&self.target_triple,
bin.component.target.as_ref(),
&bin.binary.url,
));
}

let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| {
let sem = semaphore.clone();
async move {
let _permit = sem.acquire().await.unwrap();
let url = if altered {
utils::parse_url(
&bin.binary
.url
.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()),
)?
} else {
utils::parse_url(&bin.binary.url)?
};

bin.download(&url, download_cfg, max_retries, new_manifest)
.await
.map(|downloaded| (bin, downloaded))
}
});
if components_len > 0 {
let results = component_stream
.buffered(components_len)
.collect::<Vec<_>>()
.await;
for result in results {
let (bin, downloaded_file) = result?;
things_downloaded.push(bin.binary.hash.clone());
things_to_install.push((bin.component, bin.binary.compression, downloaded_file));
}
}

// Begin transaction
// Begin transaction before the downloads, as installations are interleaved with those
let mut tx = Transaction::new(
prefix.clone(),
tmp_cx,
Expand All @@ -227,6 +184,16 @@ impl Manifestation {
// to uninstall it first.
tx = self.maybe_handle_v2_upgrade(&config, tx, download_cfg.process)?;

info!("downloading component(s)");
for bin in &components {
(download_cfg.notify_handler)(Notification::DownloadingComponent(
&bin.component.short_name(new_manifest),
&self.target_triple,
bin.component.target.as_ref(),
&bin.binary.url,
));
}

// Uninstall components
for component in &update.components_to_uninstall {
let notification = if implicit_modify {
Expand All @@ -249,45 +216,77 @@ impl Manifestation {
)?;
}

// Install components
for (component, format, installer_file) in things_to_install {
// For historical reasons, the rust-installer component
// names are not the same as the dist manifest component
// names. Some are just the component name some are the
// component name plus the target triple.
let pkg_name = component.name_in_manifest();
let short_pkg_name = component.short_name_in_manifest();
let short_name = component.short_name(new_manifest);

(download_cfg.notify_handler)(Notification::InstallingComponent(
&short_name,
&self.target_triple,
component.target.as_ref(),
));
if components_len > 0 {
// Create a channel to communicate whenever a download is done and the component can be installed
// The `mpsc` channel was used as we need to send many messages from one producer (download's thread) to one consumer (install's thread)
// This is recommended in the official docs: https://docs.rs/tokio/latest/tokio/sync/index.html#mpsc-channel
Comment on lines +221 to +223
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find it now, but I thought I wrote a comment on wondering why we're using a channel here, when we could instead compose the futures that download and install together.

Copy link
Member

@rami3l rami3l Sep 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm would you mind clarifying a little more what you meant by composing the futures?

The current constraints as we previously agreed was that the downloads are concurrent but the installations happening alongside them aren't concurrent within themselves, so I think a channel would be ideal for that case in favor of, say, a mutex.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you're saying, we only do one installation at a time? If that's the case, I would suggest something more like a FuturesUnordered containing all the downloads, and then pulling from that to run the installations.

let total_components = components.len();
let (download_tx, mut download_rx) =
mpsc::channel::<Result<(ComponentBinary<'_>, File)>>(total_components);

let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| {
let sem = semaphore.clone();
let download_tx = download_tx.clone();
async move {
let _permit = sem.acquire().await.unwrap();
let url = if altered {
utils::parse_url(
&bin.binary
.url
.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()),
)?
} else {
utils::parse_url(&bin.binary.url)?
};

let installer_file = bin
.download(&url, download_cfg, max_retries, new_manifest)
.await?;
let hash = bin.binary.hash.clone();
let _ = download_tx.send(Ok((bin, installer_file))).await;
Ok(hash)
}
});

let cx = PackageContext {
tmp_cx,
notify_handler: Some(download_cfg.notify_handler),
process: download_cfg.process,
let mut stream = component_stream.buffered(components_len);
let download_handle = async {
let mut hashes = Vec::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest writing this as a named function.

while let Some(result) = stream.next().await {
match result {
Ok(hash) => {
hashes.push(hash);
}
Err(e) => {
let _ = download_tx.send(Err(e)).await;
}
}
}
hashes
};

let reader = utils::FileReaderWithProgress::new_file(
&installer_file,
download_cfg.notify_handler,
)?;
let package = match format {
CompressionKind::GZip => &TarGzPackage::new(reader, &cx)? as &dyn Package,
CompressionKind::XZ => &TarXzPackage::new(reader, &cx)?,
CompressionKind::ZStd => &TarZStdPackage::new(reader, &cx)?,
let install_handle = async {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest writing this as a named function.

let mut current_tx = tx;
let mut counter = 0;
while counter < total_components
&& let Some(message) = download_rx.recv().await
{
let (component_bin, installer_file) = message?;
current_tx = self.install_component(
component_bin,
installer_file,
tmp_cx,
download_cfg,
new_manifest,
current_tx,
)?;
counter += 1;
}
Ok::<_, Error>(current_tx)
};

// If the package doesn't contain the component that the
// manifest says it does then somebody must be playing a joke on us.
if !package.contains(&pkg_name, Some(short_pkg_name)) {
return Err(RustupError::CorruptComponent(short_name).into());
}

tx = package.install(&self.installation, &pkg_name, Some(short_pkg_name), tx)?;
let (download_results, install_result) = tokio::join!(download_handle, install_handle);
things_downloaded = download_results;
tx = install_result?;
}

// Install new distribution manifest
Expand All @@ -302,7 +301,7 @@ impl Manifestation {
// `Components` *also* tracks what is installed, but it only tracks names, not
// name/target. Needs to be fixed in rust-installer.
let new_config = Config {
components: update.final_component_list,
components: update.final_component_list.clone(),
..Config::default()
};
let config_str = new_config.stringify()?;
Expand Down Expand Up @@ -526,6 +525,52 @@ impl Manifestation {

Ok(tx)
}

fn install_component<'a>(
&self,
component_bin: ComponentBinary<'a>,
installer_file: File,
tmp_cx: &temp::Context,
download_cfg: &DownloadCfg<'_>,
new_manifest: &Manifest,
tx: Transaction<'a>,
) -> Result<Transaction<'a>> {
// For historical reasons, the rust-installer component
// names are not the same as the dist manifest component
// names. Some are just the component name some are the
// component name plus the target triple.
let pkg_name = component_bin.component.name_in_manifest();
let short_pkg_name = component_bin.component.short_name_in_manifest();
let short_name = component_bin.component.short_name(new_manifest);

(download_cfg.notify_handler)(Notification::InstallingComponent(
&short_name,
&self.target_triple,
component_bin.component.target.as_ref(),
));

let cx = PackageContext {
tmp_cx,
notify_handler: Some(download_cfg.notify_handler),
process: download_cfg.process,
};

let reader =
utils::FileReaderWithProgress::new_file(&installer_file, download_cfg.notify_handler)?;
let package = match component_bin.binary.compression {
CompressionKind::GZip => &TarGzPackage::new(reader, &cx)? as &dyn Package,
CompressionKind::XZ => &TarXzPackage::new(reader, &cx)?,
CompressionKind::ZStd => &TarZStdPackage::new(reader, &cx)?,
};

// If the package doesn't contain the component that the
// manifest says it does then somebody must be playing a joke on us.
if !package.contains(&pkg_name, Some(short_pkg_name)) {
return Err(RustupError::CorruptComponent(short_name).into());
}

package.install(&self.installation, &pkg_name, Some(short_pkg_name), tx)
}
}

#[derive(Debug)]
Expand Down
6 changes: 0 additions & 6 deletions tests/suite/cli_rustup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ async fn rustup_stable() {
.with_stderr(snapbox::str![[r#"
info: syncing channel updates for 'stable-[HOST_TRIPLE]'
info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0)
info: downloading component[..]
...
info: cleaning up downloads & tmp directories

Expand Down Expand Up @@ -131,15 +130,12 @@ async fn rustup_all_channels() {
.with_stderr(snapbox::str![[r#"
info: syncing channel updates for 'stable-[HOST_TRIPLE]'
info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0)
info: downloading component[..]
...
info: syncing channel updates for 'beta-[HOST_TRIPLE]'
info: latest update on 2015-01-02, rust version 1.2.0 (hash-beta-1.2.0)
info: downloading component[..]
...
info: syncing channel updates for 'nightly-[HOST_TRIPLE]'
info: latest update on 2015-01-02, rust version 1.3.0 (hash-nightly-2)
info: downloading component[..]
...
info: cleaning up downloads & tmp directories

Expand Down Expand Up @@ -208,12 +204,10 @@ async fn rustup_some_channels_up_to_date() {
.with_stderr(snapbox::str![[r#"
info: syncing channel updates for 'stable-[HOST_TRIPLE]'
info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0)
info: downloading component[..]
...
info: syncing channel updates for 'beta-[HOST_TRIPLE]'
info: syncing channel updates for 'nightly-[HOST_TRIPLE]'
info: latest update on 2015-01-02, rust version 1.3.0 (hash-nightly-2)
info: downloading component[..]
...
info: cleaning up downloads & tmp directories

Expand Down