Skip to content

Commit a2222e0

Browse files
authored
Merge pull request #1183 from input-output-hk/ensemble/1115/download-unpack-snapshots-simultaneously
Download and unpack snapshots simultaneously
2 parents: e79b1b1 + 7fbf329 · commit a2222e0

File tree

8 files changed

+152
-92
lines changed

8 files changed

+152
-92
lines changed

Cargo.lock

Lines changed: 26 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

mithril-client/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "mithril-client"
3-
version = "0.3.36"
3+
version = "0.3.37"
44
description = "A Mithril Client"
55
authors = { workspace = true }
66
edition = { workspace = true }
@@ -19,6 +19,7 @@ cli-table = "0.4.7"
1919
config = "0.13.3"
2020
directories = "5.0.1"
2121
flate2 = "1.0.27"
22+
flume = "0.11.0"
2223
fs2 = "0.4.3"
2324
futures = "0.3.28"
2425
hex = "0.4.3"

mithril-client/src/aggregator_client/http_client.rs

Lines changed: 45 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,4 @@
1+
use anyhow::anyhow;
12
use async_recursion::async_recursion;
23
use async_trait::async_trait;
34
use futures::StreamExt;
@@ -6,14 +7,14 @@ use semver::Version;
67
use slog_scope::debug;
78
use std::{path::Path, sync::Arc};
89
use thiserror::Error;
9-
use tokio::{fs, io::AsyncWriteExt, sync::RwLock};
10+
use tokio::sync::RwLock;
1011

1112
#[cfg(test)]
1213
use mockall::automock;
1314

14-
use mithril_common::{StdError, MITHRIL_API_VERSION_HEADER};
15+
use mithril_common::{StdError, StdResult, MITHRIL_API_VERSION_HEADER};
1516

16-
use crate::utils::DownloadProgressReporter;
17+
use crate::utils::{DownloadProgressReporter, SnapshotUnpacker};
1718

1819
/// Error tied with the Aggregator client
1920
#[derive(Error, Debug)]
@@ -51,11 +52,11 @@ pub trait AggregatorClient: Sync + Send {
5152
/// Get the content back from the Aggregator, the URL is a relative path for a resource
5253
async fn get_content(&self, url: &str) -> Result<String, AggregatorHTTPClientError>;
5354

54-
/// Download large files on the disk
55-
async fn download(
55+
/// Download and unpack large archives on the disk
56+
async fn download_unpack(
5657
&self,
5758
url: &str,
58-
filepath: &Path,
59+
target_dir: &Path,
5960
progress_reporter: DownloadProgressReporter,
6061
) -> Result<(), AggregatorHTTPClientError>;
6162

@@ -179,45 +180,65 @@ impl AggregatorClient for AggregatorHTTPClient {
179180
})
180181
}
181182

