-
Notifications
You must be signed in to change notification settings - Fork 5
feat: Add dynamic sync mode to peer_network_interface #394
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
237e8d2
d85e915
64a302f
65125b3
c1ae681
e385823
9668f14
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,2 @@ | ||
| pub mod sync; | ||
| pub mod transactions; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| use crate::{BlockHash, Slot}; | ||
|
|
||
| #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] | ||
| pub enum SyncCommand { | ||
| ChangeSyncPoint { slot: Slot, hash: BlockHash }, | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| # Acropolis indexer module | ||
|
|
||
| [package] | ||
| name = "acropolis_module_indexer" | ||
| version = "0.1.0" | ||
| edition = "2021" | ||
| authors = ["William Hankins <[email protected]>"] | ||
| description = "Core indexer logic" | ||
| license = "Apache-2.0" | ||
|
|
||
| [dependencies] | ||
| acropolis_common = { path = "../../common" } | ||
|
|
||
| caryatid_sdk = { workspace = true } | ||
|
|
||
| anyhow = { workspace = true } | ||
| config = { workspace = true } | ||
| tokio = { workspace = true } | ||
| tracing = { workspace = true } | ||
|
|
||
| [lib] | ||
| path = "src/indexer.rs" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,69 @@ | ||
| //! Acropolis indexer module for Caryatid | ||
|
|
||
| use acropolis_common::{ | ||
| commands::sync::SyncCommand, | ||
| hash::Hash, | ||
| messages::{Command, Message}, | ||
| }; | ||
| use anyhow::Result; | ||
| use caryatid_sdk::{module, Context, Module}; | ||
| use config::Config; | ||
| use std::{str::FromStr, sync::Arc}; | ||
| use tracing::info; | ||
|
|
||
| // Configuration defaults | ||
| const DEFAULT_DYNAMIC_SYNC_TOPIC: (&str, &str) = | ||
| ("dynamic-sync-publisher-topic", "cardano.sync.command"); | ||
|
|
||
|
Comment on lines
+14
to
+17
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we follow the strongly-typed configuration pattern which PeerNetworkInterface is using? I'd like that pattern to infect as much of the system as possible. |
||
| /// Indexer module | ||
| #[module( | ||
| message_type(Message), | ||
| name = "indexer", | ||
| description = "Core indexer module for indexer process" | ||
| )] | ||
| pub struct Indexer; | ||
|
|
||
| impl Indexer { | ||
| /// Async initialisation | ||
| pub async fn init(&self, context: Arc<Context<Message>>, config: Arc<Config>) -> Result<()> { | ||
| // Get configuration | ||
| let dynamic_sync_publisher_topic = config | ||
| .get_string(DEFAULT_DYNAMIC_SYNC_TOPIC.0) | ||
| .unwrap_or(DEFAULT_DYNAMIC_SYNC_TOPIC.1.to_string()); | ||
| info!("Creating dynamic sync publisher on '{dynamic_sync_publisher_topic}'"); | ||
|
|
||
| let ctx = context.clone(); | ||
|
|
||
| // This is a placeholder to test dynamic sync | ||
| context.run(async move { | ||
| let example = SyncCommand::ChangeSyncPoint { | ||
| slot: 4492799, | ||
| hash: Hash::from_str( | ||
| "f8084c61b6a238acec985b59310b6ecec49c0ab8352249afd7268da5cff2a457", | ||
| ) | ||
| .expect("Valid hash"), | ||
| }; | ||
|
|
||
| // Initial sync message (This will be read from config for first sync and from DB on subsequent runs) | ||
| ctx.message_bus | ||
| .publish( | ||
| &dynamic_sync_publisher_topic, | ||
| Arc::new(Message::Command(Command::Sync(example.clone()))), | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
|
|
||
| // Simulate a later sync command to reset sync point to where we started | ||
| tokio::time::sleep(std::time::Duration::from_secs(5)).await; | ||
|
|
||
| ctx.message_bus | ||
| .publish( | ||
| &dynamic_sync_publisher_topic, | ||
| Arc::new(Message::Command(Command::Sync(example))), | ||
| ) | ||
| .await | ||
| .unwrap(); | ||
| }); | ||
| Ok(()) | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a huge deal, but IMO "sync" is a bit of a generic name when we're interacting with a specific miniprotocol