Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 85 additions & 57 deletions src/dist/manifestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ mod tests;

use std::path::Path;

use anyhow::{Context, Result, anyhow, bail};
use anyhow::{Context, Error, Result, anyhow, bail};
use futures_util::stream::StreamExt;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::sync::{Semaphore, mpsc};
use tracing::info;
use url::Url;

Expand Down Expand Up @@ -154,8 +154,7 @@ impl Manifestation {
let altered = tmp_cx.dist_server != DEFAULT_DIST_SERVER;

// Download component packages and validate hashes
let mut things_to_install = Vec::new();
let mut things_downloaded = Vec::new();
let mut things_downloaded: Vec<String> = Vec::new();
let components = update.components_urls_and_hashes(new_manifest)?;
let components_len = components.len();

Expand All @@ -173,49 +172,7 @@ impl Manifestation {
.and_then(|s| s.parse().ok())
.unwrap_or(DEFAULT_MAX_RETRIES);

info!("downloading component(s)");
for bin in &components {
(download_cfg.notify_handler)(Notification::DownloadingComponent(
&bin.component.short_name(new_manifest),
&self.target_triple,
bin.component.target.as_ref(),
&bin.binary.url,
));
}

let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| {
let sem = semaphore.clone();
async move {
let _permit = sem.acquire().await.unwrap();
let url = if altered {
utils::parse_url(
&bin.binary
.url
.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()),
)?
} else {
utils::parse_url(&bin.binary.url)?
};

bin.download(&url, download_cfg, max_retries, new_manifest)
.await
.map(|downloaded| (bin, downloaded))
}
});
if components_len > 0 {
let results = component_stream
.buffered(components_len)
.collect::<Vec<_>>()
.await;
for result in results {
let (bin, downloaded_file) = result?;
things_downloaded.push(bin.binary.hash.clone());
things_to_install.push((bin, downloaded_file));
}
}

// Begin transaction
// Begin transaction before the downloads, as installations are interleaved with those
let mut tx = Transaction::new(
prefix.clone(),
tmp_cx,
Expand All @@ -227,6 +184,16 @@ impl Manifestation {
// to uninstall it first.
tx = self.maybe_handle_v2_upgrade(&config, tx, download_cfg.process)?;

info!("downloading component(s)");
for bin in &components {
(download_cfg.notify_handler)(Notification::DownloadingComponent(
&bin.component.short_name(new_manifest),
&self.target_triple,
bin.component.target.as_ref(),
&bin.binary.url,
));
}

// Uninstall components
for component in &update.components_to_uninstall {
let notification = if implicit_modify {
Expand All @@ -249,16 +216,77 @@ impl Manifestation {
)?;
}

// Install components
for (component_bin, installer_file) in things_to_install {
tx = self.install_component(
component_bin,
installer_file,
tmp_cx,
download_cfg,
new_manifest,
tx,
)?;
if components_len > 0 {
// Create a channel to communicate whenever a download is done and the component can be installed
// The `mpsc` channel was used as we need to send many messages from one producer (download's thread) to one consumer (install's thread)
// This is recommended in the official docs: https://docs.rs/tokio/latest/tokio/sync/index.html#mpsc-channel
Comment on lines +221 to +223
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't find it now, but I thought I wrote a comment on wondering why we're using a channel here, when we could instead compose the futures that download and install together.

Copy link
Member

@rami3l rami3l Sep 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm would you mind clarifying a little more what you meant by composing the futures?

The current constraints as we previously agreed was that the downloads are concurrent but the installations happening alongside them aren't concurrent within themselves, so I think a channel would be ideal for that case in favor of, say, a mutex.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you're saying, we only do one installation at a time? If that's the case, I would suggest something more like a FuturesUnordered containing all the downloads, and then pulling from that to run the installations.

let total_components = components.len();
let (download_tx, mut download_rx) =
mpsc::channel::<Result<(ComponentBinary<'_>, File)>>(total_components);

let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| {
let sem = semaphore.clone();
let download_tx = download_tx.clone();
async move {
let _permit = sem.acquire().await.unwrap();
let url = if altered {
utils::parse_url(
&bin.binary
.url
.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()),
)?
} else {
utils::parse_url(&bin.binary.url)?
};

let installer_file = bin
.download(&url, download_cfg, max_retries, new_manifest)
.await?;
let hash = bin.binary.hash.clone();
let _ = download_tx.send(Ok((bin, installer_file))).await;
Ok(hash)
}
});

let mut stream = component_stream.buffered(components_len);
let download_handle = async {
let mut hashes = Vec::new();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest writing this as a named function.

while let Some(result) = stream.next().await {
match result {
Ok(hash) => {
hashes.push(hash);
}
Err(e) => {
let _ = download_tx.send(Err(e)).await;
}
}
}
hashes
};
let install_handle = async {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest writing this as a named function.

let mut current_tx = tx;
let mut counter = 0;
while counter < total_components
&& let Some(message) = download_rx.recv().await
{
let (component_bin, installer_file) = message?;
current_tx = self.install_component(
component_bin,
installer_file,
tmp_cx,
download_cfg,
new_manifest,
current_tx,
)?;
counter += 1;
}
Ok::<_, Error>(current_tx)
};

let (download_results, install_result) = tokio::join!(download_handle, install_handle);
things_downloaded = download_results;
tx = install_result?;
}

// Install new distribution manifest
Expand Down
6 changes: 0 additions & 6 deletions tests/suite/cli_rustup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ async fn rustup_stable() {
.with_stderr(snapbox::str![[r#"
info: syncing channel updates for 'stable-[HOST_TRIPLE]'
info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0)
info: downloading component[..]
...
info: cleaning up downloads & tmp directories

Expand Down Expand Up @@ -131,15 +130,12 @@ async fn rustup_all_channels() {
.with_stderr(snapbox::str![[r#"
info: syncing channel updates for 'stable-[HOST_TRIPLE]'
info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0)
info: downloading component[..]
...
info: syncing channel updates for 'beta-[HOST_TRIPLE]'
info: latest update on 2015-01-02, rust version 1.2.0 (hash-beta-1.2.0)
info: downloading component[..]
...
info: syncing channel updates for 'nightly-[HOST_TRIPLE]'
info: latest update on 2015-01-02, rust version 1.3.0 (hash-nightly-2)
info: downloading component[..]
...
info: cleaning up downloads & tmp directories

Expand Down Expand Up @@ -208,12 +204,10 @@ async fn rustup_some_channels_up_to_date() {
.with_stderr(snapbox::str![[r#"
info: syncing channel updates for 'stable-[HOST_TRIPLE]'
info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0)
info: downloading component[..]
...
info: syncing channel updates for 'beta-[HOST_TRIPLE]'
info: syncing channel updates for 'nightly-[HOST_TRIPLE]'
info: latest update on 2015-01-02, rust version 1.3.0 (hash-nightly-2)
info: downloading component[..]
...
info: cleaning up downloads & tmp directories

Expand Down