Skip to content

Commit 646e490

Browse files
committed
refactor(client): batch download now include the ancillary file
1 parent d8bec33 commit 646e490

File tree

1 file changed

+82
-75
lines changed

1 file changed

+82
-75
lines changed

mithril-client/src/cardano_database_client/download_unpack.rs

Lines changed: 82 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ impl InternalArtifactDownloader {
126126
.immutables
127127
.average_size_uncompressed;
128128

129-
let tasks = VecDeque::from(self.build_download_tasks_for_immutables(
129+
let mut tasks = VecDeque::from(self.build_download_tasks_for_immutables(
130130
immutable_locations,
131131
immutable_file_number_range,
132132
target_dir,
@@ -135,13 +135,12 @@ impl InternalArtifactDownloader {
135135
)?);
136136
if download_unpack_options.include_ancillary {
137137
let ancillary = &cardano_database_snapshot.ancillary;
138-
self.download_unpack_ancillary_file(
138+
tasks.push_back(self.new_ancillary_download_task(
139139
&ancillary.locations,
140-
ancillary.size_uncompressed,
141140
target_dir,
141+
ancillary.size_uncompressed,
142142
&download_id,
143-
)
144-
.await?;
143+
)?);
145144
}
146145
self.batch_download_unpack(tasks, download_unpack_options.max_parallel_downloads)
147146
.await?;
@@ -290,6 +289,50 @@ impl InternalArtifactDownloader {
290289
})
291290
}
292291

292+
fn new_ancillary_download_task(
293+
&self,
294+
locations: &[AncillaryLocation],
295+
ancillary_file_target_dir: &Path,
296+
size_uncompressed: u64,
297+
download_id: &str,
298+
) -> MithrilResult<DownloadTask> {
299+
let mut locations_to_try = vec![];
300+
let mut locations_sorted = locations.to_owned();
301+
locations_sorted.sort();
302+
for location in locations_sorted {
303+
let location_to_try = match location {
304+
AncillaryLocation::CloudStorage {
305+
uri,
306+
compression_algorithm,
307+
} => {
308+
let file_downloader = self.http_file_downloader.clone();
309+
let file_downloader_uri = FileDownloaderUri::from(uri);
310+
311+
LocationToDownload {
312+
file_downloader,
313+
file_downloader_uri,
314+
compression_algorithm: compression_algorithm.to_owned(),
315+
}
316+
}
317+
AncillaryLocation::Unknown => {
318+
return Err(anyhow!("Unknown location type to download immutable"));
319+
}
320+
};
321+
322+
locations_to_try.push(location_to_try);
323+
}
324+
325+
Ok(DownloadTask {
326+
name: "ancillary".to_string(),
327+
locations_to_try,
328+
target_dir: ancillary_file_target_dir.to_path_buf(),
329+
size_uncompressed,
330+
download_event: DownloadEvent::Ancillary {
331+
download_id: download_id.to_string(),
332+
},
333+
})
334+
}
335+
293336
/// Download and unpack the files in parallel.
294337
async fn batch_download_unpack(
295338
&self,
@@ -355,56 +398,6 @@ impl InternalArtifactDownloader {
355398

356399
Box::pin(download_future)
357400
}
358-
359-
/// Download and unpack the ancillary files.
360-
pub(crate) async fn download_unpack_ancillary_file(
361-
&self,
362-
locations: &[AncillaryLocation],
363-
file_size: u64,
364-
ancillary_file_target_dir: &Path,
365-
download_id: &str,
366-
) -> MithrilResult<()> {
367-
let mut locations_sorted = locations.to_owned();
368-
locations_sorted.sort();
369-
for location in locations_sorted {
370-
let (file_downloader, compression_algorithm) = match &location {
371-
AncillaryLocation::CloudStorage {
372-
uri: _,
373-
compression_algorithm,
374-
} => (self.http_file_downloader.clone(), *compression_algorithm),
375-
AncillaryLocation::Unknown => {
376-
continue;
377-
}
378-
};
379-
let file_downloader_uri = location.try_into()?;
380-
let downloaded = file_downloader
381-
.download_unpack(
382-
&file_downloader_uri,
383-
file_size,
384-
ancillary_file_target_dir,
385-
compression_algorithm,
386-
DownloadEvent::Ancillary {
387-
download_id: download_id.to_string(),
388-
},
389-
)
390-
.await;
391-
match downloaded {
392-
Ok(_) => {
393-
return Ok(());
394-
}
395-
Err(e) => {
396-
slog::error!(
397-
self.logger,
398-
"Failed downloading and unpacking ancillaries for location {file_downloader_uri:?}"; "error" => ?e
399-
);
400-
}
401-
}
402-
}
403-
404-
Err(anyhow!(
405-
"Failed downloading and unpacking ancillaries for all locations"
406-
))
407-
}
408401
}
409402

410403
#[cfg(test)]
@@ -992,38 +985,44 @@ mod tests {
992985
test_utils::test_logger(),
993986
);
994987

995-
artifact_downloader
996-
.download_unpack_ancillary_file(
988+
let task = artifact_downloader
989+
.new_ancillary_download_task(
997990
&[AncillaryLocation::CloudStorage {
998991
uri: "http://whatever-1/ancillary.tar.gz".to_string(),
999992
compression_algorithm: Some(CompressionAlgorithm::default()),
1000993
}],
1001-
0,
1002994
target_dir,
995+
0,
1003996
"download_id",
1004997
)
998+
.unwrap();
999+
1000+
artifact_downloader
1001+
.batch_download_unpack(vec![task].into(), 1)
10051002
.await
1006-
.expect_err("download_unpack_ancillary_file should fail");
1003+
.expect_err("batch_download_unpack of ancillary file should fail");
10071004
}
10081005

10091006
#[tokio::test]
1010-
async fn download_unpack_ancillary_files_fails_if_location_is_unknown() {
1007+
async fn building_ancillary_download_task_fails_if_location_is_unknown() {
10111008
let target_dir = Path::new(".");
10121009
let artifact_downloader = InternalArtifactDownloader::new(
10131010
Arc::new(MockFileDownloader::new()),
10141011
FeedbackSender::new(&[]),
10151012
test_utils::test_logger(),
10161013
);
10171014

1018-
artifact_downloader
1019-
.download_unpack_ancillary_file(
1020-
&[AncillaryLocation::Unknown {}],
1021-
0,
1022-
target_dir,
1023-
"download_id",
1024-
)
1025-
.await
1026-
.expect_err("download_unpack_ancillary_file should fail");
1015+
let build_tasks_result = artifact_downloader.new_ancillary_download_task(
1016+
&[AncillaryLocation::Unknown {}],
1017+
target_dir,
1018+
0,
1019+
"download_id",
1020+
);
1021+
1022+
assert!(
1023+
build_tasks_result.is_err(),
1024+
"building tasks should fail if a location is unknown"
1025+
);
10271026
}
10281027

10291028
#[tokio::test]
@@ -1045,8 +1044,8 @@ mod tests {
10451044
test_utils::test_logger(),
10461045
);
10471046

1048-
artifact_downloader
1049-
.download_unpack_ancillary_file(
1047+
let task = artifact_downloader
1048+
.new_ancillary_download_task(
10501049
&[
10511050
AncillaryLocation::CloudStorage {
10521051
uri: "http://whatever-1/ancillary.tar.gz".to_string(),
@@ -1057,10 +1056,14 @@ mod tests {
10571056
compression_algorithm: Some(CompressionAlgorithm::default()),
10581057
},
10591058
],
1060-
0,
10611059
target_dir,
1060+
0,
10621061
"download_id",
10631062
)
1063+
.unwrap();
1064+
1065+
artifact_downloader
1066+
.batch_download_unpack(vec![task].into(), 1)
10641067
.await
10651068
.unwrap();
10661069
}
@@ -1080,8 +1083,8 @@ mod tests {
10801083
test_utils::test_logger(),
10811084
);
10821085

1083-
artifact_downloader
1084-
.download_unpack_ancillary_file(
1086+
let task = artifact_downloader
1087+
.new_ancillary_download_task(
10851088
&[
10861089
AncillaryLocation::CloudStorage {
10871090
uri: "http://whatever-1/ancillary.tar.gz".to_string(),
@@ -1092,10 +1095,14 @@ mod tests {
10921095
compression_algorithm: Some(CompressionAlgorithm::default()),
10931096
},
10941097
],
1095-
0,
10961098
target_dir,
1099+
0,
10971100
"download_id",
10981101
)
1102+
.unwrap();
1103+
1104+
artifact_downloader
1105+
.batch_download_unpack(vec![task].into(), 1)
10991106
.await
11001107
.unwrap();
11011108
}

0 commit comments

Comments
 (0)