Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ members = [
"processes/replayer", # All-inclusive process to replay messages
"processes/golden_tests", # All-inclusive golden tests process
"processes/tx_submitter_cli", # CLI wrapper for TX submitter
"processes/indexer", # Minimal example indexer
]
resolver = "2"

Expand Down
1 change: 1 addition & 0 deletions common/src/commands/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod sync;
pub mod transactions;
6 changes: 6 additions & 0 deletions common/src/commands/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
use crate::{BlockHash, Slot};

/// Commands controlling chain synchronisation, published on the sync command
/// topic and consumed by the peer network interface.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum SyncCommand {
    /// Restart syncing from the given chain point (slot + block hash).
    /// NOTE(review): reviewer suggested renaming to `ChainSyncCommand::FindIntersect`
    /// to match the miniprotocol terminology — TODO confirm naming.
    ChangeSyncPoint { slot: Slot, hash: BlockHash },
}
Comment on lines +4 to +6
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub enum SyncCommand {
ChangeSyncPoint { slot: Slot, hash: BlockHash },
}
pub enum ChainSyncCommand {
FindIntersect { slot: Slot, hash: BlockHash },
}

Not a huge deal, but IMO "sync" is a bit of a generic name when we're interacting with a specific miniprotocol

2 changes: 2 additions & 0 deletions common/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// We don't use these messages in the acropolis_common crate itself
#![allow(dead_code)]

use crate::commands::sync::SyncCommand;
use crate::commands::transactions::{TransactionsCommand, TransactionsCommandResponse};
use crate::genesis_values::GenesisValues;
use crate::ledger_state::SPOState;
Expand Down Expand Up @@ -453,6 +454,7 @@ pub enum StateQueryResponse {
/// Top-level command envelope carried on the message bus.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum Command {
    /// Transaction-related commands (see `commands::transactions`).
    Transactions(TransactionsCommand),
    /// Chain synchronisation commands (see `commands::sync`).
    Sync(SyncCommand),
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
Expand Down
22 changes: 22 additions & 0 deletions modules/indexer/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Acropolis indexer module

[package]
name = "acropolis_module_indexer"
version = "0.1.0"
edition = "2021"
authors = ["William Hankins <[email protected]>"]
description = "Core indexer logic"
license = "Apache-2.0"

[dependencies]
acropolis_common = { path = "../../common" }

caryatid_sdk = { workspace = true }

anyhow = { workspace = true }
config = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

[lib]
path = "src/indexer.rs"
69 changes: 69 additions & 0 deletions modules/indexer/src/indexer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
//! Acropolis indexer module for Caryatid
use acropolis_common::{
commands::sync::SyncCommand,
hash::Hash,
messages::{Command, Message},
};
use anyhow::Result;
use caryatid_sdk::{module, Context, Module};
use config::Config;
use std::{str::FromStr, sync::Arc};
use tracing::info;

// Configuration defaults
const DEFAULT_DYNAMIC_SYNC_TOPIC: (&str, &str) =
("dynamic-sync-publisher-topic", "cardano.sync.command");

Comment on lines +14 to +17
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we follow the strongly-typed configuration pattern which PeerNetworkInterface is using? I'd like that pattern to infect as much of the system as possible.

/// Core indexer module (the previous "Historical Epochs State module" comment
/// was a copy-paste leftover from another module).
#[module(
    message_type(Message),
    name = "indexer",
    description = "Core indexer module for indexer process"
)]
pub struct Indexer;

impl Indexer {
/// Async initialisation
pub async fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> {
// Get configuration
let dynamic_sync_publisher_topic = config
.get_string(DEFAULT_DYNAMIC_SYNC_TOPIC.0)
.unwrap_or(DEFAULT_DYNAMIC_SYNC_TOPIC.1.to_string());
info!("Creating dynamic sync publisher on '{dynamic_sync_publisher_topic}'");

let ctx = context.clone();

// This is a placeholder to test dynamic sync
context.run(async move {
let example = SyncCommand::ChangeSyncPoint {
slot: 4492799,
hash: Hash::from_str(
"f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457",
)
.expect("Valid hash"),
};

// Inital sync message (This will be read from config for first sync and from DB on subsequent runs)
ctx.message_bus
.publish(
&dynamic_sync_publisher_topic,
Arc::new(Message::Command(Command::Sync(example.clone()))),
)
.await
.unwrap();

// Simulate a later sync command to reset sync point to where we started
tokio::time::sleep(std::time::Duration::from_secs(5)).await;

ctx.message_bus
.publish(
&dynamic_sync_publisher_topic,
Arc::new(Message::Command(Command::Sync(example))),
)
.await
.unwrap();
});
Ok(())
}
}
3 changes: 3 additions & 0 deletions modules/peer_network_interface/config.default.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ block-topic = "cardano.block.available"
snapshot-completion-topic = "cardano.snapshot.complete"
# The topic to wait for when listening for genesis values from another module
genesis-completion-topic = "cardano.sequence.bootstrapped"
# The topic to listen on for runtime sync commands
sync-command-topic = "cardano.sync.command"

