diff --git a/crates/amaru/src/bin/ledger/cmd/mithril.rs b/crates/amaru/src/bin/ledger/cmd/mithril.rs
index 28b7356c0..e28f2a8ba 100644
--- a/crates/amaru/src/bin/ledger/cmd/mithril.rs
+++ b/crates/amaru/src/bin/ledger/cmd/mithril.rs
@@ -13,21 +13,20 @@
 // limitations under the License.
 
 use std::{
-    collections::BTreeMap,
+    collections::{BTreeMap, BTreeSet},
     fmt,
     fs::{self, File},
     io::{self, Cursor, Write},
     path::PathBuf,
     sync::Arc,
-    time::{SystemTime, UNIX_EPOCH},
 };
 
 use amaru::{default_data_dir, default_ledger_dir};
-use amaru_kernel::{Hasher, NetworkName, cbor};
+use amaru_kernel::{Hasher, NetworkName, Point, cbor};
 use amaru_network::point::to_network_point;
 use async_trait::async_trait;
 use clap::Parser;
-use flate2::{Compression, write::GzEncoder};
+use flate2::{Compression, GzBuilder};
 use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle};
 use mithril_client::{
     ClientBuilder, MessageBuilder,
@@ -37,10 +36,12 @@ use mithril_client::{
 use pallas_hardano::storage::immutable::read_blocks_from_point;
 use tar::{Builder, Header};
 use tokio::sync::RwLock;
-use tracing::info;
+use tracing::{info, warn};
 
 use crate::cmd::new_block_validator;
 
+const BLOCKS_PER_ARCHIVE: usize = 20000;
+
 #[derive(Debug, Parser)]
 pub struct Args {
     /// The target network to choose from.
@@ -160,36 +161,124 @@ impl FeedbackReceiver for IndicatifFeedbackReceiver {
 }
 
 #[allow(clippy::expect_used)]
-fn package_blocks(network: &NetworkName, blocks: &BTreeMap<String, &Vec<u8>>) -> io::Result<String> {
-    let encoder = GzEncoder::new(Vec::new(), Compression::default());
+fn package_blocks(network: &NetworkName, blocks: &BTreeMap<Point, &Vec<u8>>) -> io::Result<String> {
+    let compressed = build_archive_bytes(blocks)?;
+
+    let dir = blocks_dir(*network);
+    fs::create_dir_all(&dir)?;
+    let archive_path = archive_path_for_blocks(network, blocks).expect("blocks map is non-empty here by construction");
+    let mut file = File::create(&archive_path)?;
+    file.write_all(&compressed)?;
+
+    Ok(archive_path)
+}
+
+fn block_file_name(point: &Point) -> String {
+    format!("{point}.cbor")
+}
+
+fn build_archive_bytes(blocks: &BTreeMap<Point, &Vec<u8>>) -> io::Result<Vec<u8>> {
+    // A fixed gzip mtime (and the fixed tar metadata below) keeps the archive
+    // bytes reproducible across runs for the same set of blocks.
+    let encoder = GzBuilder::new().mtime(0).write(Vec::new(), Compression::default());
     let mut tar = Builder::new(encoder);
 
-    for (name, data) in blocks {
+    for (point, data) in blocks {
         let mut header = Header::new_gnu();
         header.set_size(data.len() as u64);
         header.set_mode(0o644);
         header.set_entry_type(tar::EntryType::Regular);
-        header.set_mtime(SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_secs());
+        header.set_mtime(0);
         header.set_uid(0);
         header.set_gid(0);
         header.set_cksum();
-        tar.append_data(&mut header, name, Cursor::new(data))?;
+        tar.append_data(&mut header, block_file_name(point), Cursor::new(data))?;
     }
 
     let encoder = tar.into_inner()?;
-    let compressed = encoder.finish()?;
+    encoder.finish()
+}
 
-    let dir = format!("{}/blocks", default_data_dir(*network));
-    fs::create_dir_all(&dir)?;
-    let first_block =
-        blocks.first_key_value().map(|kv| kv.0).cloned().expect("blocks map is non-empty here by construction");
-    let archive_path = format!("{}/{}.tar.gz", dir, first_block);
-    let mut f = File::create(&archive_path)?;
+fn blocks_dir(network: NetworkName) -> String {
+    format!("{}/blocks", default_data_dir(network))
+}
 
-    f.write_all(&compressed)?;
+fn archive_name_for_blocks(blocks: &BTreeMap<Point, &Vec<u8>>) -> Option<String> {
+    let (first_block, _) = blocks.first_key_value()?;
+    let (last_block, _) = blocks.last_key_value()?;
 
-    Ok(archive_path)
+    Some(format!("{first_block}__{last_block}.tar.gz"))
+}
+
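+// For illustration: a batch whose first key is point `10.aaa…` and whose last
+// key is `20.bbb…` (hashes shortened here) is archived as
+// `10.aaa…__20.bbb….tar.gz`; the `BTreeMap` ordering makes the first/last
+// bounds well-defined.
+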
+fn archive_path_for_blocks(network: &NetworkName, blocks: &BTreeMap<Point, &Vec<u8>>) -> Option<String> {
+    archive_name_for_blocks(blocks).map(|archive_name| format!("{}/{}", blocks_dir(*network), archive_name))
+}
+
+fn list_existing_archives(network: NetworkName) -> Result<BTreeSet<String>, io::Error> {
+    let dir = PathBuf::from(blocks_dir(network));
+    if !dir.try_exists()? {
+        return Ok(BTreeSet::new());
+    }
+
+    Ok(fs::read_dir(dir)?
+        .filter_map(Result::ok)
+        .filter_map(|entry| entry.file_name().into_string().ok())
+        .filter(|name| name.ends_with(".tar.gz"))
+        .collect())
+}
+
+fn parse_archive_point(name: &str) -> Option<Point> {
+    Point::try_from(name).ok()
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+struct ArchiveMetadata {
+    file_name: String,
+    first_block: Point,
+    last_block: Point,
+}
+
+fn parse_archive_metadata(archive_name: &str) -> Option<ArchiveMetadata> {
+    let archive_name = archive_name.strip_suffix(".tar.gz")?;
+    let (first_block, last_block) = archive_name.split_once("__")?;
+
+    Some(ArchiveMetadata {
+        file_name: format!("{archive_name}.tar.gz"),
+        first_block: parse_archive_point(first_block)?,
+        last_block: parse_archive_point(last_block)?,
+    })
+}
+
+#[cfg(test)]
+fn parse_archive_bounds(archive_name: &str) -> Option<(Point, Point)> {
+    let metadata = parse_archive_metadata(archive_name)?;
+
+    Some((metadata.first_block, metadata.last_block))
+}
+
+#[cfg(test)]
+fn latest_archive_end_point<'a>(archives: impl IntoIterator<Item = &'a String>) -> Option<Point> {
+    archives
+        .into_iter()
+        .filter_map(|archive_name| parse_archive_metadata(archive_name))
+        .map(|archive| archive.last_block)
+        .max()
+}
+
+fn sorted_archives<'a>(archives: impl IntoIterator<Item = &'a String>) -> Vec<ArchiveMetadata> {
+    let mut parsed: Vec<_> =
+        archives.into_iter().filter_map(|archive_name| parse_archive_metadata(archive_name)).collect();
+    parsed.sort_by_key(|archive| archive.last_block);
+    parsed
+}
+
+fn latest_archive<'a>(archives: impl IntoIterator<Item = &'a String>) -> Option<ArchiveMetadata> {
+    sorted_archives(archives).into_iter().last()
+}
+
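+/// The point to resume reading from: the last block of the *second-to-last*
+/// archive, or `Point::Origin` when fewer than two archives exist. The tail
+/// archive is deliberately excluded so that its range is re-read and the
+/// archive can be retained or replaced, in case the previous run was cut short.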
+fn resume_point_for_archives<'a>(archives: impl IntoIterator<Item = &'a String>) -> Point {
+    let parsed = sorted_archives(archives);
+
+    parsed.iter().rev().nth(1).map(|archive| archive.last_block).unwrap_or(Point::Origin)
+}
 
 /// Returns the latest chunk number present in the given immutable directory.
@@ -210,6 +299,10 @@ fn infer_chunk_from_slot(slot: u64) -> u64 {
     slot / 21_600
 }
 
+fn from_chunk_for_resume_point(latest_chunk: Option<u64>, resume_point: Point) -> u64 {
+    latest_chunk.unwrap_or_else(|| infer_chunk_from_slot(resume_point.slot_or_default().into()).saturating_sub(1))
+}
+
 struct AggregatorDetails {
     endpoint: &'static str,
     verification_key: &'static str,
@@ -333,13 +426,17 @@ pub async fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
     let ledger = new_block_validator(network, ledger_dir)?;
     let tip = ledger.get_tip();
 
+    let mut existing_archives = list_existing_archives(network)?;
+    let tail_archive = latest_archive(&existing_archives);
+
     // Determine the chunk to start from
+    let resume_point = resume_point_for_archives(&existing_archives);
+
     let latest_chunk = get_latest_chunk(&immutable_dir)?;
-    let from_chunk = latest_chunk.unwrap_or(infer_chunk_from_slot(tip.slot_or_default().into()) - 1);
+    let from_chunk = from_chunk_for_resume_point(latest_chunk, resume_point);
 
     info!(
-        tip=%tip, from_chunk=%from_chunk,
+        tip=%tip, resume_point=%resume_point, from_chunk=%from_chunk,
         "Downloading mithril immutable chunks"
     );
@@ -347,27 +444,164 @@ pub async fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
 
     info!("Packaging blocks into .tar.gz files");
 
-    // Read blocks from the immutable storage and package them into .tar.gz files
-    const BLOCKS_PER_ARCHIVE: usize = 20000;
-    let mut iter = read_blocks_from_point(&immutable_dir, to_network_point(tip))?.map_while(Result::ok).skip(1); // Exclude the tip itself
+    // Read blocks from the immutable storage and package them into .tar.gz files.
+    let mut iter =
+        read_blocks_from_point(&immutable_dir, to_network_point(resume_point))?.map_while(Result::ok).skip(1); // Exclude the resume point itself
 
     loop {
         let chunk: Vec<_> = iter.by_ref().take(BLOCKS_PER_ARCHIVE).collect();
         if chunk.is_empty() {
             break;
         }
 
-        let map: BTreeMap<String, &Vec<u8>> = chunk
+        let map: BTreeMap<Point, &Vec<u8>> = chunk
             .iter()
            .map(|cbor| {
                 let parsed = parse_header_slot_and_hash(cbor)?;
-                let name = format!("{}.{}.cbor", parsed.slot, hex::encode(parsed.header_hash));
-                Ok((name, cbor))
+                let point = Point::Specific(parsed.slot.into(), parsed.header_hash.into());
+                Ok((point, cbor))
             })
-            .collect::<Result<BTreeMap<String, &Vec<u8>>, cbor::decode::Error>>()?;
+            .collect::<Result<BTreeMap<Point, &Vec<u8>>, cbor::decode::Error>>()?;
+
+        #[allow(clippy::expect_used)]
+        let archive_name = archive_name_for_blocks(&map).expect("chunk map is non-empty here by construction");
+
+        // The first batch after the resume point re-derives the previous tail
+        // archive: keep it when it comes out identical, replace it when the
+        // batch now extends further into the immutable data.
+        if let Some(tail_archive) = &tail_archive {
+            let first_block = map.first_key_value().map(|(point, _)| point);
+
+            if first_block == Some(&tail_archive.first_block) && archive_name == tail_archive.file_name {
+                info!(archive = %archive_name, "Retaining existing tail archive");
+                continue;
+            }
+
+            if first_block == Some(&tail_archive.first_block) && archive_name != tail_archive.file_name {
+                let stale_archive_path = PathBuf::from(blocks_dir(network)).join(&tail_archive.file_name);
+                fs::remove_file(&stale_archive_path)?;
+                existing_archives.remove(&tail_archive.file_name);
+                info!(archive = %tail_archive.file_name, replacement = %archive_name, "Replacing tail archive");
+            }
+        }
+
+        // Any other collision with an existing archive indicates a bug in the
+        // resume logic; skip the batch rather than overwrite it.
+        if existing_archives.contains(&archive_name) {
+            debug_assert!(false, "encountered an already archived non-tail batch: {}", archive_name);
+            warn!(archive = %archive_name, "Encountered an already archived non-tail batch");
+            continue;
+        }
+
         let dir = package_blocks(&network, &map)?;
-        info!("Created {}", dir);
+        existing_archives.insert(archive_name);
+
+        info!(blocks = map.len(), dir, "Created archive batch");
     }
 
     info!("Done");
 
     Ok(())
 }
+
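+// Resulting on-disk layout (illustrative):
+//
+//   <data-dir>/blocks/10.aaa…__20.bbb….tar.gz
+//   <data-dir>/blocks/21.ccc…__30.ddd….tar.gz
+//
+// with each entry inside an archive named `<point>.cbor`.
+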
+#[cfg(test)]
+mod tests {
+    use std::collections::BTreeMap;
+
+    use amaru_kernel::Point;
+
+    use super::{
+        ArchiveMetadata, archive_name_for_blocks, latest_archive, latest_archive_end_point, parse_archive_bounds,
+    };
+
+    #[test]
+    fn archive_name_includes_first_and_last_blocks() {
+        let block_a = Vec::from([0x01_u8]);
+        let block_b = Vec::from([0x02_u8]);
+        let mut blocks = BTreeMap::new();
+
+        blocks.insert(
+            Point::try_from("10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap(),
+            &block_a,
+        );
+        blocks.insert(
+            Point::try_from("20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap(),
+            &block_b,
+        );
+
+        assert_eq!(
+            archive_name_for_blocks(&blocks),
+            Some(
+                "10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa__20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.tar.gz"
+                    .to_string()
+            )
+        );
+    }
+
+    #[test]
+    fn archive_name_uses_point_order_across_decimal_boundaries() {
+        let block_a = Vec::from([0x01_u8]);
+        let block_b = Vec::from([0x02_u8]);
+        let mut blocks = BTreeMap::new();
+
+        blocks.insert(
+            Point::try_from("100000.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap(),
+            &block_a,
+        );
+        blocks.insert(
+            Point::try_from("99999.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap(),
+            &block_b,
+        );
+
+        assert_eq!(
+            archive_name_for_blocks(&blocks),
+            Some(
+                "99999.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb__100000.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.tar.gz"
+                    .to_string()
+            )
+        );
+    }
+
+    #[test]
+    fn archive_name_is_absent_for_empty_batch() {
+        let blocks: BTreeMap<Point, &Vec<u8>> = BTreeMap::new();
+
+        assert_eq!(archive_name_for_blocks(&blocks), None);
+    }
+
+    #[test]
+    fn parse_archive_bounds_extracts_first_and_last_points() {
+        let bounds = parse_archive_bounds(
+            "10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa__20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.tar.gz",
+        );
+
+        assert_eq!(
+            bounds,
+            Some((
+                Point::try_from("10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa").unwrap(),
+                Point::try_from("20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap(),
+            ))
+        );
+    }
+
+    #[test]
+    fn latest_archive_end_point_uses_last_block_boundary() {
+        let archives = vec![
+            "10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa__20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.tar.gz".to_string(),
+            "21.cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc__30.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd.tar.gz".to_string(),
+        ];
+
+        assert_eq!(
+            latest_archive_end_point(&archives),
+            Some(Point::try_from("30.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd").unwrap())
+        );
+    }
+
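+    // A companion sketch, not part of the change set above: it pins down the
+    // resume semantics by checking that resume_point_for_archives returns the
+    // second-to-last archive boundary, so the tail archive is re-read on the
+    // next run. It uses a `super::` path since the helper is not in the `use`
+    // list above.
+    #[test]
+    fn resume_point_is_second_to_last_archive_boundary() {
+        let archives = vec![
+            "10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa__20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.tar.gz".to_string(),
+            "21.cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc__30.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd.tar.gz".to_string(),
+        ];
+
+        assert_eq!(
+            super::resume_point_for_archives(&archives),
+            Point::try_from("20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb").unwrap()
+        );
+    }
+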
Point::try_from("21.cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc").unwrap(), + last_block: Point::try_from("25.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd").unwrap(), + }) + ); + } +}