Skip to content

Commit e5aad49

Browse files
committed
feat: make sure mithril package files are stable
Signed-off-by: jeluard <jeluard@users.noreply.github.com>
1 parent acc113f commit e5aad49

File tree

1 file changed

+234
-22
lines changed

1 file changed

+234
-22
lines changed

crates/amaru/src/bin/ledger/cmd/mithril.rs

Lines changed: 234 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,20 @@
1313
// limitations under the License.
1414

1515
use std::{
16-
collections::BTreeMap,
16+
collections::{BTreeMap, BTreeSet},
1717
fmt,
1818
fs::{self, File},
1919
io::{self, Cursor, Write},
2020
path::PathBuf,
2121
sync::Arc,
22-
time::{SystemTime, UNIX_EPOCH},
2322
};
2423

2524
use amaru::{default_data_dir, default_ledger_dir};
26-
use amaru_kernel::{Hasher, NetworkName, cbor};
25+
use amaru_kernel::{Hasher, NetworkName, Point, cbor};
2726
use amaru_network::point::to_network_point;
2827
use async_trait::async_trait;
2928
use clap::Parser;
30-
use flate2::{Compression, write::GzEncoder};
29+
use flate2::{Compression, GzBuilder};
3130
use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle};
3231
use mithril_client::{
3332
ClientBuilder, MessageBuilder,
@@ -37,10 +36,12 @@ use mithril_client::{
3736
use pallas_hardano::storage::immutable::read_blocks_from_point;
3837
use tar::{Builder, Header};
3938
use tokio::sync::RwLock;
40-
use tracing::info;
39+
use tracing::{info, warn};
4140

4241
use crate::cmd::new_block_validator;
4342

43+
const BLOCKS_PER_ARCHIVE: usize = 20000;
44+
4445
#[derive(Debug, Parser)]
4546
pub struct Args {
4647
/// The target network to choose from.
@@ -161,15 +162,27 @@ impl FeedbackReceiver for IndicatifFeedbackReceiver {
161162

162163
#[allow(clippy::expect_used)]
163164
fn package_blocks(network: &NetworkName, blocks: &BTreeMap<String, &Vec<u8>>) -> io::Result<String> {
164-
let encoder = GzEncoder::new(Vec::new(), Compression::default());
165+
let compressed = build_archive_bytes(blocks)?;
166+
167+
let dir = blocks_dir(*network);
168+
fs::create_dir_all(&dir)?;
169+
let archive_path = archive_path_for_blocks(network, blocks).expect("blocks map is non-empty here by construction");
170+
let mut file = File::create(&archive_path)?;
171+
file.write_all(&compressed)?;
172+
173+
Ok(archive_path)
174+
}
175+
176+
fn build_archive_bytes(blocks: &BTreeMap<String, &Vec<u8>>) -> io::Result<Vec<u8>> {
177+
let encoder = GzBuilder::new().mtime(0).write(Vec::new(), Compression::default());
165178
let mut tar = Builder::new(encoder);
166179

167180
for (name, data) in blocks {
168181
let mut header = Header::new_gnu();
169182
header.set_size(data.len() as u64);
170183
header.set_mode(0o644);
171184
header.set_entry_type(tar::EntryType::Regular);
172-
header.set_mtime(SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs());
185+
header.set_mtime(0);
173186
header.set_uid(0);
174187
header.set_gid(0);
175188
header.set_cksum();
@@ -178,18 +191,95 @@ fn package_blocks(network: &NetworkName, blocks: &BTreeMap<String, &Vec<u8>>) ->
178191
}
179192

180193
let encoder = tar.into_inner()?;
181-
let compressed = encoder.finish()?;
194+
encoder.finish()
195+
}
182196

183-
let dir = format!("{}/blocks", default_data_dir(*network));
184-
fs::create_dir_all(&dir)?;
185-
let first_block =
186-
blocks.first_key_value().map(|kv| kv.0).cloned().expect("blocks map is non-empty here by construction");
187-
let archive_path = format!("{}/{}.tar.gz", dir, first_block);
188-
let mut f = File::create(&archive_path)?;
197+
fn blocks_dir(network: NetworkName) -> String {
198+
format!("{}/blocks", default_data_dir(network))
199+
}
189200

190-
f.write_all(&compressed)?;
201+
fn archive_name_for_blocks(blocks: &BTreeMap<String, &Vec<u8>>) -> Option<String> {
202+
let (first_block, _) = blocks.first_key_value()?;
203+
let (last_block, _) = blocks.last_key_value()?;
204+
let first_point = first_block.strip_suffix(".cbor")?;
205+
let last_point = last_block.strip_suffix(".cbor")?;
191206

192-
Ok(archive_path)
207+
Some(format!("{first_point}__{last_point}.tar.gz"))
208+
}
209+
210+
fn archive_path_for_blocks(network: &NetworkName, blocks: &BTreeMap<String, &Vec<u8>>) -> Option<String> {
211+
archive_name_for_blocks(blocks).map(|archive_name| format!("{}/{}", blocks_dir(*network), archive_name))
212+
}
213+
214+
fn list_existing_archives(network: NetworkName) -> Result<BTreeSet<String>, io::Error> {
215+
let dir = PathBuf::from(blocks_dir(network));
216+
if !dir.try_exists()? {
217+
return Ok(BTreeSet::new());
218+
}
219+
220+
Ok(fs::read_dir(dir)?
221+
.filter_map(Result::ok)
222+
.filter_map(|entry| entry.file_name().into_string().ok())
223+
.filter(|name| name.ends_with(".tar.gz"))
224+
.collect())
225+
}
226+
227+
fn parse_archive_point(name: &str) -> Option<Point> {
228+
Point::try_from(name).ok()
229+
}
230+
231+
#[derive(Debug, Clone, PartialEq, Eq)]
232+
struct ArchiveMetadata {
233+
file_name: String,
234+
first_block: Point,
235+
last_block: Point,
236+
}
237+
238+
fn parse_archive_metadata(archive_name: &str) -> Option<ArchiveMetadata> {
239+
let archive_name = archive_name.strip_suffix(".tar.gz")?;
240+
let (first_block, last_block) = archive_name.split_once("__")?;
241+
242+
Some(ArchiveMetadata {
243+
file_name: format!("{archive_name}.tar.gz"),
244+
first_block: parse_archive_point(first_block)?,
245+
last_block: parse_archive_point(last_block)?,
246+
})
247+
}
248+
249+
#[cfg(test)]
250+
fn parse_archive_bounds(archive_name: &str) -> Option<(Point, Point)> {
251+
let metadata = parse_archive_metadata(archive_name)?;
252+
253+
Some((metadata.first_block, metadata.last_block))
254+
}
255+
256+
#[cfg(test)]
257+
fn latest_archive_end_point<'a>(archives: impl IntoIterator<Item = &'a String>) -> Option<Point> {
258+
archives
259+
.into_iter()
260+
.filter_map(|archive_name| parse_archive_metadata(archive_name))
261+
.map(|archive| archive.last_block)
262+
.max()
263+
}
264+
265+
fn sorted_archives<'a>(archives: impl IntoIterator<Item = &'a String>) -> Vec<ArchiveMetadata> {
266+
let mut parsed: Vec<_> =
267+
archives.into_iter().filter_map(|archive_name| parse_archive_metadata(archive_name)).collect();
268+
parsed.sort_by_key(|archive| archive.last_block);
269+
parsed
270+
}
271+
272+
fn latest_archive<'a>(archives: impl IntoIterator<Item = &'a String>) -> Option<ArchiveMetadata> {
273+
sorted_archives(archives).into_iter().last()
274+
}
275+
276+
fn resume_point_for_archives<'a>(tip: Point, archives: impl IntoIterator<Item = &'a String>) -> Point {
277+
let parsed = sorted_archives(archives);
278+
279+
match parsed.last() {
280+
Some(_) => parsed.iter().rev().nth(1).map(|archive| archive.last_block).unwrap_or(tip),
281+
None => tip,
282+
}
193283
}
194284

