Skip to content

Commit 25ed794

Browse files
feat(downloads): interleave the downloads with their installations
Even though downloads are done concurrently, the installations are done sequentially. This means that, as downloads complete, they are in a queue (an mpsc channel) waiting to be consumed by the future responsible for the (sequential) installations. There was a need to relax some test cases to allow for uninstall to happen before the downloads.
1 parent 3154bf1 commit 25ed794

File tree

2 files changed

+90
-65
lines changed

2 files changed

+90
-65
lines changed

src/dist/manifestation.rs

Lines changed: 90 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ mod tests;
66

77
use std::path::Path;
88

9-
use anyhow::{Context, Result, anyhow, bail};
9+
use anyhow::{Context, Error, Result, anyhow, bail};
1010
use futures_util::stream::StreamExt;
1111
use std::sync::Arc;
12-
use tokio::sync::Semaphore;
12+
use tokio::sync::{Semaphore, mpsc};
1313
use tracing::info;
1414

1515
use crate::dist::component::{
@@ -153,7 +153,6 @@ impl Manifestation {
153153
let altered = tmp_cx.dist_server != DEFAULT_DIST_SERVER;
154154

155155
// Download component packages and validate hashes
156-
let mut things_to_install: Vec<(Component, CompressionKind, File)> = Vec::new();
157156
let mut things_downloaded: Vec<String> = Vec::new();
158157
let components = update.components_urls_and_hashes(new_manifest)?;
159158
let components_len = components.len();
@@ -172,49 +171,7 @@ impl Manifestation {
172171
.and_then(|s| s.parse().ok())
173172
.unwrap_or(DEFAULT_MAX_RETRIES);
174173

175-
info!("downloading component(s)");
176-
for (component, _, url, _) in components.clone() {
177-
(download_cfg.notify_handler)(Notification::DownloadingComponent(
178-
&component.short_name(new_manifest),
179-
&self.target_triple,
180-
component.target.as_ref(),
181-
&url,
182-
));
183-
}
184-
185-
let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
186-
let component_stream =
187-
tokio_stream::iter(components.into_iter()).map(|(component, format, url, hash)| {
188-
let sem = semaphore.clone();
189-
async move {
190-
let _permit = sem.acquire().await.unwrap();
191-
self.download_component(
192-
component,
193-
format,
194-
url,
195-
hash,
196-
altered,
197-
tmp_cx,
198-
download_cfg,
199-
max_retries,
200-
new_manifest,
201-
)
202-
.await
203-
}
204-
});
205-
if components_len > 0 {
206-
let results = component_stream
207-
.buffered(components_len)
208-
.collect::<Vec<_>>()
209-
.await;
210-
for result in results {
211-
let (component, format, downloaded_file, hash) = result?;
212-
things_downloaded.push(hash);
213-
things_to_install.push((component, format, downloaded_file));
214-
}
215-
}
216-
217-
// Begin transaction
174+
// Begin transaction before the downloads, as installations are interleaved with those
218175
let mut tx = Transaction::new(
219176
prefix.clone(),
220177
tmp_cx,
@@ -226,6 +183,16 @@ impl Manifestation {
226183
// to uninstall it first.
227184
tx = self.maybe_handle_v2_upgrade(&config, tx, download_cfg.process)?;
228185

186+
info!("downloading component(s)");
187+
for (component, _, url, _) in components.clone() {
188+
(download_cfg.notify_handler)(Notification::DownloadingComponent(
189+
&component.short_name(new_manifest),
190+
&self.target_triple,
191+
component.target.as_ref(),
192+
&url,
193+
));
194+
}
195+
229196
// Uninstall components
230197
for component in &update.components_to_uninstall {
231198
let notification = if implicit_modify {
@@ -248,17 +215,76 @@ impl Manifestation {
248215
)?;
249216
}
250217

251-
// Install components
252-
for (component, format, installer_file) in things_to_install {
253-
tx = self.install_component(
254-
component,
255-
format,
256-
installer_file,
257-
tmp_cx,
258-
download_cfg,
259-
new_manifest,
260-
tx,
261-
)?;
218+
if components_len > 0 {
219+
// Create a channel to communicate whenever a download is done and the component can be installed
220+
// The `mpsc` channel was used as we need to send many messages from one producer (download's thread) to one consumer (install's thread)
221+
// This is recommended in the official docs: https://docs.rs/tokio/latest/tokio/sync/index.html#mpsc-channel
222+
let total_components = components.len();
223+
let (download_tx, mut download_rx) =
224+
mpsc::channel::<Result<(Component, CompressionKind, File)>>(total_components);
225+
226+
let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
227+
let component_stream =
228+
tokio_stream::iter(components.into_iter()).map(|(component, format, url, hash)| {
229+
let sem = semaphore.clone();
230+
let download_tx_cloned = download_tx.clone();
231+
async move {
232+
let _permit = sem.acquire().await.unwrap();
233+
self.download_component(
234+
component,
235+
format,
236+
url,
237+
hash,
238+
altered,
239+
tmp_cx,
240+
download_cfg,
241+
max_retries,
242+
new_manifest,
243+
download_tx_cloned,
244+
)
245+
.await
246+
}
247+
});
248+
249+
let mut stream = component_stream.buffered(components_len);
250+
let download_handle = async {
251+
let mut hashes = Vec::new();
252+
while let Some(result) = stream.next().await {
253+
match result {
254+
Ok(hash) => {
255+
hashes.push(hash);
256+
}
257+
Err(e) => {
258+
let _ = download_tx.send(Err(e)).await;
259+
}
260+
}
261+
}
262+
hashes
263+
};
264+
let install_handle = async {
265+
let mut current_tx = tx;
266+
let mut counter = 0;
267+
while counter < total_components
268+
&& let Some(message) = download_rx.recv().await
269+
{
270+
let (component, format, installer_file) = message?;
271+
current_tx = self.install_component(
272+
component.clone(),
273+
format,
274+
installer_file,
275+
tmp_cx,
276+
download_cfg,
277+
new_manifest,
278+
current_tx,
279+
)?;
280+
counter += 1;
281+
}
282+
Ok::<_, Error>(current_tx)
283+
};
284+
285+
let (download_results, install_result) = tokio::join!(download_handle, install_handle);
286+
things_downloaded = download_results;
287+
tx = install_result?;
262288
}
263289

264290
// Install new distribution manifest
@@ -510,7 +536,8 @@ impl Manifestation {
510536
download_cfg: &DownloadCfg<'_>,
511537
max_retries: usize,
512538
new_manifest: &Manifest,
513-
) -> Result<(Component, CompressionKind, File, String)> {
539+
notification_tx: mpsc::Sender<Result<(Component, CompressionKind, File)>>,
540+
) -> Result<String> {
514541
use tokio_retry::{RetryIf, strategy::FixedInterval};
515542

516543
let url = if altered {
@@ -539,9 +566,13 @@ impl Manifestation {
539566
.await
540567
.with_context(|| RustupError::ComponentDownloadFailed(component.name(new_manifest)))?;
541568

542-
Ok((component, format, downloaded_file, hash))
569+
let _ = notification_tx
570+
.send(Ok((component.clone(), format, downloaded_file)))
571+
.await;
572+
Ok(hash)
543573
}
544574

575+
#[allow(clippy::too_many_arguments)]
545576
fn install_component<'a>(
546577
&self,
547578
component: Component,

tests/suite/cli_rustup.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ async fn rustup_stable() {
3535
.with_stderr(snapbox::str![[r#"
3636
info: syncing channel updates for 'stable-[HOST_TRIPLE]'
3737
info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0)
38-
info: downloading component[..]
3938
...
4039
info: cleaning up downloads & tmp directories
4140
@@ -131,15 +130,12 @@ async fn rustup_all_channels() {
131130
.with_stderr(snapbox::str![[r#"
132131
info: syncing channel updates for 'stable-[HOST_TRIPLE]'
133132
info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0)
134-
info: downloading component[..]
135133
...
136134
info: syncing channel updates for 'beta-[HOST_TRIPLE]'
137135
info: latest update on 2015-01-02, rust version 1.2.0 (hash-beta-1.2.0)
138-
info: downloading component[..]
139136
...
140137
info: syncing channel updates for 'nightly-[HOST_TRIPLE]'
141138
info: latest update on 2015-01-02, rust version 1.3.0 (hash-nightly-2)
142-
info: downloading component[..]
143139
...
144140
info: cleaning up downloads & tmp directories
145141
@@ -208,12 +204,10 @@ async fn rustup_some_channels_up_to_date() {
208204
.with_stderr(snapbox::str![[r#"
209205
info: syncing channel updates for 'stable-[HOST_TRIPLE]'
210206
info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0)
211-
info: downloading component[..]
212207
...
213208
info: syncing channel updates for 'beta-[HOST_TRIPLE]'
214209
info: syncing channel updates for 'nightly-[HOST_TRIPLE]'
215210
info: latest update on 2015-01-02, rust version 1.3.0 (hash-nightly-2)
216-
info: downloading component[..]
217211
...
218212
info: cleaning up downloads & tmp directories
219213

0 commit comments

Comments
 (0)