# Upstream node connections
node-addresses = [
Expand All @@ -19,6 +21,7 @@ magic-number = 764824073
# - "tip": sync from the very end of the chain
# - "cache": replay messages from a local filesystem cache, then sync from the point right after that cache.
# - "snapshot": wait for another module to restore from a snapshot, then sync from the point right after that snapshot.
# - "dynamic": awaits a sync command to begin fetching blocks, can change sync point at runtime.
sync-point = "snapshot"
# The cache dir to use when sync-point is "cache"
cache-dir = "upstream-cache"
2 changes: 2 additions & 0 deletions modules/peer_network_interface/src/configuration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub enum SyncPoint {
Tip,
Cache,
Snapshot,
Dynamic,
}

#[derive(serde::Deserialize)]
Expand All @@ -20,6 +21,7 @@ pub struct InterfaceConfig {
pub sync_point: SyncPoint,
pub snapshot_completion_topic: String,
pub genesis_completion_topic: String,
pub sync_command_topic: String,
pub node_addresses: Vec<String>,
pub magic_number: u64,
pub cache_dir: PathBuf,
Expand Down
48 changes: 42 additions & 6 deletions modules/peer_network_interface/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub struct NetworkManager {
events_sender: mpsc::Sender<NetworkEvent>,
block_sink: BlockSink,
published_blocks: u64,
cmd_rx: Option<mpsc::Receiver<Point>>,
}

impl NetworkManager {
Expand All @@ -63,6 +64,7 @@ impl NetworkManager {
events: mpsc::Receiver<NetworkEvent>,
events_sender: mpsc::Sender<NetworkEvent>,
block_sink: BlockSink,
cmd_rx: Option<mpsc::Receiver<Point>>,
) -> Self {
Self {
network_magic,
Expand All @@ -73,19 +75,53 @@ impl NetworkManager {
events_sender,
block_sink,
published_blocks: 0,
cmd_rx,
}
}

pub async fn run(mut self) -> Result<()> {
while let Some(event) = self.events.recv().await {
match event {
NetworkEvent::PeerUpdate { peer, event } => {
self.handle_peer_update(peer, event);
self.publish_blocks().await?;
loop {
tokio::select! {
cmd = async {
if let Some(rx) = &mut self.cmd_rx {
rx.recv().await
} else {
std::future::pending().await
}
} => {
self.on_sync_cmd(cmd).await?;
},

event = self.events.recv() => {
self.on_network_event(event).await?;
}
}
}
bail!("event sink closed")
}

/// Handle a sync command from the command channel: reset the tracked chain
/// state and ask every connected peer to re-intersect at the requested point.
/// A `None` (closed channel) is ignored.
async fn on_sync_cmd(&mut self, point: Option<Point>) -> Result<()> {
    if let Some(target) = point {
        // Discard the chain state accumulated so far; we are restarting
        // from the new point.
        self.chain = ChainState::new();

        for peer in self.peers.values_mut() {
            // Drop any in-flight requests before re-pointing the peer.
            peer.reqs.clear();
            peer.find_intersect(vec![target.clone()]);
        }
    }

    Ok(())
}

/// Handle one event from the internal network-event channel.
/// A `PeerUpdate` is applied and any newly-available blocks are published;
/// anything else (including a closed channel) terminates the run loop
/// with an error.
async fn on_network_event(&mut self, event: Option<NetworkEvent>) -> Result<()> {
    match event {
        Some(NetworkEvent::PeerUpdate { peer, event }) => {
            self.handle_peer_update(peer, event);
            self.publish_blocks().await?;
            Ok(())
        }
        _ => bail!("event sink closed"),
    }
}

pub fn handle_new_connection(&mut self, address: String, delay: Duration) {
Expand Down
Loading
Loading