Skip to content

Commit 2ad06c8

Browse files
feat(downloads): interleave the downloads with their installations
Even though downloads are done concurrently, the installations are done sequentially. This means that, as downloads complete, they are in a queue (an mpsc channel) waiting to be consumed by the future responsible for the (sequential) installations. There was a need to relax some test cases to allow for uninstall to happen before the downloads.
1 parent 9ff199a commit 2ad06c8

File tree

2 files changed

+85
-63
lines changed

2 files changed

+85
-63
lines changed

src/dist/manifestation.rs

Lines changed: 85 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ mod tests;
66

77
use std::path::Path;
88

9-
use anyhow::{Context, Result, anyhow, bail};
9+
use anyhow::{Context, Error, Result, anyhow, bail};
1010
use futures_util::stream::StreamExt;
1111
use std::sync::Arc;
12-
use tokio::sync::Semaphore;
12+
use tokio::sync::{Semaphore, mpsc};
1313
use tracing::info;
1414
use url::Url;
1515

@@ -154,8 +154,7 @@ impl Manifestation {
154154
let altered = tmp_cx.dist_server != DEFAULT_DIST_SERVER;
155155

156156
// Download component packages and validate hashes
157-
let mut things_to_install = Vec::new();
158-
let mut things_downloaded = Vec::new();
157+
let mut things_downloaded: Vec<String> = Vec::new();
159158
let components = update.components_urls_and_hashes(new_manifest)?;
160159
let components_len = components.len();
161160

@@ -173,49 +172,7 @@ impl Manifestation {
173172
.and_then(|s| s.parse().ok())
174173
.unwrap_or(DEFAULT_MAX_RETRIES);
175174

176-
info!("downloading component(s)");
177-
for bin in &components {
178-
(download_cfg.notify_handler)(Notification::DownloadingComponent(
179-
&bin.component.short_name(new_manifest),
180-
&self.target_triple,
181-
bin.component.target.as_ref(),
182-
&bin.binary.url,
183-
));
184-
}
185-
186-
let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
187-
let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| {
188-
let sem = semaphore.clone();
189-
async move {
190-
let _permit = sem.acquire().await.unwrap();
191-
let url = if altered {
192-
utils::parse_url(
193-
&bin.binary
194-
.url
195-
.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()),
196-
)?
197-
} else {
198-
utils::parse_url(&bin.binary.url)?
199-
};
200-
201-
bin.download(&url, download_cfg, max_retries, new_manifest)
202-
.await
203-
.map(|downloaded| (bin, downloaded))
204-
}
205-
});
206-
if components_len > 0 {
207-
let results = component_stream
208-
.buffered(components_len)
209-
.collect::<Vec<_>>()
210-
.await;
211-
for result in results {
212-
let (bin, downloaded_file) = result?;
213-
things_downloaded.push(bin.binary.hash.clone());
214-
things_to_install.push((bin, downloaded_file));
215-
}
216-
}
217-
218-
// Begin transaction
175+
// Begin transaction before the downloads, as installations are interleaved with those
219176
let mut tx = Transaction::new(
220177
prefix.clone(),
221178
tmp_cx,
@@ -227,6 +184,16 @@ impl Manifestation {
227184
// to uninstall it first.
228185
tx = self.maybe_handle_v2_upgrade(&config, tx, download_cfg.process)?;
229186

187+
info!("downloading component(s)");
188+
for bin in &components {
189+
(download_cfg.notify_handler)(Notification::DownloadingComponent(
190+
&bin.component.short_name(new_manifest),
191+
&self.target_triple,
192+
bin.component.target.as_ref(),
193+
&bin.binary.url,
194+
));
195+
}
196+
230197
// Uninstall components
231198
for component in &update.components_to_uninstall {
232199
let notification = if implicit_modify {
@@ -249,16 +216,77 @@ impl Manifestation {
249216
)?;
250217
}
251218

252-
// Install components
253-
for (component_bin, installer_file) in things_to_install {
254-
tx = self.install_component(
255-
component_bin,
256-
installer_file,
257-
tmp_cx,
258-
download_cfg,
259-
new_manifest,
260-
tx,
261-
)?;
219+
if components_len > 0 {
220+
// Create a channel to communicate whenever a download is done and the component can be installed
221+
// The `mpsc` channel was used as we need to send many messages from one producer (download's thread) to one consumer (install's thread)
222+
// This is recommended in the official docs: https://docs.rs/tokio/latest/tokio/sync/index.html#mpsc-channel
223+
let total_components = components.len();
224+
let (download_tx, mut download_rx) =
225+
mpsc::channel::<Result<(ComponentBinary<'_>, File)>>(total_components);
226+
227+
let semaphore = Arc::new(Semaphore::new(concurrent_downloads));
228+
let component_stream = tokio_stream::iter(components.into_iter()).map(|bin| {
229+
let sem = semaphore.clone();
230+
let download_tx = download_tx.clone();
231+
async move {
232+
let _permit = sem.acquire().await.unwrap();
233+
let url = if altered {
234+
utils::parse_url(
235+
&bin.binary
236+
.url
237+
.replace(DEFAULT_DIST_SERVER, tmp_cx.dist_server.as_str()),
238+
)?
239+
} else {
240+
utils::parse_url(&bin.binary.url)?
241+
};
242+
243+
let installer_file = bin
244+
.download(&url, download_cfg, max_retries, new_manifest)
245+
.await?;
246+
let hash = bin.binary.hash.clone();
247+
let _ = download_tx.send(Ok((bin, installer_file))).await;
248+
Ok(hash)
249+
}
250+
});
251+
252+
let mut stream = component_stream.buffered(components_len);
253+
let download_handle = async {
254+
let mut hashes = Vec::new();
255+
while let Some(result) = stream.next().await {
256+
match result {
257+
Ok(hash) => {
258+
hashes.push(hash);
259+
}
260+
Err(e) => {
261+
let _ = download_tx.send(Err(e)).await;
262+
}
263+
}
264+
}
265+
hashes
266+
};
267+
let install_handle = async {
268+
let mut current_tx = tx;
269+
let mut counter = 0;
270+
while counter < total_components
271+
&& let Some(message) = download_rx.recv().await
272+
{
273+
let (component_bin, installer_file) = message?;
274+
current_tx = self.install_component(
275+
component_bin,
276+
installer_file,
277+
tmp_cx,
278+
download_cfg,
279+
new_manifest,
280+
current_tx,
281+
)?;
282+
counter += 1;
283+
}
284+
Ok::<_, Error>(current_tx)
285+
};
286+
287+
let (download_results, install_result) = tokio::join!(download_handle, install_handle);
288+
things_downloaded = download_results;
289+
tx = install_result?;
262290
}
263291

264292
// Install new distribution manifest

tests/suite/cli_rustup.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ async fn rustup_stable() {
3535
.with_stderr(snapbox::str![[r#"
3636
info: syncing channel updates for 'stable-[HOST_TRIPLE]'
3737
info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0)
38-
info: downloading component[..]
3938
...
4039
info: cleaning up downloads & tmp directories
4140
@@ -131,15 +130,12 @@ async fn rustup_all_channels() {
131130
.with_stderr(snapbox::str![[r#"
132131
info: syncing channel updates for 'stable-[HOST_TRIPLE]'
133132
info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0)
134-
info: downloading component[..]
135133
...
136134
info: syncing channel updates for 'beta-[HOST_TRIPLE]'
137135
info: latest update on 2015-01-02, rust version 1.2.0 (hash-beta-1.2.0)
138-
info: downloading component[..]
139136
...
140137
info: syncing channel updates for 'nightly-[HOST_TRIPLE]'
141138
info: latest update on 2015-01-02, rust version 1.3.0 (hash-nightly-2)
142-
info: downloading component[..]
143139
...
144140
info: cleaning up downloads & tmp directories
145141
@@ -208,12 +204,10 @@ async fn rustup_some_channels_up_to_date() {
208204
.with_stderr(snapbox::str![[r#"
209205
info: syncing channel updates for 'stable-[HOST_TRIPLE]'
210206
info: latest update on 2015-01-02, rust version 1.1.0 (hash-stable-1.1.0)
211-
info: downloading component[..]
212207
...
213208
info: syncing channel updates for 'beta-[HOST_TRIPLE]'
214209
info: syncing channel updates for 'nightly-[HOST_TRIPLE]'
215210
info: latest update on 2015-01-02, rust version 1.3.0 (hash-nightly-2)
216-
info: downloading component[..]
217211
...
218212
info: cleaning up downloads & tmp directories
219213

0 commit comments

Comments
 (0)