182-
async fn download(
183+
async fn download_unpack(
183184
&self,
184185
url: &str,
185-
filepath: &Path,
186+
target_dir: &Path,
186187
progress_reporter: DownloadProgressReporter,
187188
) -> Result<(), AggregatorHTTPClientError> {
188-
let response = self.get(url).await?;
189-
let mut local_file = fs::File::create(filepath).await.map_err(|e| {
190-
AggregatorHTTPClientError::SubsystemError {
191-
message: format!(
192-
"Download: could not create download archive '{}'.",
193-
filepath.display()
189+
if !target_dir.is_dir() {
190+
Err(AggregatorHTTPClientError::SubsystemError {
191+
message: "Download-Unpack: prerequisite error".to_string(),
192+
error: anyhow!(
193+
"target path is not a directory or does not exist: `{}`",
194+
target_dir.display()
194195
),
195-
error: e.into(),
196-
}
197-
})?;
196+
})?;
197+
}
198+
198199
let mut downloaded_bytes: u64 = 0;
199-
let mut remote_stream = response.bytes_stream();
200+
let mut remote_stream = self.get(url).await?.bytes_stream();
201+
let (sender, receiver) = flume::bounded(5);
202+
203+
let dest_dir = target_dir.to_path_buf();
204+
let unpack_thread = tokio::task::spawn_blocking(move || -> StdResult<()> {
205+
let unpacker = SnapshotUnpacker;
206+
unpacker.unpack_snapshot(receiver, &dest_dir)
207+
});
200208

201209
while let Some(item) = remote_stream.next().await {
202210
let chunk = item.map_err(|e| {
203211
AggregatorHTTPClientError::RemoteServerTechnical(format!(
204212
"Download: Could not read from byte stream: {e}"
205213
))
206214
})?;
207-
local_file.write_all(&chunk).await.map_err(|e| {
215+
216+
sender.send_async(chunk.to_vec()).await.map_err(|e| {
208217
AggregatorHTTPClientError::SubsystemError {
209-
message: format!(
210-
"Download: could not write {} bytes to file '{}'.",
211-
chunk.len(),
212-
filepath.display()
213-
),
218+
message: format!("Download: could not write {} bytes to stream.", chunk.len()),
214219
error: e.into(),
215220
}
216221
})?;
222+
217223
downloaded_bytes += chunk.len() as u64;
218224
progress_reporter.report(downloaded_bytes);
219225
}
220226

227+
drop(sender); // Signal EOF
228+
unpack_thread
229+
.await
230+
.map_err(|join_error| AggregatorHTTPClientError::SubsystemError {
231+
message: format!(
232+
"Unpack: panic while unpacking to dir '{}'",
233+
target_dir.display()
234+
),
235+
error: join_error.into(),
236+
})?
237+
.map_err(|unpack_error| AggregatorHTTPClientError::SubsystemError {
238+
message: format!("Unpack: could not unpack to dir '{}'", target_dir.display()),
239+
error: unpack_error,
240+
})?;
241+
221242
Ok(())
222243
}
223244

mithril-client/src/aggregator_client/snapshot_client.rs

Lines changed: 9 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -1,10 +1,7 @@
11
//! This module contains a struct to exchange snapshot information with the Aggregator
22
33
use slog_scope::warn;
4-
use std::{
5-
path::{Path, PathBuf},
6-
sync::Arc,
7-
};
4+
use std::{path::Path, sync::Arc};
85
use thiserror::Error;
96

107
use mithril_common::{
@@ -59,28 +56,24 @@ impl SnapshotClient {
5956
Ok(message)
6057
}
6158

62-
/// Download the snapshot identified by the given snapshot in the given directory
63-
pub async fn download(
59+
/// Download and unpack the given snapshot to the given directory
60+
pub async fn download_unpack(
6461
&self,
6562
snapshot: &Snapshot,
66-
download_dir: &Path,
63+
target_dir: &Path,
6764
progress_reporter: DownloadProgressReporter,
68-
) -> StdResult<PathBuf> {
69-
let filepath = PathBuf::new()
70-
.join(download_dir)
71-
.join(format!("snapshot-{}.tar.gz", snapshot.digest));
72-
65+
) -> StdResult<()> {
7366
for url in snapshot.locations.as_slice() {
7467
if self.http_client.probe(url).await.is_ok() {
75-
match self
68+
return match self
7669
.http_client
77-
.download(url, &filepath, progress_reporter)
70+
.download_unpack(url, target_dir, progress_reporter)
7871
.await
7972
{
80-
Ok(()) => return Ok(filepath),
73+
Ok(()) => Ok(()),
8174
Err(e) => {
8275
warn!("Failed downloading snapshot from '{url}' Error: {e}.");
83-
return Err(e.into());
76+
Err(e.into())
8477
}
8578
};
8679
}

mithril-client/src/services/snapshot.rs

Lines changed: 17 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -39,13 +39,10 @@ pub enum SnapshotServiceError {
3939

4040
/// Error raised when the certificate verification failed for the downloaded
4141
/// archive.
42-
#[error("Certificate verification failed (snapshot digest = '{digest}'). The archive has been downloaded in '{path}'.")]
42+
#[error("Certificate verification failed (snapshot digest = '{digest}').")]
4343
CouldNotVerifySnapshot {
4444
/// snapshot digest
4545
digest: String,
46-
47-
/// The path of the downloaded archive
48-
path: PathBuf,
4946
},
5047

5148
/// The given certificate could not be found, contains the certificate hash
@@ -218,14 +215,21 @@ impl SnapshotService for MithrilClientSnapshotService {
218215
debug!("Snapshot service: download.");
219216

220217
let db_dir = download_dir.join("db");
221-
let progress_bar = ProgressPrinter::new(progress_output_type, 7);
218+
let progress_bar = ProgressPrinter::new(progress_output_type, 6);
222219
progress_bar.report_step(1, "Checking local disk info…")?;
223220
let unpacker = SnapshotUnpacker;
224221

225222
if let Err(e) = unpacker.check_prerequisites(&db_dir, snapshot_entity.artifact.size) {
226223
self.check_disk_space_error(e)?;
227224
}
228225

226+
std::fs::create_dir_all(&db_dir).with_context(|| {
227+
format!(
228+
"Download: could not create target directory '{}'.",
229+
db_dir.display()
230+
)
231+
})?;
232+
229233
progress_bar.report_step(2, "Fetching the certificate's information…")?;
230234
let certificate = self
231235
.certificate_client
@@ -241,26 +245,21 @@ impl SnapshotService for MithrilClientSnapshotService {
241245
let verifier = self.verify_certificate_chain(genesis_verification_key, &certificate);
242246
self.wait_spinner(&progress_bar, verifier).await?;
243247

244-
progress_bar.report_step(4, "Downloading the snapshot…")?;
248+
progress_bar.report_step(4, "Downloading and unpacking the snapshot…")?;
245249
let pb = progress_bar.add(ProgressBar::new(snapshot_entity.artifact.size));
246250
pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")
247251
.unwrap()
248252
.with_key("eta", |state: &ProgressState, w: &mut dyn Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap())
249253
.progress_chars("#>-"));
250-
let snapshot_path = self
251-
.snapshot_client
252-
.download(
254+
self.snapshot_client
255+
.download_unpack(
253256
&snapshot_entity.artifact,
254-
download_dir,
257+
&db_dir,
255258
DownloadProgressReporter::new(pb, progress_output_type),
256259
)
257260
.await
258261
.with_context(|| format!("Could not download file in '{}'", download_dir.display()))?;
259262

260-
progress_bar.report_step(5, "Unpacking the snapshot…")?;
261-
let unpacker = unpacker.unpack_snapshot(&snapshot_path, &db_dir);
262-
self.wait_spinner(&progress_bar, unpacker).await?;
263-
264263
// Append 'clean' file to speedup node bootstrap
265264
if let Err(error) = File::create(db_dir.join("clean")) {
266265
warn!(
@@ -269,14 +268,14 @@ impl SnapshotService for MithrilClientSnapshotService {
269268
);
270269
};
271270

272-
progress_bar.report_step(6, "Computing the snapshot digest…")?;
271+
progress_bar.report_step(5, "Computing the snapshot digest…")?;
273272
let unpacked_snapshot_digest = self
274273
.immutable_digester
275274
.compute_digest(&db_dir, &certificate.beacon)
276275
.await
277276
.with_context(|| format!("Could not compute digest in '{}'", db_dir.display()))?;
278277

279-
progress_bar.report_step(7, "Verifying the snapshot signature…")?;
278+
progress_bar.report_step(6, "Verifying the snapshot signature…")?;
280279
let expected_message = {
281280
let mut protocol_message = certificate.protocol_message.clone();
282281
protocol_message.set_message_part(
@@ -295,7 +294,6 @@ impl SnapshotService for MithrilClientSnapshotService {
295294

296295
return Err(SnapshotServiceError::CouldNotVerifySnapshot {
297296
digest: snapshot_entity.artifact.digest.clone(),
298-
path: snapshot_path.canonicalize().unwrap(),
299297
}
300298
.into());
301299
}
@@ -400,7 +398,7 @@ mod tests {
400398
let mut http_client = MockAggregatorHTTPClient::new();
401399
http_client.expect_probe().returning(|_| Ok(()));
402400
http_client
403-
.expect_download()
401+
.expect_download_unpack()
404402
.returning(move |_, _, _| Ok(()))
405403
.times(1);
406404
http_client.expect_get_content().returning(|_| {
@@ -650,10 +648,7 @@ mod tests {
650648

651649
if let Some(e) = err.downcast_ref::<SnapshotServiceError>() {
652650
match e {
653-
SnapshotServiceError::CouldNotVerifySnapshot {
654-
digest,
655-
path: _path,
656-
} => {
651+
SnapshotServiceError::CouldNotVerifySnapshot { digest } => {
657652
assert_eq!("digest-10", digest.as_str());
658653
}
659654
_ => panic!("Wrong error type when snapshot could not be verified."),

mithril-client/src/utils/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,7 +2,9 @@
22
//! This module contains tools needed mostly in services layers.
33
44
mod progress_reporter;
5+
mod stream_reader;
56
mod unpacker;
67

78
pub use progress_reporter::*;
9+
pub use stream_reader::*;
810
pub use unpacker::*;

0 commit comments

Comments
 (0)