diff --git a/Cargo.lock b/Cargo.lock index 223f5bd9..11c677bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -295,6 +295,18 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_module_indexer" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", + "caryatid_sdk", + "config", + "tokio", + "tracing", +] + [[package]] name = "acropolis_module_mithril_snapshot_fetcher" version = "0.1.0" @@ -475,6 +487,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "acropolis_process_indexer" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "acropolis_module_block_unpacker", + "acropolis_module_genesis_bootstrapper", + "acropolis_module_indexer", + "acropolis_module_peer_network_interface", + "anyhow", + "caryatid_process", + "clap 4.5.51", + "config", + "tokio", + "tracing-subscriber", +] + [[package]] name = "acropolis_process_omnibus" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index f5b46041..20329a4b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ members = [ "processes/replayer", # All-inclusive process to replay messages "processes/golden_tests", # All-inclusive golden tests process "processes/tx_submitter_cli", # CLI wrapper for TX submitter + "processes/indexer", # Minimal example indexer ] resolver = "2" diff --git a/common/src/commands/mod.rs b/common/src/commands/mod.rs index 0824d7a9..14763b59 100644 --- a/common/src/commands/mod.rs +++ b/common/src/commands/mod.rs @@ -1 +1,2 @@ +pub mod sync; pub mod transactions; diff --git a/common/src/commands/sync.rs b/common/src/commands/sync.rs new file mode 100644 index 00000000..761aae7e --- /dev/null +++ b/common/src/commands/sync.rs @@ -0,0 +1,8 @@ +use crate::{BlockHash, Slot}; + +/// Commands that control chain synchronisation at runtime +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum SyncCommand { + /// Restart syncing from the block identified by `slot` and `hash` + ChangeSyncPoint { slot: Slot, hash: BlockHash }, +} diff --git a/common/src/messages.rs b/common/src/messages.rs index 674187dc..df28e3ca 100644 --- a/common/src/messages.rs +++ 
b/common/src/messages.rs @@ -3,6 +3,7 @@ // We don't use these messages in the acropolis_common crate itself #![allow(dead_code)] +use crate::commands::sync::SyncCommand; use crate::commands::transactions::{TransactionsCommand, TransactionsCommandResponse}; use crate::genesis_values::GenesisValues; use crate::ledger_state::SPOState; @@ -453,6 +454,7 @@ pub enum StateQueryResponse { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum Command { Transactions(TransactionsCommand), + Sync(SyncCommand), } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] diff --git a/modules/indexer/Cargo.toml b/modules/indexer/Cargo.toml new file mode 100644 index 00000000..5cc4e0cd --- /dev/null +++ b/modules/indexer/Cargo.toml @@ -0,0 +1,22 @@ +# Acropolis indexer module + +[package] +name = "acropolis_module_indexer" +version = "0.1.0" +edition = "2021" +authors = ["William Hankins "] +description = "Core indexer logic" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } + +caryatid_sdk = { workspace = true } + +anyhow = { workspace = true } +config = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } + +[lib] +path = "src/indexer.rs" diff --git a/modules/indexer/src/indexer.rs b/modules/indexer/src/indexer.rs new file mode 100644 index 00000000..9b18a931 --- /dev/null +++ b/modules/indexer/src/indexer.rs @@ -0,0 +1,69 @@ +//! 
Acropolis indexer module for Caryatid + +use acropolis_common::{ + commands::sync::SyncCommand, + hash::Hash, + messages::{Command, Message}, +}; +use anyhow::Result; +use caryatid_sdk::{module, Context, Module}; +use config::Config; +use std::{str::FromStr, sync::Arc}; +use tracing::{error, info}; + +// Configuration defaults: (config key, default topic) +const DEFAULT_DYNAMIC_SYNC_TOPIC: (&str, &str) = + ("dynamic-sync-publisher-topic", "cardano.sync.command"); + +/// Indexer module (currently a placeholder that publishes example sync commands) +#[module( + message_type(Message), + name = "indexer", + description = "Core indexer module for indexer process" +)] +pub struct Indexer; + +impl Indexer { + /// Async initialisation: reads the publish topic and spawns the placeholder sync-command task + pub async fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> { + // Get configuration, building the default topic string only when the lookup fails + let dynamic_sync_publisher_topic = config + .get_string(DEFAULT_DYNAMIC_SYNC_TOPIC.0) + .unwrap_or_else(|_| DEFAULT_DYNAMIC_SYNC_TOPIC.1.to_string()); + info!("Creating dynamic sync publisher on '{dynamic_sync_publisher_topic}'"); + + let ctx = context.clone(); + + // This is a placeholder to test dynamic sync + context.run(async move { + let example = SyncCommand::ChangeSyncPoint { + slot: 4492799, + hash: Hash::from_str( + "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", + ) + .expect("Valid hash"), + }; + + // Initial sync message (This will be read from config for first sync and from DB on subsequent runs) + ctx.message_bus + .publish( + &dynamic_sync_publisher_topic, + Arc::new(Message::Command(Command::Sync(example.clone()))), + ) + .await + .unwrap_or_else(|e| error!("Failed to publish initial sync command: {e}")); + + // Simulate a later sync command to reset sync point to where we started + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + + ctx.message_bus + .publish( + &dynamic_sync_publisher_topic, + Arc::new(Message::Command(Command::Sync(example))), + ) + .await + .unwrap_or_else(|e| error!("Failed to publish follow-up sync command: {e}")); + }); + Ok(()) + } +} diff --git a/modules/peer_network_interface/config.default.toml b/modules/peer_network_interface/config.default.toml index c48d02dc..5babba18 100644 --- 
a/modules/peer_network_interface/config.default.toml +++ b/modules/peer_network_interface/config.default.toml @@ -4,6 +4,8 @@ block-topic = "cardano.block.available" snapshot-completion-topic = "cardano.snapshot.complete" # The topic to wait for when listening for genesis values from another module genesis-completion-topic = "cardano.sequence.bootstrapped" +# The topic to listen on for runtime sync commands +sync-command-topic = "cardano.sync.command" # Upstream node connections node-addresses = [ @@ -19,6 +21,7 @@ magic-number = 764824073 # - "tip": sync from the very end of the chain # - "cache": replay messages from a local filesystem cache, then sync from the point right after that cache. # - "snapshot": wait for another module to restore from a snapshot, then sync from the point right after that snapshot. +# - "dynamic": awaits a sync command to begin fetching blocks, can change sync point at runtime. sync-point = "snapshot" # The cache dir to use when sync-point is "cache" cache-dir = "upstream-cache" \ No newline at end of file diff --git a/modules/peer_network_interface/src/configuration.rs b/modules/peer_network_interface/src/configuration.rs index 9f02ebb0..40461a45 100644 --- a/modules/peer_network_interface/src/configuration.rs +++ b/modules/peer_network_interface/src/configuration.rs @@ -11,6 +11,7 @@ pub enum SyncPoint { Tip, Cache, Snapshot, + Dynamic, } #[derive(serde::Deserialize)] @@ -20,6 +21,7 @@ pub struct InterfaceConfig { pub sync_point: SyncPoint, pub snapshot_completion_topic: String, pub genesis_completion_topic: String, + pub sync_command_topic: String, pub node_addresses: Vec, pub magic_number: u64, pub cache_dir: PathBuf, diff --git a/modules/peer_network_interface/src/network.rs b/modules/peer_network_interface/src/network.rs index 450eaa97..6f3d7306 100644 --- a/modules/peer_network_interface/src/network.rs +++ b/modules/peer_network_interface/src/network.rs @@ -55,6 +55,7 @@ pub struct NetworkManager { events_sender: mpsc::Sender, 
block_sink: BlockSink, published_blocks: u64, + cmd_rx: Option<mpsc::Receiver<Point>>, } impl NetworkManager { @@ -63,6 +64,7 @@ impl NetworkManager { events: mpsc::Receiver<NetworkEvent>, events_sender: mpsc::Sender<NetworkEvent>, block_sink: BlockSink, + cmd_rx: Option<mpsc::Receiver<Point>>, ) -> Self { Self { network_magic, @@ -73,19 +75,59 @@ impl NetworkManager { events_sender, block_sink, published_blocks: 0, + cmd_rx, } } pub async fn run(mut self) -> Result<()> { - while let Some(event) = self.events.recv().await { - match event { - NetworkEvent::PeerUpdate { peer, event } => { - self.handle_peer_update(peer, event); - self.publish_blocks().await?; + loop { + tokio::select! { + cmd = async { + if let Some(rx) = &mut self.cmd_rx { + rx.recv().await + } else { + std::future::pending().await + } + } => { + self.on_sync_cmd(cmd).await?; + }, + + event = self.events.recv() => { + self.on_network_event(event).await?; } } } - bail!("event sink closed") + } + + /// Handle a runtime sync command: reset the chain state and ask every peer to intersect at `point`. + /// `None` means the command channel has closed; the receiver is dropped so `run` stops polling it. + async fn on_sync_cmd(&mut self, point: Option<Point>) -> Result<()> { + let Some(point) = point else { + // A closed channel yields `None` immediately on every poll; without + // disabling it here the select! loop in `run` would spin continuously. + self.cmd_rx = None; + return Ok(()); + }; + + self.chain = ChainState::new(); + + for peer in self.peers.values_mut() { + peer.reqs.clear(); + peer.find_intersect(vec![point.clone()]); + } + + Ok(()) + } + + /// Handle one network event and publish any blocks it made available; errors once the event channel closes. + async fn on_network_event(&mut self, event: Option<NetworkEvent>) -> Result<()> { + let Some(NetworkEvent::PeerUpdate { peer, event }) = event else { + bail!("event sink closed"); + }; + + self.handle_peer_update(peer, event); + self.publish_blocks().await?; + Ok(()) } pub fn handle_new_connection(&mut self, address: String, delay: Duration) { diff --git a/modules/peer_network_interface/src/peer_network_interface.rs b/modules/peer_network_interface/src/peer_network_interface.rs index bea47e78..11c8540f 100644 --- a/modules/peer_network_interface/src/peer_network_interface.rs +++ b/modules/peer_network_interface/src/peer_network_interface.rs @@ -5,8 +5,9 @@ mod network; use acropolis_common::{ BlockInfo, BlockStatus, + commands::sync::SyncCommand, genesis_values::GenesisValues, - messages::{CardanoMessage, 
Message, RawBlockMessage}, + messages::{CardanoMessage, Command, Message, RawBlockMessage}, upstream_cache::{UpstreamCache, UpstreamCacheRecord}, }; use anyhow::{Result, bail}; @@ -14,7 +15,7 @@ use caryatid_sdk::{Context, Module, Subscription, module}; use config::Config; use pallas::network::miniprotocols::Point; use tokio::sync::mpsc; -use tracing::{error, warn}; +use tracing::{error, info, warn}; use std::{path::Path, sync::Arc, time::Duration}; @@ -44,6 +45,13 @@ impl PeerNetworkInterface { _ => None, }; + // Create a background task to forward sync commands to NetworkManager + let mut cmd_rx = if cfg.sync_point == SyncPoint::Dynamic { + Some(Self::spawn_command_forwarder(context.clone(), &cfg.sync_command_topic).await?) + } else { + None + }; + context.clone().run(async move { let genesis_values = if let Some(mut sub) = genesis_complete { Self::wait_genesis_completion(&mut sub) @@ -82,12 +90,12 @@ impl PeerNetworkInterface { let manager = match cfg.sync_point { SyncPoint::Origin => { - let mut manager = Self::init_manager(cfg, sink); + let mut manager = Self::init_manager(cfg, sink, None); manager.sync_to_point(Point::Origin); manager } SyncPoint::Tip => { - let mut manager = Self::init_manager(cfg, sink); + let mut manager = Self::init_manager(cfg, sink, None); if let Err(error) = manager.sync_to_tip().await { warn!("could not sync to tip: {error:#}"); return; @@ -95,7 +103,7 @@ impl PeerNetworkInterface { manager } SyncPoint::Cache => { - let mut manager = Self::init_manager(cfg, sink); + let mut manager = Self::init_manager(cfg, sink, None); manager.sync_to_point(cache_sync_point); manager } @@ -108,7 +116,7 @@ impl PeerNetworkInterface { let (epoch, _) = sink.genesis_values.slot_to_epoch(slot); sink.last_epoch = Some(epoch); } - let mut manager = Self::init_manager(cfg, sink); + let mut manager = Self::init_manager(cfg, sink, None); manager.sync_to_point(point); manager } @@ -118,6 +126,36 @@ impl PeerNetworkInterface { } } } + SyncPoint::Dynamic => { + let 
mut rx = match cmd_rx.take() { + Some(rx) => rx, + None => { + warn!("Dynamic mode configured but cmd_rx is missing"); + return; + } + }; + + let point = match Self::wait_sync_command(&mut rx).await { + Ok(Point::Specific(slot, hash)) => { + let (epoch, _) = sink.genesis_values.slot_to_epoch(slot); + sink.last_epoch = Some(epoch); + info!("Dynamic sync starting at slot {} (epoch {})", slot, epoch); + Point::Specific(slot, hash) + } + Ok(Point::Origin) => { + warn!("Dynamic sync received Point::Origin; ignoring"); + return; + } + Err(err) => { + warn!("Failed to receive initial sync command: {err:#}"); + return; + } + }; + + let mut manager = Self::init_manager(cfg, sink, Some(rx)); + manager.sync_to_point(point); + manager + } }; if let Err(err) = manager.run().await { @@ -128,9 +166,14 @@ impl PeerNetworkInterface { Ok(()) } - fn init_manager(cfg: InterfaceConfig, sink: BlockSink) -> NetworkManager { + fn init_manager( + cfg: InterfaceConfig, + sink: BlockSink, + cmd_rx: Option<mpsc::Receiver<Point>>, + ) -> NetworkManager { let (events_sender, events) = mpsc::channel(1024); - let mut manager = NetworkManager::new(cfg.magic_number, events, events_sender, sink); + let mut manager = + NetworkManager::new(cfg.magic_number, events, events_sender, sink, cmd_rx); for address in cfg.node_addresses { manager.handle_new_connection(address, Duration::ZERO); } @@ -183,6 +226,43 @@ impl PeerNetworkInterface { msg => bail!("Unexpected message in snapshot completion topic: {msg:?}"), } } + + /// Wait for the first sync point on the command channel; errors if the channel closes first. + async fn wait_sync_command(rx: &mut mpsc::Receiver<Point>) -> Result<Point> { + match rx.recv().await { + Some(point) => Ok(point), + None => Err(anyhow::anyhow!( + "Channel closed before receiving a start point" + )), + } + } + + /// Subscribe to `topic` and spawn a task that forwards each `ChangeSyncPoint` command as a `Point`. + /// The task ends when reading the subscription fails or the returned receiver has been dropped. + async fn spawn_command_forwarder( + context: Arc<Context<Message>>, + topic: &str, + ) -> Result<mpsc::Receiver<Point>> { + let (tx, rx) = mpsc::channel::<Point>(32); + + let mut sub = context.subscribe(topic).await?; + tokio::spawn(async move { + while let Ok((_, msg)) = sub.read().await { + if let 
Message::Command(Command::Sync(SyncCommand::ChangeSyncPoint { + slot, + hash, + })) = msg.as_ref() + { + // A send error means the receiver is gone; stop instead of reading the topic forever + if tx.send(Point::new(*slot, hash.to_vec())).await.is_err() { + break; + } + } + } + }); + + Ok(rx) + } } struct BlockSink { diff --git a/processes/indexer/Cargo.toml b/processes/indexer/Cargo.toml new file mode 100644 index 00000000..3a368efb --- /dev/null +++ b/processes/indexer/Cargo.toml @@ -0,0 +1,24 @@ +# Acropolis indexer process +[package] +name = "acropolis_process_indexer" +version = "0.1.0" +edition = "2021" +authors = ["William Hankins "] +description = "Acropolis indexer process containing core modules" +license = "Apache-2.0" + +[dependencies] +acropolis_common = { path = "../../common" } +acropolis_module_genesis_bootstrapper = { path = "../../modules/genesis_bootstrapper" } +acropolis_module_peer_network_interface = { path = "../../modules/peer_network_interface" } +acropolis_module_block_unpacker = { path = "../../modules/block_unpacker" } +acropolis_module_indexer = { path = "../../modules/indexer" } + +caryatid_process = { workspace = true } + +anyhow = { workspace = true } +clap = { workspace = true } +config = { workspace = true } +tracing-subscriber = { version = "0.3.20", features = ["registry", "env-filter"] } +tokio = { workspace = true } + diff --git a/processes/indexer/indexer.toml b/processes/indexer/indexer.toml new file mode 100644 index 00000000..013c8dac --- /dev/null +++ b/processes/indexer/indexer.toml @@ -0,0 +1,32 @@ +# Top-level configuration for Acropolis indexer process + +[module.genesis-bootstrapper] + +[module.peer-network-interface] +sync-point = "dynamic" +node-addresses = [ + "backbone.cardano.iog.io:3001", + "backbone.mainnet.cardanofoundation.org:3001", + "backbone.mainnet.emurgornd.com:3001", +] +magic-number = 764824073 + +[module.block-unpacker] + +[module.indexer] + +[startup] +topic = "cardano.sequence.start" + +[message-bus.internal] +class = "in-memory" +workers = 50 +dispatch-queue-size = 1000 +worker-queue-size = 100 
+bulk-block-capacity = 50 +bulk-resume-capacity = 75 + +# Message routing +[[message-router.route]] # Everything is internal only +pattern = "#" +bus = "internal" diff --git a/processes/indexer/src/main.rs b/processes/indexer/src/main.rs new file mode 100644 index 00000000..50a3b458 --- /dev/null +++ b/processes/indexer/src/main.rs @@ -0,0 +1,45 @@ +use acropolis_common::messages::Message; +use acropolis_module_indexer::Indexer; +use anyhow::{Context, Result}; +use caryatid_process::Process; +use clap::Parser; +use config::{Config, Environment, File}; +use std::sync::Arc; + +use acropolis_module_block_unpacker::BlockUnpacker; +use acropolis_module_genesis_bootstrapper::GenesisBootstrapper; +use acropolis_module_peer_network_interface::PeerNetworkInterface; + +/// Command-line arguments for the indexer process +#[derive(Debug, clap::Parser)] +struct Args { + /// Path to the configuration file + #[arg(long, value_name = "PATH", default_value = "indexer.toml")] + config: String, +} + +/// Load configuration, register the indexer modules and run the process until it exits +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + tracing_subscriber::fmt().with_env_filter("info").init(); + + let config = Arc::new( + Config::builder() + .add_source(File::with_name(&args.config)) + .add_source(Environment::with_prefix("ACROPOLIS")) + .build() + .with_context(|| format!("Failed to load configuration from '{}'", args.config))?, + ); + + let mut process = Process::<Message>::create(config).await; + + GenesisBootstrapper::register(&mut process); + BlockUnpacker::register(&mut process); + PeerNetworkInterface::register(&mut process); + Indexer::register(&mut process); + + process.run().await?; + Ok(()) +}