diff --git a/Cargo.lock b/Cargo.lock index 5ffbd2d1..c6033525 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -373,9 +373,16 @@ version = "0.1.0" dependencies = [ "acropolis_common", "anyhow", + "async-compression", "caryatid_sdk", "config", + "futures-util", + "reqwest 0.12.24", + "serde", + "serde_json", + "thiserror 2.0.17", "tokio", + "tokio-util", "tracing", ] @@ -499,6 +506,7 @@ dependencies = [ "acropolis_module_parameters_state", "acropolis_module_peer_network_interface", "acropolis_module_rest_blockfrost", + "acropolis_module_snapshot_bootstrapper", "acropolis_module_spdd_state", "acropolis_module_spo_state", "acropolis_module_stake_delta_filter", diff --git a/common/src/snapshot/streaming_snapshot.rs b/common/src/snapshot/streaming_snapshot.rs index 673acb12..e4dc9f48 100644 --- a/common/src/snapshot/streaming_snapshot.rs +++ b/common/src/snapshot/streaming_snapshot.rs @@ -1568,7 +1568,7 @@ impl StreamingSnapshotParser { if utxo_count.is_multiple_of(1000000) { let buffer_usage = buffer.len(); info!( - " Streamed {} UTXOs, buffer: {} MB, max entry: {} bytes", + "Streamed {} UTXOs, buffer: {} MB, max entry: {} bytes", utxo_count, buffer_usage / 1024 / 1024, max_single_entry_size @@ -1625,20 +1625,17 @@ impl StreamingSnapshotParser { } } - info!(" 🎯 STREAMING RESULTS:"); - info!(" • UTXOs processed: {}", utxo_count); + info!("Streaming results:"); + info!(" UTXOs processed: {}", utxo_count); info!( - " • Total data streamed: {:.2} MB", + " Total data streamed: {:.2} MB", total_bytes_processed as f64 / 1024.0 / 1024.0 ); info!( - " • Peak buffer usage: {} MB (vs 2.1GB before!)", + " Peak buffer usage: {} MB", PARSE_BUFFER_SIZE / 1024 / 1024 ); - info!( - " • Largest single entry: {} bytes", - max_single_entry_size - ); + info!(" Largest single entry: {} bytes", max_single_entry_size); Ok(utxo_count) } diff --git a/modules/snapshot_bootstrapper/Cargo.toml b/modules/snapshot_bootstrapper/Cargo.toml index 05a3d128..b5104383 100644 --- a/modules/snapshot_bootstrapper/Cargo.toml +++ b/modules/snapshot_bootstrapper/Cargo.toml @@ -17,6 +17,13 @@ anyhow = { workspace = true } config = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +serde = { version = "1.0.228", features = ["derive"] } +serde_json = "1.0.132" +async-compression = { version = "0.4.32", features = ["tokio", "gzip"] } +futures-util = "0.3" +reqwest = "0.12" +thiserror = "2.0.17" +tokio-util = { version = "0.7", features = ["io"] } [lib] path = "src/snapshot_bootstrapper.rs" diff --git a/modules/snapshot_bootstrapper/data/mainnet/config.json b/modules/snapshot_bootstrapper/data/mainnet/config.json new file mode 100644 index 00000000..a1bb266a --- /dev/null +++ b/modules/snapshot_bootstrapper/data/mainnet/config.json @@ -0,0 +1,104 @@ +{ + "snapshots": [ + 507, + 508, + 509 + ], + "points": [ + { + "epoch": 507, + "id": "670ca68c3de580f8469677754a725e86ca72a7be381d3108569f0704a5fca327", + "slot": 134092758 + }, + { + "epoch": 508, + "id": "29011cc1320d03b3da0121236dc66e6bc391feef4bb1d506a7fb20e769d6a494", + "slot": 134524753 + }, + { + "epoch": 509, + "id": "6558deef007ba372a414466e49214368c17c1f8428093193fc187d1c4587053c", + "slot": 134956789 + }, + { + "epoch": 510, + "id": "3fd738bacbcc277d43358a28ed15fa4335977c822fd7546d0de2606d7d2a57aa", + "slot": 135388794 + }, + { + "epoch": 511, + "id": "d02f89d21fe9c80f927eeda31fadb03b589db2ac5c8108d7171c4c319aca2fa1", + "slot": 135820797 + }, + { + "epoch": 512, + "id": "9503c7c669746be68ca34ed4d822d9d0dcccd5c0ef61cb9679a1c1e739534853", + "slot": 136252793 + }, + { + "epoch": 513, + "id": "e2bb0babbc715953ce1edc4e0c817a4b1fa9d36124648b4d21d1a2ccd26be672", + "slot": 136684793 + }, + { + "epoch": 514, + "id": "2c4f7a0a855e76e5d83b9d3e168213711490663dddfc6925e09a37fe46ed62b4", + "slot": 137116798 + }, + { + "epoch": 515, + "id": "66c5229785de3ff7bb2834db69fc8da5d3203a7cdf2d6983b3e9e155ff6ec0fb", + "slot": 137548794 + }, + { + "epoch": 516, + "id": "b934fa686e585636cc74a07555dbd8c10f9680464f80273f1d29806ecbc5e822", + "slot": 137980781 + }, + { + "epoch": 517, + "id": "7c4afb5f4ba5d7182f99fd839e26302bcdca06c9066b825f2f40f4a094d7f0ab", + "slot": 138412701 + }, + { + "epoch": 518, + "id": "a65138e908ccc90014b4ae740382c7908f9636e56c6e9d6ecec38f452b70c93f", + "slot": 138844799 + }, + { + "epoch": 519, + "id": "0cffc5eb77a6885257fcba94b8fd6fdddc80e368bf4ef855f058c6adda4933c1", + "slot": 139276793 + }, + { + "epoch": 520, + "id": "58f198313d00d639814db34f32aad259e22c53089dfa95dae79e0e2e4d93c6f0", + "slot": 139708765 + }, + { + "epoch": 521, + "id": "7e423f52284987b4b358a0a9b6847525c42a818a024dde663101669ab2e8a6ee", + "slot": 140140779 + }, + { + "epoch": 522, + "id": "86f874039f07143ab4d7d5c6ccb27ea33fd1440f81176055fe9e4e6e910800e9", + "slot": 140572798 + }, + { + "epoch": 523, + "id": "96a53046d8bbfa690b6bfbc2c7f99036b3494f99e616e998224bcfcd33b84e7b", + "slot": 141004797 + }, + { + "epoch": 524, + "id": "865267d5b5fe9d497418ea72c9b84058e5aa2a98ace96043d53fec32eebf4fef", + "slot": 141436773 + }, + { + "epoch": 525, + "id": "e4846337e6f87ed65c88e770ab5c1bec39de45cbf3bdde88b249ac1ad2cd2a8a", + "slot": 141868737 + } + ] +} diff --git a/modules/snapshot_bootstrapper/data/mainnet/snapshots.json b/modules/snapshot_bootstrapper/data/mainnet/snapshots.json new file mode 100644 index 00000000..695a55ec --- /dev/null +++ b/modules/snapshot_bootstrapper/data/mainnet/snapshots.json @@ -0,0 +1,17 @@ +[ + { + "epoch": 507, + "point": "134092758.670ca68c3de580f8469677754a725e86ca72a7be381d3108569f0704a5fca327", + "url": "https://pub-b844360df4774bb092a2bb2043b888e5.r2.dev/134092758.670ca68c3de580f8469677754a725e86ca72a7be381d3108569f0704a5fca327.cbor.gz" + }, + { + "epoch": 508, + "point": "134524753.29011cc1320d03b3da0121236dc66e6bc391feef4bb1d506a7fb20e769d6a494", + "url": "https://pub-b844360df4774bb092a2bb2043b888e5.r2.dev/134524753.29011cc1320d03b3da0121236dc66e6bc391feef4bb1d506a7fb20e769d6a494.cbor.gz" + }, + { + "epoch": 509, + "point": "134956789.6558deef007ba372a414466e49214368c17c1f8428093193fc187d1c4587053c", + "url": "https://pub-b844360df4774bb092a2bb2043b888e5.r2.dev/134956789.6558deef007ba372a414466e49214368c17c1f8428093193fc187d1c4587053c.cbor.gz" + } +] diff --git a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs index 9f22c356..f9edbb1a 100644 --- a/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs +++ b/modules/snapshot_bootstrapper/src/snapshot_bootstrapper.rs @@ -1,4 +1,8 @@ -use std::{str::FromStr, sync::Arc}; +use std::{ + path::{Path, PathBuf}, + str::FromStr, + sync::Arc, +}; use acropolis_common::{ genesis_values::GenesisValues, @@ -15,17 +19,78 @@ use acropolis_common::{ BlockHash, BlockInfo, BlockStatus, Era, GenesisDelegates, }; use anyhow::Result; +use async_compression::tokio::bufread::GzipDecoder; use caryatid_sdk::{module, Context}; use config::Config; +use futures_util::TryStreamExt; +use serde::{Deserialize, Serialize}; +use std::fs; +use std::io; +use thiserror::Error; +use tokio::fs::File; +use tokio::io::BufReader; use tokio::time::Instant; +use tokio_util::io::StreamReader; use tracing::{error, info, info_span, Instrument}; const DEFAULT_SNAPSHOT_TOPIC: &str = "cardano.snapshot"; const DEFAULT_STARTUP_TOPIC: &str = "cardano.sequence.start"; const DEFAULT_COMPLETION_TOPIC: &str = "cardano.sequence.bootstrapped"; +#[derive(Debug, Error)] +pub enum SnapshotBootstrapError { + #[error("Cannot read network config file {0}: {1}")] + ReadNetworkConfig(PathBuf, io::Error), + + #[error("Cannot read snapshots metadata file {0}: {1}")] + ReadSnapshotsFile(PathBuf, io::Error), + + #[error("Failed to parse network config {0}: {1}")] + MalformedNetworkConfig(PathBuf, serde_json::Error), + + #[error("Failed to parse snapshots JSON file {0}: {1}")] + MalformedSnapshotsFile(PathBuf, serde_json::Error), + + #[error("Cannot create directory {0}: {1}")] + CreateDirectory(PathBuf, io::Error), + + #[error("Failed to download snapshot from {0}: {1}")] + DownloadError(String, reqwest::Error), + + #[error("Download failed from {0}: HTTP status {1}")] + DownloadInvalidStatusCode(String, reqwest::StatusCode), + + #[error("I/O error: {0}")] + Io(#[from] io::Error), +} + +/// Network configuration file (config.json) +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +struct NetworkConfig { + snapshots: Vec, + points: Vec, +} + +/// Point +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +struct Point { + epoch: u64, + id: String, + slot: u64, +} + +/// Snapshot metadata from snapshots.json +#[derive(Debug, Deserialize, Serialize, Clone)] +struct SnapshotFileMetadata { + epoch: u64, + point: String, + url: String, +} + /// Callback handler that accumulates snapshot data and builds state -struct SnapshotHandler { +pub struct SnapshotHandler { context: Arc>, snapshot_topic: String, @@ -90,25 +155,22 @@ impl SnapshotHandler { // Shelley mainnet genesis hash (placeholder - should be from config) shelley_genesis_hash: Hash::<32>::from_str( "1a3be38bcbb7911969283716ad7aa550250226b76a61fc51cc9a9a35d9276d81", - ) - .unwrap(), - genesis_delegs: GenesisDelegates::try_from(vec![]).unwrap(), + )?, + genesis_delegs: GenesisDelegates::try_from(vec![])?, }) } async fn publish_start(&self) -> Result<()> { - anyhow::Context::context( - self.context - .message_bus - .publish( - &self.snapshot_topic, - Arc::new(Message::Snapshot( - acropolis_common::messages::SnapshotMessage::Startup, - )), - ) - .await, - "Failed to publish start message", - ) + self.context + .message_bus + .publish( + &self.snapshot_topic, + Arc::new(Message::Snapshot( + acropolis_common::messages::SnapshotMessage::Startup, + )), + ) + .await + .map_err(|e| anyhow::anyhow!("Failed to publish start message: {}", e)) } async fn publish_completion( @@ -123,10 +185,11 @@ impl SnapshotHandler { }), )); - anyhow::Context::context( - self.context.message_bus.publish(&self.snapshot_topic, Arc::new(message)).await, - "Failed to publish completion", - ) + self.context + .message_bus + .publish(&self.snapshot_topic, Arc::new(message)) + .await + .map_err(|e| anyhow::anyhow!("Failed to publish completion: {}", e)) } } @@ -222,21 +285,20 @@ impl SnapshotCallbacks for SnapshotHandler { impl SnapshotBootstrapper { pub async fn init(&self, context: Arc>, config: Arc) -> Result<()> { - // TODO: read a config file path, not the snapshot-path; implement TODOs below. - let file_path = config - .get_string("snapshot-path") - .inspect_err(|e| error!("failed to find snapshot-path config: {e}"))?; - + let network = config.get_string("network").unwrap_or_else(|_| "mainnet".to_string()); + let data_dir = config.get_string("data-dir").unwrap_or_else(|_| "./data".to_string()); let startup_topic = config.get_string("startup-topic").unwrap_or(DEFAULT_STARTUP_TOPIC.to_string()); - let snapshot_topic = config.get_string("snapshot-topic").unwrap_or(DEFAULT_SNAPSHOT_TOPIC.to_string()); - info!("Publishing snapshots on '{snapshot_topic}'"); - let completion_topic = config.get_string("completion-topic").unwrap_or(DEFAULT_COMPLETION_TOPIC.to_string()); + info!("Publishing snapshots on '{snapshot_topic}'"); info!("Completing with '{completion_topic}'"); + info!("Snapshot bootstrapper initializing"); + info!(" Network: {}", network); + info!(" Data directory: {}", data_dir); + info!(" Publishing on '{}'", snapshot_topic); let mut subscription = context.subscribe(&startup_topic).await?; @@ -254,11 +316,77 @@ impl SnapshotBootstrapper { let span = info_span!("snapshot_bootstrapper.handle"); async { - if let Err(e) = - Self::process_snapshot(&file_path, context.clone(), &completion_topic).await - { - error!("Failed to process snapshot: {}", e); + let network_dir = format!("{}/{}", data_dir, network); + let config_path = format!("{}/config.json", network_dir); + let snapshots_path = format!("{}/snapshots.json", network_dir); + + let network_config = match Self::read_network_config(&config_path) { + Ok(cfg) => cfg, + Err(e) => { + error!("Failed to read network config: {}", e); + return; + } + }; + + info!( + "Loading snapshots for epochs: {:?}", + network_config.snapshots + ); + + let all_snapshots = match Self::read_snapshots_metadata(&snapshots_path) { + Ok(snaps) => snaps, + Err(e) => { + error!("Failed to read snapshots metadata: {}", e); + return; + } + }; + + let target_snapshots: Vec<_> = all_snapshots + .iter() + .filter(|s| network_config.snapshots.contains(&s.epoch)) + .cloned() + .collect(); + + if target_snapshots.is_empty() { + error!( + "No snapshots found for requested epochs: {:?}", + network_config.snapshots + ); + return; + } + + info!("Found {} snapshot files to process", target_snapshots.len()); + + for snapshot_meta in &target_snapshots { + let filename = format!("{}.cbor", snapshot_meta.point); + let file_path = format!("{}/{}", network_dir, filename); + + if let Err(e) = + Self::ensure_snapshot_downloaded(&file_path, snapshot_meta).await + { + error!("Failed to download snapshot: {}", e); + return; + } + } + + for snapshot_meta in target_snapshots { + let filename = format!("{}.cbor", snapshot_meta.point); + let file_path = format!("{}/{}", network_dir, filename); + + info!( + "Processing snapshot for epoch {} from {}", + snapshot_meta.epoch, file_path + ); + + if let Err(e) = + Self::process_snapshot(&file_path, context.clone(), &completion_topic).await + { + error!("Failed to process snapshot: {}", e); + return; + } } + + info!("Snapshot bootstrap completed successfully"); } .instrument(span) .await; @@ -267,6 +395,95 @@ impl SnapshotBootstrapper { Ok(()) } + /// Read network configuration + fn read_network_config(path: &str) -> Result { + let path_buf = PathBuf::from(path); + let content = fs::read_to_string(&path_buf) + .map_err(|e| SnapshotBootstrapError::ReadNetworkConfig(path_buf.clone(), e))?; + + let config: NetworkConfig = serde_json::from_str(&content) + .map_err(|e| SnapshotBootstrapError::MalformedNetworkConfig(path_buf, e))?; + + Ok(config) + } + + /// Read snapshot metadata + fn read_snapshots_metadata( + path: &str, + ) -> Result, SnapshotBootstrapError> { + let path_buf = PathBuf::from(path); + let content = fs::read_to_string(&path_buf) + .map_err(|e| SnapshotBootstrapError::ReadSnapshotsFile(path_buf.clone(), e))?; + + let snapshots: Vec = serde_json::from_str(&content) + .map_err(|e| SnapshotBootstrapError::MalformedSnapshotsFile(path_buf, e))?; + + Ok(snapshots) + } + + /// Ensure the snapshot is downloaded + async fn ensure_snapshot_downloaded( + file_path: &str, + metadata: &SnapshotFileMetadata, + ) -> Result<(), SnapshotBootstrapError> { + let path = Path::new(file_path); + + if path.exists() { + info!("Snapshot file already exists: {}", file_path); + return Ok(()); + } + + info!( + "Downloading snapshot from {} to {}", + metadata.url, file_path + ); + Self::download_snapshot(&metadata.url, file_path).await?; + info!("Downloaded: {}", file_path); + Ok(()) + } + + async fn download_snapshot(url: &str, output_path: &str) -> Result<(), SnapshotBootstrapError> { + if let Some(parent) = Path::new(output_path).parent() { + tokio::fs::create_dir_all(parent) + .await + .map_err(|e| SnapshotBootstrapError::CreateDirectory(parent.to_path_buf(), e))?; + } + + let client = reqwest::Client::new(); + let response = client + .get(url) + .send() + .await + .map_err(|e| SnapshotBootstrapError::DownloadError(url.to_string(), e))?; + + if !response.status().is_success() { + return Err(SnapshotBootstrapError::DownloadInvalidStatusCode( + url.to_string(), + response.status(), + )); + } + + let total_size = response.content_length().unwrap_or(0); + if total_size > 0 { + info!("Downloading {} MB (compressed)...", total_size / 1_000_000); + } + + let tmp_path = Path::new(output_path).with_extension("partial"); + let mut file = File::create(&tmp_path).await?; + + let raw_stream_reader = + StreamReader::new(response.bytes_stream().map_err(io::Error::other)); + let buffered_reader = BufReader::new(raw_stream_reader); + let mut decoded_stream = GzipDecoder::new(buffered_reader); + + tokio::io::copy(&mut decoded_stream, &mut file).await?; + file.sync_all().await?; + tokio::fs::rename(&tmp_path, output_path).await?; + + Ok(()) + } + + /// Process a single snapshot file async fn process_snapshot( file_path: &str, context: Arc>, @@ -275,22 +492,14 @@ impl SnapshotBootstrapper { let parser = StreamingSnapshotParser::new(file_path); let mut callbacks = SnapshotHandler::new(context.clone(), completion_topic.to_string()); - info!( - "Starting snapshot parsing and publishing from: {}", - file_path - ); + info!("Starting snapshot parsing: {}", file_path); let start = Instant::now(); callbacks.publish_start().await?; - - // Parse the snapshot with our callback handler parser.parse(&mut callbacks)?; let duration = start.elapsed(); - info!( - "✓ Parse and publish completed successfully in {:.2?}", - duration - ); + info!("Parsed snapshot in {:.2?}", duration); // Build the final state from accumulated data let block_info = callbacks.build_block_info()?; diff --git a/processes/omnibus/Cargo.toml b/processes/omnibus/Cargo.toml index 9afa646f..bb171711 100644 --- a/processes/omnibus/Cargo.toml +++ b/processes/omnibus/Cargo.toml @@ -33,6 +33,7 @@ acropolis_module_historical_accounts_state = { path = "../../modules/historical_ acropolis_module_historical_epochs_state = { path = "../../modules/historical_epochs_state" } acropolis_module_block_vrf_validator = { path = "../../modules/block_vrf_validator" } acropolis_module_block_kes_validator = { path = "../../modules/block_kes_validator" } +acropolis_module_snapshot_bootstrapper = { path = "../../modules/snapshot_bootstrapper" } caryatid_process = { workspace = true } caryatid_module_clock = { workspace = true } diff --git a/processes/omnibus/omnibus.toml b/processes/omnibus/omnibus.toml index 0548466d..f6805c04 100644 --- a/processes/omnibus/omnibus.toml +++ b/processes/omnibus/omnibus.toml @@ -1,5 +1,15 @@ # Top-level configuration for Acropolis omnibus process +# ============================================================================ +# Startup Configuration +# ============================================================================ +[startup] +method = "genesis" # Options: "genesis" | "snapshot" +topic = "cardano.sequence.start" + +# ============================================================================ +# Bootstrap Module Configurations +# ============================================================================ [module.genesis-bootstrapper] [module.mithril-snapshot-fetcher] @@ -10,6 +20,13 @@ download-max-age = "never" # Pause constraint E.g. "epoch:100", "block:1200" pause = "none" +[module.snapshot-bootstrapper] +network = "mainnet" +data-dir = "../../modules/snapshot_bootstrapper/data" + +# ============================================================================ +# Core Module Configurations +# ============================================================================ [module.peer-network-interface] sync-point = "snapshot" node-addresses = [ @@ -170,9 +187,9 @@ port = 4340 # Enable for message spying #topic = "cardano.#" -[startup] -topic = "cardano.sequence.start" - +# ============================================================================ +# Message Bus Configuration +# ============================================================================ [message-bus.external] class = "rabbit-mq" url = "amqp://127.0.0.1:5672/%2f" diff --git a/processes/omnibus/src/main.rs b/processes/omnibus/src/main.rs index c27df335..55d2cee8 100644 --- a/processes/omnibus/src/main.rs +++ b/processes/omnibus/src/main.rs @@ -27,6 +27,7 @@ use acropolis_module_mithril_snapshot_fetcher::MithrilSnapshotFetcher; use acropolis_module_parameters_state::ParametersState; use acropolis_module_peer_network_interface::PeerNetworkInterface; use acropolis_module_rest_blockfrost::BlockfrostREST; +use acropolis_module_snapshot_bootstrapper::SnapshotBootstrapper; use acropolis_module_spdd_state::SPDDState; use acropolis_module_spo_state::SPOState; use acropolis_module_stake_delta_filter::StakeDeltaFilter; @@ -98,10 +99,33 @@ pub async fn main() -> Result<()> { ); // Create the process - let mut process = Process::::create(config).await; + let mut process = Process::::create(config.clone()).await; + + // Get startup method from config + let startup_method = + config.get_string("startup.method").unwrap_or_else(|_| "snapshot".to_string()); + + info!("Using startup method: {}", startup_method); + + // Register bootstrap modules based on startup method + match startup_method.as_str() { + "genesis" => { + info!("Registering GenesisBootstrapper"); + GenesisBootstrapper::register(&mut process); + } + "snapshot" => { + info!("Registering SnapshotBootstrapper"); + SnapshotBootstrapper::register(&mut process); + } + _ => { + panic!( + "Invalid startup method: {}. Must be one of: genesis, snapshot", + startup_method + ); + } + } // Register modules - GenesisBootstrapper::register(&mut process); MithrilSnapshotFetcher::register(&mut process); BlockUnpacker::register(&mut process); PeerNetworkInterface::register(&mut process);