195285
/// Returns the latest chunk number present in the given immutable directory.
@@ -333,23 +423,27 @@ pub async fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
333423

334424
let ledger = new_block_validator(network, ledger_dir)?;
335425
let tip = ledger.get_tip();
426+
let mut existing_archives = list_existing_archives(network)?;
427+
let tail_archive = latest_archive(&existing_archives);
336428

337429
// Determine the chunk to start from
430+
let resume_point = resume_point_for_archives(tip, &existing_archives);
431+
338432
let latest_chunk = get_latest_chunk(&immutable_dir)?;
339-
let from_chunk = latest_chunk.unwrap_or(infer_chunk_from_slot(tip.slot_or_default().into()) - 1);
433+
let from_chunk = latest_chunk.unwrap_or(infer_chunk_from_slot(resume_point.slot_or_default().into()) - 1);
340434

341435
info!(
342-
tip=%tip, from_chunk=%from_chunk,
436+
tip=%tip, resume_point=%resume_point, from_chunk=%from_chunk,
343437
"Downloading mithril immutable chunks"
344438
);
345439

346440
download_from_mithril(network, target_dir, from_chunk).await?;
347441

348442
info!("Packaging blocks into .tar.gz files");
349443

350-
// Read blocks from the immutable storage and package them into .tar.gz files
351-
const BLOCKS_PER_ARCHIVE: usize = 20000;
352-
let mut iter = read_blocks_from_point(&immutable_dir, to_network_point(tip))?.map_while(Result::ok).skip(1); // Exclude the tip itself
444+
// Read blocks from the immutable storage and package them into .tar.gz files.
445+
let mut iter =
446+
read_blocks_from_point(&immutable_dir, to_network_point(resume_point))?.map_while(Result::ok).skip(1); // Exclude the resume point itself
353447
loop {
354448
let chunk: Vec<_> = iter.by_ref().take(BLOCKS_PER_ARCHIVE).collect();
355449
if chunk.is_empty() {
@@ -363,11 +457,129 @@ pub async fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
363457
Ok((name, cbor))
364458
})
365459
.collect::<Result<BTreeMap<String, &Vec<u8>>, cbor::decode::Error>>()?;
460+
461+
#[allow(clippy::expect_used)]
462+
let archive_name = archive_name_for_blocks(&map).expect("chunk map is non-empty here by construction");
463+
if let Some(tail_archive) = &tail_archive {
464+
let tail_first =
465+
format!("{}.{}.cbor", tail_archive.first_block.slot_or_default(), tail_archive.first_block.hash());
466+
let first_block = map.first_key_value().map(|(name, _)| name.as_str());
467+
468+
if first_block == Some(tail_first.as_str()) && archive_name == tail_archive.file_name {
469+
info!(archive = %archive_name, "Retaining existing tail archive");
470+
continue;
471+
}
472+
473+
if first_block == Some(tail_first.as_str()) && archive_name != tail_archive.file_name {
474+
let stale_archive_path = PathBuf::from(blocks_dir(network)).join(&tail_archive.file_name);
475+
fs::remove_file(&stale_archive_path)?;
476+
existing_archives.remove(&tail_archive.file_name);
477+
info!(archive = %tail_archive.file_name, replacement = %archive_name, "Replacing tail archive");
478+
}
479+
}
480+
if existing_archives.contains(&archive_name) {
481+
debug_assert!(false, "encountered an already archived non-tail batch: {}", archive_name);
482+
warn!(archive = %archive_name, "Encountered an already archived non-tail batch");
483+
continue;
484+
}
485+
366486
let dir = package_blocks(&network, &map)?;
367-
info!("Created {}", dir);
487+
existing_archives.insert(archive_name);
488+
489+
info!(blocks = map.len(), dir, "Created archive batch");
368490
}
369491

