1313// limitations under the License.
1414
1515use std:: {
16- collections:: BTreeMap ,
16+ collections:: { BTreeMap , BTreeSet } ,
1717 fmt,
1818 fs:: { self , File } ,
1919 io:: { self , Cursor , Write } ,
2020 path:: PathBuf ,
2121 sync:: Arc ,
22- time:: { SystemTime , UNIX_EPOCH } ,
2322} ;
2423
2524use amaru:: { default_data_dir, default_ledger_dir} ;
26- use amaru_kernel:: { Hasher , NetworkName , cbor} ;
25+ use amaru_kernel:: { Hasher , NetworkName , Point , cbor} ;
2726use amaru_network:: point:: to_network_point;
2827use async_trait:: async_trait;
2928use clap:: Parser ;
30- use flate2:: { Compression , write :: GzEncoder } ;
29+ use flate2:: { Compression , GzBuilder } ;
3130use indicatif:: { MultiProgress , ProgressBar , ProgressState , ProgressStyle } ;
3231use mithril_client:: {
3332 ClientBuilder , MessageBuilder ,
@@ -37,10 +36,12 @@ use mithril_client::{
3736use pallas_hardano:: storage:: immutable:: read_blocks_from_point;
3837use tar:: { Builder , Header } ;
3938use tokio:: sync:: RwLock ;
40- use tracing:: info;
39+ use tracing:: { info, warn } ;
4140
4241use crate :: cmd:: new_block_validator;
4342
/// Number of blocks packaged into each `.tar.gz` archive batch.
const BLOCKS_PER_ARCHIVE: usize = 20000;
44+
4445#[ derive( Debug , Parser ) ]
4546pub struct Args {
4647 /// The target network to choose from.
@@ -160,36 +161,124 @@ impl FeedbackReceiver for IndicatifFeedbackReceiver {
160161}
161162
162163#[ allow( clippy:: expect_used) ]
163- fn package_blocks ( network : & NetworkName , blocks : & BTreeMap < String , & Vec < u8 > > ) -> io:: Result < String > {
164- let encoder = GzEncoder :: new ( Vec :: new ( ) , Compression :: default ( ) ) ;
164+ fn package_blocks ( network : & NetworkName , blocks : & BTreeMap < Point , & Vec < u8 > > ) -> io:: Result < String > {
165+ let compressed = build_archive_bytes ( blocks) ?;
166+
167+ let dir = blocks_dir ( * network) ;
168+ fs:: create_dir_all ( & dir) ?;
169+ let archive_path = archive_path_for_blocks ( network, blocks) . expect ( "blocks map is non-empty here by construction" ) ;
170+ let mut file = File :: create ( & archive_path) ?;
171+ file. write_all ( & compressed) ?;
172+
173+ Ok ( archive_path)
174+ }
175+
176+ fn block_file_name ( point : & Point ) -> String {
177+ format ! ( "{point}.cbor" )
178+ }
179+
180+ fn build_archive_bytes ( blocks : & BTreeMap < Point , & Vec < u8 > > ) -> io:: Result < Vec < u8 > > {
181+ let encoder = GzBuilder :: new ( ) . mtime ( 0 ) . write ( Vec :: new ( ) , Compression :: default ( ) ) ;
165182 let mut tar = Builder :: new ( encoder) ;
166183
167- for ( name , data) in blocks {
184+ for ( point , data) in blocks {
168185 let mut header = Header :: new_gnu ( ) ;
169186 header. set_size ( data. len ( ) as u64 ) ;
170187 header. set_mode ( 0o644 ) ;
171188 header. set_entry_type ( tar:: EntryType :: Regular ) ;
172- header. set_mtime ( SystemTime :: now ( ) . duration_since ( UNIX_EPOCH ) . unwrap_or_default ( ) . as_secs ( ) ) ;
189+ header. set_mtime ( 0 ) ;
173190 header. set_uid ( 0 ) ;
174191 header. set_gid ( 0 ) ;
175192 header. set_cksum ( ) ;
176193
177- tar. append_data ( & mut header, name , Cursor :: new ( data) ) ?;
194+ tar. append_data ( & mut header, block_file_name ( point ) , Cursor :: new ( data) ) ?;
178195 }
179196
180197 let encoder = tar. into_inner ( ) ?;
181- let compressed = encoder. finish ( ) ?;
198+ encoder. finish ( )
199+ }
182200
183- let dir = format ! ( "{}/blocks" , default_data_dir( * network) ) ;
184- fs:: create_dir_all ( & dir) ?;
185- let first_block =
186- blocks. first_key_value ( ) . map ( |kv| kv. 0 ) . cloned ( ) . expect ( "blocks map is non-empty here by construction" ) ;
187- let archive_path = format ! ( "{}/{}.tar.gz" , dir, first_block) ;
188- let mut f = File :: create ( & archive_path) ?;
201+ fn blocks_dir ( network : NetworkName ) -> String {
202+ format ! ( "{}/blocks" , default_data_dir( network) )
203+ }
189204
190- f. write_all ( & compressed) ?;
205+ fn archive_name_for_blocks ( blocks : & BTreeMap < Point , & Vec < u8 > > ) -> Option < String > {
206+ let ( first_block, _) = blocks. first_key_value ( ) ?;
207+ let ( last_block, _) = blocks. last_key_value ( ) ?;
191208
192- Ok ( archive_path)
209+ Some ( format ! ( "{first_block}__{last_block}.tar.gz" ) )
210+ }
211+
212+ fn archive_path_for_blocks ( network : & NetworkName , blocks : & BTreeMap < Point , & Vec < u8 > > ) -> Option < String > {
213+ archive_name_for_blocks ( blocks) . map ( |archive_name| format ! ( "{}/{}" , blocks_dir( * network) , archive_name) )
214+ }
215+
216+ fn list_existing_archives ( network : NetworkName ) -> Result < BTreeSet < String > , io:: Error > {
217+ let dir = PathBuf :: from ( blocks_dir ( network) ) ;
218+ if !dir. try_exists ( ) ? {
219+ return Ok ( BTreeSet :: new ( ) ) ;
220+ }
221+
222+ Ok ( fs:: read_dir ( dir) ?
223+ . filter_map ( Result :: ok)
224+ . filter_map ( |entry| entry. file_name ( ) . into_string ( ) . ok ( ) )
225+ . filter ( |name| name. ends_with ( ".tar.gz" ) )
226+ . collect ( ) )
227+ }
228+
229+ fn parse_archive_point ( name : & str ) -> Option < Point > {
230+ Point :: try_from ( name) . ok ( )
231+ }
232+
/// Metadata recovered from an archive file name of the form
/// `<first>__<last>.tar.gz`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ArchiveMetadata {
    // On-disk file name, including the `.tar.gz` suffix.
    file_name: String,
    // Point of the first (lowest) block contained in the archive.
    first_block: Point,
    // Point of the last (highest) block contained in the archive.
    last_block: Point,
}
239+
240+ fn parse_archive_metadata ( archive_name : & str ) -> Option < ArchiveMetadata > {
241+ let archive_name = archive_name. strip_suffix ( ".tar.gz" ) ?;
242+ let ( first_block, last_block) = archive_name. split_once ( "__" ) ?;
243+
244+ Some ( ArchiveMetadata {
245+ file_name : format ! ( "{archive_name}.tar.gz" ) ,
246+ first_block : parse_archive_point ( first_block) ?,
247+ last_block : parse_archive_point ( last_block) ?,
248+ } )
249+ }
250+
/// Test helper: extract only the (first, last) point pair from an archive name.
#[cfg(test)]
fn parse_archive_bounds(archive_name: &str) -> Option<(Point, Point)> {
    parse_archive_metadata(archive_name).map(|metadata| (metadata.first_block, metadata.last_block))
}
257+
/// Test helper: highest `last_block` boundary across all parseable archives.
#[cfg(test)]
fn latest_archive_end_point<'a>(archives: impl IntoIterator<Item = &'a String>) -> Option<Point> {
    archives
        .into_iter()
        .filter_map(|archive_name| Some(parse_archive_metadata(archive_name)?.last_block))
        .max()
}
266+
267+ fn sorted_archives < ' a > ( archives : impl IntoIterator < Item = & ' a String > ) -> Vec < ArchiveMetadata > {
268+ let mut parsed: Vec < _ > =
269+ archives. into_iter ( ) . filter_map ( |archive_name| parse_archive_metadata ( archive_name) ) . collect ( ) ;
270+ parsed. sort_by_key ( |archive| archive. last_block ) ;
271+ parsed
272+ }
273+
274+ fn latest_archive < ' a > ( archives : impl IntoIterator < Item = & ' a String > ) -> Option < ArchiveMetadata > {
275+ sorted_archives ( archives) . into_iter ( ) . last ( )
276+ }
277+
278+ fn resume_point_for_archives < ' a > ( archives : impl IntoIterator < Item = & ' a String > ) -> Point {
279+ let parsed = sorted_archives ( archives) ;
280+
281+ parsed. iter ( ) . rev ( ) . nth ( 1 ) . map ( |archive| archive. last_block ) . unwrap_or ( Point :: Origin )
193282}
194283
195284/// Returns the latest chunk number present in the given immutable directory.
@@ -210,6 +299,10 @@ fn infer_chunk_from_slot(slot: u64) -> u64 {
210299 slot / 21_600
211300}
212301
302+ fn from_chunk_for_resume_point ( latest_chunk : Option < u64 > , resume_point : Point ) -> u64 {
303+ latest_chunk. unwrap_or_else ( || infer_chunk_from_slot ( resume_point. slot_or_default ( ) . into ( ) ) . saturating_sub ( 1 ) )
304+ }
305+
213306struct AggregatorDetails {
214307 endpoint : & ' static str ,
215308 verification_key : & ' static str ,
@@ -333,41 +426,182 @@ pub async fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
333426
334427 let ledger = new_block_validator ( network, ledger_dir) ?;
335428 let tip = ledger. get_tip ( ) ;
429+ let mut existing_archives = list_existing_archives ( network) ?;
430+ let tail_archive = latest_archive ( & existing_archives) ;
336431
337432 // Determine the chunk to start from
433+ let resume_point = resume_point_for_archives ( & existing_archives) ;
434+
338435 let latest_chunk = get_latest_chunk ( & immutable_dir) ?;
339- let from_chunk = latest_chunk . unwrap_or ( infer_chunk_from_slot ( tip . slot_or_default ( ) . into ( ) ) - 1 ) ;
436+ let from_chunk = from_chunk_for_resume_point ( latest_chunk , resume_point ) ;
340437
341438 info ! (
342- tip=%tip, from_chunk=%from_chunk,
439+ tip=%tip, resume_point=%resume_point , from_chunk=%from_chunk,
343440 "Downloading mithril immutable chunks"
344441 ) ;
345442
346443 download_from_mithril ( network, target_dir, from_chunk) . await ?;
347444
348445 info ! ( "Packaging blocks into .tar.gz files" ) ;
349446
350- // Read blocks from the immutable storage and package them into .tar.gz files
351- const BLOCKS_PER_ARCHIVE : usize = 20000 ;
352- let mut iter = read_blocks_from_point ( & immutable_dir, to_network_point ( tip ) ) ?. map_while ( Result :: ok) . skip ( 1 ) ; // Exclude the tip itself
447+ // Read blocks from the immutable storage and package them into .tar.gz files.
448+ let mut iter =
449+ read_blocks_from_point ( & immutable_dir, to_network_point ( resume_point ) ) ?. map_while ( Result :: ok) . skip ( 1 ) ; // Exclude the resume point itself
353450 loop {
354451 let chunk: Vec < _ > = iter. by_ref ( ) . take ( BLOCKS_PER_ARCHIVE ) . collect ( ) ;
355452 if chunk. is_empty ( ) {
356453 break ;
357454 }
358- let map: BTreeMap < String , & Vec < u8 > > = chunk
455+ let map: BTreeMap < Point , & Vec < u8 > > = chunk
359456 . iter ( )
360457 . map ( |cbor| {
361458 let parsed = parse_header_slot_and_hash ( cbor) ?;
362- let name = format ! ( "{}.{}.cbor" , parsed. slot, hex :: encode ( parsed. header_hash) ) ;
363- Ok ( ( name , cbor) )
459+ let point = Point :: Specific ( parsed. slot . into ( ) , parsed. header_hash . into ( ) ) ;
460+ Ok ( ( point , cbor) )
364461 } )
365- . collect :: < Result < BTreeMap < String , & Vec < u8 > > , cbor:: decode:: Error > > ( ) ?;
462+ . collect :: < Result < BTreeMap < Point , & Vec < u8 > > , cbor:: decode:: Error > > ( ) ?;
463+
464+ #[ allow( clippy:: expect_used) ]
465+ let archive_name = archive_name_for_blocks ( & map) . expect ( "chunk map is non-empty here by construction" ) ;
466+ if let Some ( tail_archive) = & tail_archive {
467+ let first_block = map. first_key_value ( ) . map ( |( point, _) | point) ;
468+
469+ if first_block == Some ( & tail_archive. first_block ) && archive_name == tail_archive. file_name {
470+ info ! ( archive = %archive_name, "Retaining existing tail archive" ) ;
471+ continue ;
472+ }
473+
474+ if first_block == Some ( & tail_archive. first_block ) && archive_name != tail_archive. file_name {
475+ let stale_archive_path = PathBuf :: from ( blocks_dir ( network) ) . join ( & tail_archive. file_name ) ;
476+ fs:: remove_file ( & stale_archive_path) ?;
477+ existing_archives. remove ( & tail_archive. file_name ) ;
478+ info ! ( archive = %tail_archive. file_name, replacement = %archive_name, "Replacing tail archive" ) ;
479+ }
480+ }
481+ if existing_archives. contains ( & archive_name) {
482+ debug_assert ! ( false , "encountered an already archived non-tail batch: {}" , archive_name) ;
483+ warn ! ( archive = %archive_name, "Encountered an already archived non-tail batch" ) ;
484+ continue ;
485+ }
486+
366487 let dir = package_blocks ( & network, & map) ?;
367- info ! ( "Created {}" , dir) ;
488+ existing_archives. insert ( archive_name) ;
489+
490+ info ! ( blocks = map. len( ) , dir, "Created archive batch" ) ;
368491 }
369492
370493 info ! ( "Done" ) ;
371494
372495 Ok ( ( ) )
373496}
497+
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;

    use amaru_kernel::Point;

    use super::{
        ArchiveMetadata, archive_name_for_blocks, latest_archive, latest_archive_end_point, parse_archive_bounds,
    };

    /// Shorthand: build a `Point` from its `<slot>.<hash>` textual form.
    fn point(repr: &str) -> Point {
        Point::try_from(repr).unwrap()
    }

    #[test]
    fn archive_name_includes_first_and_last_blocks() {
        let block_a = vec![0x01_u8];
        let block_b = vec![0x02_u8];
        let blocks = BTreeMap::from([
            (point("10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"), &block_a),
            (point("20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"), &block_b),
        ]);

        assert_eq!(
            archive_name_for_blocks(&blocks),
            Some(
                "10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa__20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.tar.gz"
                    .to_string()
            )
        );
    }

    #[test]
    fn archive_name_uses_point_order_across_decimal_boundaries() {
        let block_a = vec![0x01_u8];
        let block_b = vec![0x02_u8];
        let blocks = BTreeMap::from([
            (point("100000.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"), &block_a),
            (point("99999.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"), &block_b),
        ]);

        assert_eq!(
            archive_name_for_blocks(&blocks),
            Some(
                "99999.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb__100000.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.tar.gz"
                    .to_string()
            )
        );
    }

    #[test]
    fn archive_name_is_absent_for_empty_batch() {
        let blocks: BTreeMap<Point, &Vec<u8>> = BTreeMap::new();

        assert_eq!(archive_name_for_blocks(&blocks), None);
    }

    #[test]
    fn parse_archive_bounds_extracts_first_and_last_points() {
        let bounds = parse_archive_bounds(
            "10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa__20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.tar.gz",
        );

        assert_eq!(
            bounds,
            Some((
                point("10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"),
                point("20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"),
            ))
        );
    }

    #[test]
    fn latest_archive_end_point_uses_last_block_boundary() {
        let archives = [
            "10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa__20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.tar.gz".to_string(),
            "21.cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc__30.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd.tar.gz".to_string(),
        ];

        assert_eq!(
            latest_archive_end_point(&archives),
            Some(point("30.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd"))
        );
    }

    #[test]
    fn latest_archive_picks_last_archive() {
        let archives = [
            "10.aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa__20.bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb.tar.gz".to_string(),
            "21.cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc__25.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd.tar.gz".to_string(),
        ];

        assert_eq!(
            latest_archive(&archives),
            Some(ArchiveMetadata {
                file_name: "21.cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc__25.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd.tar.gz".to_string(),
                first_block: point("21.cccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc"),
                last_block: point("25.dddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddddd"),
            })
        );
    }
}
0 commit comments