370492
info!("Done");
371493

372494
Ok(())
373495
}
496+
497+
#[cfg(test)]
498+
mod tests {
499+
use std::collections::BTreeMap;
500+
501+
use amaru_kernel::Point;
502+
503+
use super::{
504+
ArchiveMetadata, BLOCKS_PER_ARCHIVE, archive_name_for_blocks, build_archive_bytes, latest_archive,
505+
latest_archive_end_point, parse_archive_bounds, parse_archive_metadata, resume_point_for_archives,
506+
};
507+
508+
#[test]
509+
fn archive_name_includes_first_and_last_blocks() {
510+
let block_a = Vec::from([0x01_u8]);
511+
let block_b = Vec::from([0x02_u8]);
512+
let mut blocks = BTreeMap::new();
513+
514+
blocks.insert("10.aaaa.cbor".to_string(), &block_a);
515+
blocks.insert("20.bbbb.cbor".to_string(), &block_b);
516+
517+
assert_eq!(archive_name_for_blocks(&blocks), Some("10.aaaa__20.bbbb.tar.gz".to_string()));
518+
}
519+
520+
#[test]
521+
fn archive_name_is_absent_for_empty_batch() {
522+
let blocks: BTreeMap<String, &Vec<u8>> = BTreeMap::new();
523+
524+
assert_eq!(archive_name_for_blocks(&blocks), None);
525+
}
526+
527+
#[test]
528+
fn parse_archive_bounds_extracts_first_and_last_points() {
529+
let bounds = parse_archive_bounds(
530+
"10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa__20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.tar.gz",
531+
);
532+
533+
assert_eq!(
534+
bounds,
535+
Some((
536+
Point::try_from("10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap(),
537+
Point::try_from("20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap(),
538+
))
539+
);
540+
}
541+
542+
#[test]
543+
fn latest_archive_end_point_uses_last_block_boundary() {
544+
let archives = vec![
545+
"10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa__20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.tar.gz".to_string(),
546+
"21.cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc__30.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd.tar.gz".to_string(),
547+
];
548+
549+
assert_eq!(
550+
latest_archive_end_point(&archives),
551+
Some(Point::try_from("30.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd").unwrap())
552+
);
553+
}
554+
555+
#[test]
556+
fn latest_archive_picks_last_archive() {
557+
let archives = vec![
558+
"10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa__20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.tar.gz".to_string(),
559+
"21.cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc__25.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd.tar.gz".to_string(),
560+
];
561+
562+
assert_eq!(
563+
latest_archive(&archives),
564+
Some(ArchiveMetadata {
565+
file_name: "21.cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc__25.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd.tar.gz".to_string(),
566+
first_block: Point::try_from("21.cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc").unwrap(),
567+
last_block: Point::try_from("25.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd").unwrap(),
568+
})
569+
);
570+
}
571+
572+
#[test]
573+
fn resume_point_uses_previous_boundary_for_incomplete_tail() {
574+
let tip = Point::try_from("5.eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee").unwrap();
575+
let archives = vec![
576+
"10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa__20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.tar.gz".to_string(),
577+
"21.cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc__25.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd.tar.gz".to_string(),
578+
];
579+
580+
assert_eq!(
581+
resume_point_for_archives(tip, &archives),
582+
Point::try_from("20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap()
583+
);
584+
}
585+
}

0 commit comments

Comments
 (0)