From f1ee59aa0eb00dd7ac1a6046c0a40fe99ad76bc4 Mon Sep 17 00:00:00 2001 From: Krisztian Pinter Date: Fri, 7 Nov 2025 23:22:53 +0100 Subject: [PATCH 01/12] some fixes + diff data source --- Cargo.lock | 2 + Cargo.toml | 1 + demo/node/src/data_sources.rs | 26 +- demo/node/src/diff_sources.rs | 238 ++++++++++++ demo/node/src/lib.rs | 1 + demo/node/src/main.rs | 1 + toolkit/data-sources/db-sync/Cargo.toml | 2 +- .../db-sync/src/candidates/cached.rs | 3 + .../db-sync/src/candidates/mod.rs | 2 +- toolkit/data-sources/db-sync/src/lib.rs | 2 +- toolkit/data-sources/dolos/Cargo.toml | 2 + toolkit/data-sources/dolos/src/block.rs | 350 ++++++++++++++++++ toolkit/data-sources/dolos/src/candidate.rs | 6 +- toolkit/data-sources/dolos/src/lib.rs | 13 + toolkit/data-sources/dolos/src/mc_hash.rs | 31 +- toolkit/sidechain/domain/src/lib.rs | 2 +- 16 files changed, 658 insertions(+), 24 deletions(-) create mode 100644 demo/node/src/diff_sources.rs create mode 100644 toolkit/data-sources/dolos/src/block.rs diff --git a/Cargo.lock b/Cargo.lock index 6fc9bb257f..3d74c3df0c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7581,6 +7581,8 @@ dependencies = [ "authority-selection-inherents", "blockfrost-openapi", "cardano-serialization-lib", + "chrono", + "derive-new", "figment", "futures", "itertools 0.14.0", diff --git a/Cargo.toml b/Cargo.toml index 760def5156..40a957fd2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -188,6 +188,7 @@ fork-tree = { version = "13.0.1" } ureq = { version = "3.1.2", default-features = false } url = { version = "2.5.7", default-features = false } blockfrost-openapi = { version = "0.1.75", default-features = false } +chrono = { version = "0.4.31", default-features = false } # substrate dependencies frame-benchmarking = { default-features = false, git = "https://github.com/paritytech/polkadot-sdk.git", tag = "polkadot-stable2509" } diff --git a/demo/node/src/data_sources.rs b/demo/node/src/data_sources.rs index faec588e96..523e44de7a 100644 --- a/demo/node/src/data_sources.rs +++ b/demo/node/src/data_sources.rs @@ -9,6 +9,8 @@ use sp_governed_map::GovernedMapDataSource; use sp_partner_chains_bridge::TokenBridgeDataSource; use std::{error::Error, sync::Arc}; +use crate::diff_sources; + pub const DATA_SOURCE_VAR: &str = "CARDANO_DATA_SOURCE"; #[derive(Clone, Debug, PartialEq)] @@ -16,6 +18,7 @@ pub enum DataSourceType { DbSync, Mock, Dolos, + Diff, } impl DataSourceType { @@ -34,7 +37,7 @@ impl std::str::FromStr for DataSourceType { match s.to_lowercase().as_str() { "db-sync" => Ok(DataSourceType::DbSync), "mock" => Ok(DataSourceType::Mock), - "dolos" => Ok(DataSourceType::Dolos), + "dolos" => Ok(DataSourceType::Diff), // TODO restore this to dolos _ => { Err(format!("Invalid data source type: {}. Valid options: db-sync, mock, dolos", s)) }, @@ -48,6 +51,7 @@ impl std::fmt::Display for DataSourceType { DataSourceType::DbSync => write!(f, "db-sync"), DataSourceType::Mock => write!(f, "mock"), DataSourceType::Dolos => write!(f, "dolos"), + DataSourceType::Diff => write!(f, "diff"), } } } @@ -84,6 +88,14 @@ pub(crate) async fn create_cached_data_sources( DataSourceType::Dolos => create_dolos_data_sources(metrics_opt).await.map_err(|err| { ServiceError::Application(format!("Failed to create dolos data sources: {err}").into()) }), + + DataSourceType::Diff => { + diff_sources::create_diff_data_sources(metrics_opt).await.map_err(|err| { + ServiceError::Application( + format!("Failed to create dolos data sources: {err}").into(), + ) + }) + }, } } @@ -107,10 +119,14 @@ pub async fn create_dolos_data_sources( ) -> std::result::Result> { let dolos_client = partner_chains_dolos_data_sources::get_connection_from_env()?; let pool = partner_chains_db_sync_data_sources::get_connection_from_env().await?; - let block = Arc::new( + let block_dbsync = Arc::new( partner_chains_db_sync_data_sources::BlockDataSourceImpl::new_from_env(pool.clone()) .await?, ); + let block_dolos = Arc::new( + partner_chains_dolos_data_sources::BlockDataSourceImpl::new_from_env(dolos_client.clone()) + .await?, + ); Ok(DataSources { sidechain_rpc: Arc::new( partner_chains_dolos_data_sources::SidechainRpcDataSourceImpl::new( @@ -118,7 +134,7 @@ pub async fn create_dolos_data_sources( ), ), mc_hash: Arc::new(partner_chains_dolos_data_sources::McHashDataSourceImpl::new( - dolos_client.clone(), + block_dolos.clone(), )), authority_selection: Arc::new( partner_chains_dolos_data_sources::AuthoritySelectionDataSourceImpl::new( @@ -137,7 +153,7 @@ pub async fn create_dolos_data_sources( pool.clone(), metrics_opt.clone(), GOVERNED_MAP_CACHE_SIZE, - block.clone(), + block_dbsync.clone(), ) .await?, ), @@ -145,7 +161,7 @@ pub async fn create_dolos_data_sources( partner_chains_db_sync_data_sources::CachedTokenBridgeDataSourceImpl::new( pool, metrics_opt, - block, + block_dbsync, BRIDGE_TRANSFER_CACHE_LOOKAHEAD, ), ), diff --git a/demo/node/src/diff_sources.rs b/demo/node/src/diff_sources.rs new file mode 100644 index 0000000000..292d7588b9 --- /dev/null +++ b/demo/node/src/diff_sources.rs @@ -0,0 +1,238 @@ +use authority_selection_inherents::AuthoritySelectionDataSource; +use pallet_sidechain_rpc::SidechainRpcDataSource; +use partner_chains_data_source_metrics::McFollowerMetrics; +use sidechain_domain::*; +use sidechain_mc_hash::McHashDataSource; +use std::{error::Error, sync::Arc}; + +use crate::data_sources::*; + +struct SidechainRpcDataSourceImplDiff { + dolos: Arc, + dbsync: Arc, +} + +#[async_trait::async_trait] +impl SidechainRpcDataSource for SidechainRpcDataSourceImplDiff { + async fn get_latest_block_info( + &self, + ) -> Result> { + let reference = self.dbsync.get_latest_block_info().await?; + let dolos_output = self.dolos.get_latest_block_info().await?; + if reference != dolos_output { + println!( + ">>>>>>>>>>>>>>>>>>>>>>>>>>>> SidechainRpcDataSource::get_latest_block_info mismatch: dbs: {reference:?} dolos: {dolos_output:?}" + ) + } + Ok(reference) + } +} + +struct McHashDataSourceImplDiff { + dolos: Arc, + dbsync: Arc, +} + +#[async_trait::async_trait] +impl McHashDataSource for McHashDataSourceImplDiff { + async fn get_latest_stable_block_for( + &self, + reference_timestamp: sp_timestamp::Timestamp, + ) -> Result, Box> { + let reference = self.dbsync.get_latest_stable_block_for(reference_timestamp).await?; + let dolos_output = self.dolos.get_latest_stable_block_for(reference_timestamp).await?; + if reference != dolos_output { + println!( + ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_latest_stable_block_for mismatch: dbs: {reference:?} dolos: {dolos_output:?}" + ) + } + Ok(reference) + } + + async fn get_stable_block_for( + &self, + hash: McBlockHash, + reference_timestamp: sp_timestamp::Timestamp, + ) -> Result, Box> { + let reference = self.dbsync.get_stable_block_for(hash.clone(), reference_timestamp).await?; + let dolos_output = self.dolos.get_stable_block_for(hash, reference_timestamp).await?; + if reference != dolos_output { + println!( + ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_stable_block_for mismatch: dbs: {reference:?} dolos: {dolos_output:?}" + ) + } + Ok(reference) + } + + async fn get_block_by_hash( + &self, + hash: McBlockHash, + ) -> Result, Box> { + let reference = self.dbsync.get_block_by_hash(hash.clone()).await?; + let dolos_output = self.dolos.get_block_by_hash(hash).await?; + if reference != dolos_output { + println!( + ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_block_by_hash mismatch: dbs: {reference:?} dolos: {dolos_output:?}" + ) + } + Ok(reference) + } +} + +struct AuthoritySelectionDataSourceImplDiff { + dolos: Arc, + dbsync: Arc, +} + +#[async_trait::async_trait] +impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImplDiff { + async fn get_ariadne_parameters( + &self, + epoch_number: McEpochNumber, + d_parameter_policy: PolicyId, + permissioned_candidate_policy: PolicyId, + ) -> Result< + authority_selection_inherents::AriadneParameters, + Box, + > { + let reference = self + .dbsync + .get_ariadne_parameters( + epoch_number, + d_parameter_policy.clone(), + permissioned_candidate_policy.clone(), + ) + .await?; + let dolos_output = self + .dolos + .get_ariadne_parameters(epoch_number, d_parameter_policy, permissioned_candidate_policy) + .await?; + if reference != dolos_output { + println!( + ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_ariadne_parameters mismatch: dbs: {reference:?} dolos: {dolos_output:?}" + ) + } + Ok(reference) + } + + async fn get_candidates( + &self, + epoch_number: McEpochNumber, + committee_candidate_address: MainchainAddress, + ) -> Result, Box> { + let reference = self + .dbsync + .get_candidates(epoch_number, committee_candidate_address.clone()) + .await?; + let dolos_output = + self.dolos.get_candidates(epoch_number, committee_candidate_address).await?; + if reference != dolos_output { + println!( + ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_candidates mismatch: dbs: {reference:?} dolos: {dolos_output:?}" + ) + } + Ok(reference) + } + + async fn get_epoch_nonce( + &self, + epoch_number: McEpochNumber, + ) -> Result, Box> { + let reference = self.dbsync.get_epoch_nonce(epoch_number).await?; + let dolos_output = self.dolos.get_epoch_nonce(epoch_number).await?; + if reference != dolos_output { + println!( + ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_epoch_nonce mismatch: dbs: {reference:?} dolos: {dolos_output:?}" + ) + } + Ok(reference) + } + + async fn data_epoch( + &self, + for_epoch: McEpochNumber, + ) -> Result> { + let reference = self.dbsync.data_epoch(for_epoch).await?; + let dolos_output = self.dolos.data_epoch(for_epoch).await?; + if reference != dolos_output { + println!( + ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::data_epoch mismatch: dbs: {reference:?} dolos: {dolos_output:?}" + ) + } + Ok(reference) + } +} + +pub async fn create_diff_data_sources( + metrics_opt: Option, +) -> std::result::Result> { + let dolos_client = partner_chains_dolos_data_sources::get_connection_from_env()?; + let pool = partner_chains_db_sync_data_sources::get_connection_from_env().await?; + let block_dbsync = Arc::new( + partner_chains_db_sync_data_sources::BlockDataSourceImpl::new_from_env(pool.clone()) + .await?, + ); + let block_dolos = Arc::new( + partner_chains_dolos_data_sources::BlockDataSourceImpl::new_from_env(dolos_client.clone()) + .await?, + ); + Ok(DataSources { + sidechain_rpc: Arc::new(SidechainRpcDataSourceImplDiff { + dolos: Arc::new(partner_chains_dolos_data_sources::SidechainRpcDataSourceImpl::new( + dolos_client.clone(), + )), + dbsync: Arc::new(partner_chains_db_sync_data_sources::SidechainRpcDataSourceImpl::new( + block_dbsync.clone(), + metrics_opt.clone(), + )), + }), + mc_hash: Arc::new(McHashDataSourceImplDiff { + dolos: Arc::new(partner_chains_dolos_data_sources::McHashDataSourceImpl::new( + block_dolos.clone(), + )), + dbsync: Arc::new(partner_chains_db_sync_data_sources::McHashDataSourceImpl::new( + block_dbsync.clone(), + metrics_opt.clone(), + )), + }), + authority_selection: Arc::new(AuthoritySelectionDataSourceImplDiff { + dolos: Arc::new( + partner_chains_dolos_data_sources::AuthoritySelectionDataSourceImpl::new( + dolos_client.clone(), + ), + ), + dbsync: Arc::new( + partner_chains_db_sync_data_sources::CandidatesDataSourceImpl::new( + pool.clone(), + metrics_opt.clone(), + ) + .await? + .cached(CANDIDATES_FOR_EPOCH_CACHE_SIZE)?, + ), + }), + block_participation: Arc::new( + partner_chains_db_sync_data_sources::StakeDistributionDataSourceImpl::new( + pool.clone(), + metrics_opt.clone(), + STAKE_CACHE_SIZE, + ), + ), + governed_map: Arc::new( + partner_chains_db_sync_data_sources::GovernedMapDataSourceCachedImpl::new( + pool.clone(), + metrics_opt.clone(), + GOVERNED_MAP_CACHE_SIZE, + block_dbsync.clone(), + ) + .await?, + ), + bridge: Arc::new( + partner_chains_db_sync_data_sources::CachedTokenBridgeDataSourceImpl::new( + pool, + metrics_opt, + block_dbsync, + BRIDGE_TRANSFER_CACHE_LOOKAHEAD, + ), + ), + }) +} diff --git a/demo/node/src/lib.rs b/demo/node/src/lib.rs index 830a16d38b..27f04da233 100644 --- a/demo/node/src/lib.rs +++ b/demo/node/src/lib.rs @@ -3,6 +3,7 @@ pub mod chain_spec; mod data_sources; +mod diff_sources; mod inherent_data; pub mod rpc; pub mod service; diff --git a/demo/node/src/main.rs b/demo/node/src/main.rs index 86890d4b3f..9d9813e8a2 100644 --- a/demo/node/src/main.rs +++ b/demo/node/src/main.rs @@ -6,6 +6,7 @@ mod chain_spec; mod cli; mod command; mod data_sources; +mod diff_sources; mod inherent_data; mod rpc; mod service; diff --git a/toolkit/data-sources/db-sync/Cargo.toml b/toolkit/data-sources/db-sync/Cargo.toml index 80d63fbc0a..671c7b194d 100644 --- a/toolkit/data-sources/db-sync/Cargo.toml +++ b/toolkit/data-sources/db-sync/Cargo.toml @@ -16,7 +16,7 @@ sqlx = { workspace = true } db-sync-sqlx = { workspace = true } tokio = { workspace = true, features = ["full"] } futures = { workspace = true } -chrono = "0.4.31" +chrono = { workspace = true } hex = { workspace = true } hex-literal = { workspace = true } itertools = { workspace = true } diff --git a/toolkit/data-sources/db-sync/src/candidates/cached.rs b/toolkit/data-sources/db-sync/src/candidates/cached.rs index 6e0e61e18a..9427e3e7bb 100644 --- a/toolkit/data-sources/db-sync/src/candidates/cached.rs +++ b/toolkit/data-sources/db-sync/src/candidates/cached.rs @@ -20,6 +20,7 @@ pub type ArcMut = Arc>; type AriadneParametersCacheKey = (McEpochNumber, PolicyId, PolicyId); type CandidatesCacheKey = (McEpochNumber, String); +/// Cached candidate data source pub struct CandidateDataSourceCached { inner: CandidatesDataSourceImpl, get_ariadne_parameters_for_epoch_cache: @@ -96,6 +97,7 @@ impl CandidateDataSourceCacheConfig { } impl CandidateDataSourceCached { + /// TODO pub fn new( inner: CandidatesDataSourceImpl, candidates_for_epoch_cache_size: usize, @@ -114,6 +116,7 @@ impl CandidateDataSourceCached { } } + /// TODO pub fn new_from_env( inner: CandidatesDataSourceImpl, candidates_for_epoch_cache_size: usize, diff --git a/toolkit/data-sources/db-sync/src/candidates/mod.rs b/toolkit/data-sources/db-sync/src/candidates/mod.rs index 6196efe040..d59b421530 100644 --- a/toolkit/data-sources/db-sync/src/candidates/mod.rs +++ b/toolkit/data-sources/db-sync/src/candidates/mod.rs @@ -17,7 +17,7 @@ use sqlx::PgPool; use std::collections::HashMap; use std::error::Error; -mod cached; +pub mod cached; #[cfg(test)] mod tests; diff --git a/toolkit/data-sources/db-sync/src/lib.rs b/toolkit/data-sources/db-sync/src/lib.rs index 4c88d59db4..15446f09e4 100644 --- a/toolkit/data-sources/db-sync/src/lib.rs +++ b/toolkit/data-sources/db-sync/src/lib.rs @@ -103,7 +103,7 @@ pub use crate::block::{BlockDataSourceImpl, DbSyncBlockDataSourceConfig}; #[cfg(feature = "bridge")] pub use crate::bridge::{TokenBridgeDataSourceImpl, cache::CachedTokenBridgeDataSourceImpl}; #[cfg(feature = "candidate-source")] -pub use crate::candidates::CandidatesDataSourceImpl; +pub use crate::candidates::{CandidatesDataSourceImpl, cached::CandidateDataSourceCached}; #[cfg(feature = "governed-map")] pub use crate::governed_map::{GovernedMapDataSourceCachedImpl, GovernedMapDataSourceImpl}; #[cfg(feature = "mc-hash")] diff --git a/toolkit/data-sources/dolos/Cargo.toml b/toolkit/data-sources/dolos/Cargo.toml index b91d158b4c..f9b76b27ab 100644 --- a/toolkit/data-sources/dolos/Cargo.toml +++ b/toolkit/data-sources/dolos/Cargo.toml @@ -40,6 +40,8 @@ cardano-serialization-lib = { workspace = true } itertools = { workspace = true } figment = { workspace = true } tokio = { workspace = true } +derive-new = { workspace = true } +chrono = { workspace = true } [features] default = [] diff --git a/toolkit/data-sources/dolos/src/block.rs b/toolkit/data-sources/dolos/src/block.rs new file mode 100644 index 0000000000..b52dd03d32 --- /dev/null +++ b/toolkit/data-sources/dolos/src/block.rs @@ -0,0 +1,350 @@ +use crate::{ + DataSourceError, Result, + client::{ + self, MiniBFClient, + api::MiniBFApi, + conversions::{self, from_block_content}, + }, + read_mc_epoch_config, +}; +use blockfrost_openapi::models::block_content::BlockContent; +use chrono::{DateTime, NaiveDateTime, TimeDelta}; +use derive_new::new; +use figment::{Figment, providers::Env}; +use log::{debug, info}; +use serde::Deserialize; +use sidechain_domain::mainchain_epoch::{MainchainEpochConfig, MainchainEpochDerivation}; +use sidechain_domain::*; +use sp_timestamp::Timestamp; +use std::{ + error::Error, + sync::{Arc, Mutex}, +}; + +#[derive(new)] +pub struct BlockDataSourceImpl { + /// MiniBF client + client: MiniBFClient, + /// Cardano security parameter + /// + /// This parameter controls how many confirmations (blocks on top) are required by + /// the Cardano node to consider a block to be stable. This is a network-wide parameter. + security_parameter: u32, + /// Minimal age of a block to be considered valid stable in relation to some given timestamp. + /// Must be equal to `security parameter / active slot coefficient`. + min_slot_boundary_as_seconds: TimeDelta, + /// a characteristic of Ouroboros Praos and is equal to `3 * security parameter / active slot coefficient` + max_slot_boundary_as_seconds: TimeDelta, + /// Cardano main chain epoch configuration + mainchain_epoch_config: MainchainEpochConfig, + /// Additional offset applied when selecting the latest stable Cardano block + /// + /// This parameter should be 0 by default and should only be increased to 1 in networks + /// struggling with frequent block rejections due to Db-Sync or Cardano node lag. + block_stability_margin: u32, + /// Number of contiguous Cardano blocks to be cached by this data source + cache_size: u16, + /// Internal block cache + stable_blocks_cache: Arc>, +} + +impl BlockDataSourceImpl { + /// Returns the latest _unstable_ Cardano block from the Db-Sync database + pub async fn get_latest_block_info(&self) -> Result { + self.client + .blocks_latest() + .await + .and_then(conversions::from_block_content) + .map_err(|e| { + DataSourceError::ExpectedDataNotFound(format!("No latest block on chain. {e}",)) + .into() + }) + } + + /// Returns the latest _stable_ Cardano block from the Db-Sync database that is within + /// acceptable bounds from `reference_timestamp`, accounting for the additional stability + /// offset configured by [block_stability_margin][Self::block_stability_margin]. + pub async fn get_latest_stable_block_for( + &self, + reference_timestamp: Timestamp, + ) -> Result> { + let reference_timestamp = BlockDataSourceImpl::timestamp_to_db_type(reference_timestamp)?; + let latest = self.get_latest_block_info().await?; + let offset = self.security_parameter + self.block_stability_margin; + let stable = latest.number.saturating_sub(offset).into(); + let block = self.get_latest_block(stable, reference_timestamp).await?; + Ok(block) + } + + /// Finds a block by its `hash` and verifies that it is stable in reference to `reference_timestamp` + /// and returns its info + pub async fn get_stable_block_for( + &self, + hash: McBlockHash, + reference_timestamp: Timestamp, + ) -> Result> { + let reference_timestamp = BlockDataSourceImpl::timestamp_to_db_type(reference_timestamp)?; + self.get_stable_block_by_hash(hash, reference_timestamp).await + } + + /// Finds a block by its `hash` and returns its info + pub async fn get_block_by_hash(&self, hash: McBlockHash) -> Result> { + let from_cache = if let Ok(cache) = self.stable_blocks_cache.lock() { + cache.find_by_hash(hash.clone()) + } else { + None + }; + let block_opt = match from_cache { + Some(block) => Some(block), + None => Some(conversions::from_block_content(self.client.blocks_by_id(hash).await?)?), + }; + Ok(block_opt) + } +} + +/// Configuration for [BlockDataSourceImpl] +#[derive(Debug, Clone, Deserialize)] +pub struct DbSyncBlockDataSourceConfig { + /// Cardano security parameter, ie. the number of confirmations needed to stabilize a block + pub cardano_security_parameter: u32, + /// Expected fraction of Cardano slots that will have a block produced + /// + /// This value can be found in `shelley-genesis.json` file used by the Cardano node, + /// example: `"activeSlotsCoeff": 0.05`. + pub cardano_active_slots_coeff: f64, + /// Additional offset applied when selecting the latest stable Cardano block + /// + /// This parameter should be 0 by default and should only be increased to 1 in networks + /// struggling with frequent block rejections due to Db-Sync or Cardano node lag. + pub block_stability_margin: u32, +} + +impl DbSyncBlockDataSourceConfig { + /// Reads the config from environment + pub fn from_env() -> std::result::Result> { + let config: Self = Figment::new() + .merge(Env::raw()) + .extract() + .map_err(|e| format!("Failed to read block data source config: {e}"))?; + info!("Using block data source configuration: {config:?}"); + Ok(config) + } +} + +impl BlockDataSourceImpl { + /// Creates a new instance of [BlockDataSourceImpl], reading configuration from the environment. + pub async fn new_from_env( + client: MiniBFClient, + ) -> std::result::Result> { + Ok(Self::from_config( + client, + DbSyncBlockDataSourceConfig::from_env()?, + &read_mc_epoch_config()?, + )) + } + + /// Creates a new instance of [BlockDataSourceImpl], using passed configuration. + pub fn from_config( + client: MiniBFClient, + DbSyncBlockDataSourceConfig { + cardano_security_parameter, + cardano_active_slots_coeff, + block_stability_margin, + }: DbSyncBlockDataSourceConfig, + mc_epoch_config: &MainchainEpochConfig, + ) -> BlockDataSourceImpl { + let k: f64 = cardano_security_parameter.into(); + let slot_duration: f64 = mc_epoch_config.slot_duration_millis.millis() as f64; + let min_slot_boundary = (slot_duration * k / cardano_active_slots_coeff).round() as i64; + let max_slot_boundary = 3 * min_slot_boundary; + let cache_size = 100; + BlockDataSourceImpl::new( + client, + cardano_security_parameter, + TimeDelta::milliseconds(min_slot_boundary), + TimeDelta::milliseconds(max_slot_boundary), + mc_epoch_config.clone(), + block_stability_margin, + cache_size, + BlocksCache::new_arc_mutex(), + ) + } + async fn get_latest_block( + &self, + max_block: McBlockNumber, + reference_timestamp: NaiveDateTime, + ) -> Result> { + let min_time_naive = self.min_block_allowed_time(reference_timestamp); + let min_time = convert_naive_datetime(min_time_naive); + let min_slot = self.date_time_to_slot(min_time_naive)?; + let max_time_naive = self.max_allowed_block_time(reference_timestamp); + let max_time = convert_naive_datetime(max_time_naive); + let max_slot = self.date_time_to_slot(max_time_naive)?; + + let mut current_block_number = max_block; + + loop { + let block = match self.client.blocks_by_id(current_block_number).await { + Ok(b) => conversions::from_block_content(b)?, + Err(_) => return Ok(None), + }; + + let is_time_match = block.timestamp >= min_time && block.timestamp <= max_time; + let is_slot_match = block.slot >= min_slot && block.slot <= max_slot; + + if is_time_match && is_slot_match { + return Ok(Some(block)); + } + + if block.timestamp < min_time || block.slot < min_slot || block.number.0 == 0 { + return Ok(None); + } + + current_block_number = block.number.saturating_sub(1u32); + } + } + + fn min_block_allowed_time(&self, reference_timestamp: NaiveDateTime) -> NaiveDateTime { + reference_timestamp - self.max_slot_boundary_as_seconds + } + + fn max_allowed_block_time(&self, reference_timestamp: NaiveDateTime) -> NaiveDateTime { + reference_timestamp - self.min_slot_boundary_as_seconds + } + + /// Rules for block selection and verification mandates that timestamp of the block + /// falls in a given range, calculated from the reference timestamp, which is either + /// PC current time or PC block timestamp. + fn is_block_time_valid(&self, block: &MainchainBlock, timestamp: NaiveDateTime) -> bool { + convert_naive_datetime(self.min_block_allowed_time(timestamp)) <= block.timestamp + && block.timestamp <= convert_naive_datetime(self.max_allowed_block_time(timestamp)) + } + + async fn get_stable_block_by_hash( + &self, + hash: McBlockHash, + reference_timestamp: NaiveDateTime, + ) -> Result> { + if let Some(block) = + self.get_stable_block_by_hash_from_cache(hash.clone(), reference_timestamp) + { + debug!("Block by hash: {hash} found in cache."); + Ok(Some(From::from(block))) + } else { + debug!("Block by hash: {hash}, not found in cache, serving from database."); + if let Some(block_by_hash) = + self.get_stable_block_by_hash_from_db(hash, reference_timestamp).await? + { + self.fill_cache(&block_by_hash).await?; + Ok(Some(MainchainBlock::from(block_by_hash))) + } else { + Ok(None) + } + } + } + + fn get_stable_block_by_hash_from_cache( + &self, + hash: McBlockHash, + reference_timestamp: NaiveDateTime, + ) -> Option { + if let Ok(cache) = self.stable_blocks_cache.lock() { + cache + .find_by_hash(hash) + .filter(|block| self.is_block_time_valid(block, reference_timestamp)) + } else { + None + } + } + + /// Returns block by given hash from the cache if it is valid in reference to given timestamp + async fn get_stable_block_by_hash_from_db( + &self, + hash: McBlockHash, + reference_timestamp: NaiveDateTime, + ) -> Result> { + let block = Some(conversions::from_block_content(self.client.blocks_by_id(hash).await?)?); + let latest_block = + Some(conversions::from_block_content(self.client.blocks_latest().await?)?); + Ok(block + .zip(latest_block) + .filter(|(block, latest_block)| { + block.number.saturating_add(self.security_parameter) <= latest_block.number + && self.is_block_time_valid(block, reference_timestamp) + }) + .map(|(block, _)| block)) + } + + /// Caches stable blocks for lookup by hash. + async fn fill_cache(&self, _from_block: &MainchainBlock) -> Result<()> { + // let from_block_no = from_block.block_no; + // let size = u32::from(self.cache_size); + // let latest_block = + // db_model::get_latest_block_info(&self.pool) + // .await? + // .ok_or(InternalDataSourceError( + // "No latest block when filling the caches.".to_string(), + // ))?; + // let stable_block_num = latest_block.block_no.saturating_sub(self.security_parameter); + + // let to_block_no = from_block_no.saturating_add(size).min(stable_block_num); + // let blocks = if to_block_no > from_block_no { + // db_model::get_blocks_by_numbers(&self.pool, from_block_no, to_block_no).await? + // } else { + // vec![from_block.clone()] + // }; + + // if let Ok(mut cache) = self.stable_blocks_cache.lock() { + // cache.update(blocks); + // debug!("Cached blocks {} to {} for by hash lookups.", from_block_no.0, to_block_no.0); + // } + Ok(()) + } + + fn date_time_to_slot(&self, dt: NaiveDateTime) -> Result { + let millis: u64 = + dt.and_utc().timestamp_millis().try_into().map_err(|_| { + DataSourceError::BadRequest(format!("Datetime out of range: {dt:?}")) + })?; + let ts = sidechain_domain::mainchain_epoch::Timestamp::from_unix_millis(millis); + let slot = self + .mainchain_epoch_config + .timestamp_to_mainchain_slot_number(ts) + .unwrap_or(self.mainchain_epoch_config.first_slot_number); + Ok(McSlotNumber(slot)) + } + + fn timestamp_to_db_type(timestamp: Timestamp) -> Result { + let millis: Option = timestamp.as_millis().try_into().ok(); + let dt = millis + .and_then(DateTime::from_timestamp_millis) + .ok_or(DataSourceError::BadRequest(format!("Timestamp out of range: {timestamp:?}")))?; + Ok(NaiveDateTime::new(dt.date_naive(), dt.time())) + } +} + +fn convert_naive_datetime(d: NaiveDateTime) -> u64 { + d.and_utc().timestamp().try_into().expect("i64 timestamp is valid u64") +} + +/// Helper structure for caching stable blocks. +#[derive(new)] +pub(crate) struct BlocksCache { + /// Continuous main chain blocks. All blocks should be stable. Used to query by hash. + #[new(default)] + from_last_by_hash: Vec, +} + +impl BlocksCache { + fn find_by_hash(&self, hash: McBlockHash) -> Option { + self.from_last_by_hash.iter().find(|b| b.hash == hash).cloned() + } + + pub fn update(&mut self, from_last_by_hash: Vec) { + self.from_last_by_hash = from_last_by_hash; + } + + pub fn new_arc_mutex() -> Arc> { + Arc::new(Mutex::new(Self::new())) + } +} diff --git a/toolkit/data-sources/dolos/src/candidate.rs b/toolkit/data-sources/dolos/src/candidate.rs index 02e6254d02..1b21b67c34 100644 --- a/toolkit/data-sources/dolos/src/candidate.rs +++ b/toolkit/data-sources/dolos/src/candidate.rs @@ -108,9 +108,9 @@ impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImpl { let pools = self.client.pools_extended().await?; let pred = |pool: PoolListExtendedInner| async move { let history = self.client.pools_history(&pool.pool_id).await?; - Result::Ok(match history.into_iter().find(|h| h.epoch == epoch.0 as i32) { + Result::Ok(match history.into_iter().find(|h| h.epoch <= epoch.0 as i32) { Some(e) => Some(( - MainchainKeyHash(pool.pool_id.as_bytes().try_into()?), // TODO is pool_id a pool hash? + MainchainKeyHash::decode_hex(&pool.pool_id)?, // TODO is pool_id a pool hash? StakeDelegation(e.active_stake.parse::()?), )), None => None, @@ -139,7 +139,7 @@ impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImpl { async fn get_epoch_nonce(&self, epoch_number: McEpochNumber) -> Result> { let epoch = self.get_epoch_of_data_storage(epoch_number)?; let nonce: String = self.client.epochs_parameters(epoch).await?.nonce; - Ok(Some(EpochNonce(nonce.into()))) + Ok(Some(EpochNonce::decode_hex(&nonce)?)) } async fn data_epoch(&self, for_epoch: McEpochNumber) -> Result { diff --git a/toolkit/data-sources/dolos/src/lib.rs b/toolkit/data-sources/dolos/src/lib.rs index 25598ea8b6..fc97095746 100644 --- a/toolkit/data-sources/dolos/src/lib.rs +++ b/toolkit/data-sources/dolos/src/lib.rs @@ -28,6 +28,10 @@ mod bridge; #[cfg(feature = "bridge")] pub use bridge::TokenBridgeDataSourceImpl; +mod block; +pub use block::BlockDataSourceImpl; +use sidechain_domain::mainchain_epoch::MainchainEpochConfig; + use crate::client::MiniBFClient; pub mod client; @@ -82,3 +86,12 @@ impl ConnectionConfig { Ok(config) } } + +/// Reads Cardano main chain epoch configuration from the environment. +/// +/// See documentation of [MainchainEpochConfig::read_from_env] for the list of environment variables read. +#[cfg(feature = "block-source")] +pub fn read_mc_epoch_config() -> Result { + Ok(MainchainEpochConfig::read_from_env() + .map_err(|e| format!("Failed to read main chain config: {}", e))?) +} diff --git a/toolkit/data-sources/dolos/src/mc_hash.rs b/toolkit/data-sources/dolos/src/mc_hash.rs index 55bf1b01d7..9df431ddf7 100644 --- a/toolkit/data-sources/dolos/src/mc_hash.rs +++ b/toolkit/data-sources/dolos/src/mc_hash.rs @@ -1,17 +1,18 @@ -use crate::{ - Result, - client::{MiniBFClient, api::MiniBFApi, conversions::from_block_content}, -}; +use crate::Result; +use crate::block::BlockDataSourceImpl; use async_trait::async_trait; use sidechain_domain::*; +use sp_timestamp::Timestamp; +use std::sync::Arc; pub struct McHashDataSourceImpl { - client: MiniBFClient, + /// [BlockDataSourceImpl] instance shared with other data sources for cache reuse. + inner: Arc, } impl McHashDataSourceImpl { - pub fn new(client: MiniBFClient) -> Self { - Self { client } + pub fn new(inner: Arc) -> Self { + Self { inner } } } @@ -19,20 +20,26 @@ impl McHashDataSourceImpl { impl sidechain_mc_hash::McHashDataSource for McHashDataSourceImpl { async fn get_latest_stable_block_for( &self, - _reference_timestamp: sp_timestamp::Timestamp, + reference_timestamp: sp_timestamp::Timestamp, ) -> Result> { - Ok(Some(from_block_content(self.client.blocks_latest().await?)?)) + Ok(self + .inner + .get_latest_stable_block_for(Timestamp::new(reference_timestamp.as_millis())) + .await?) } async fn get_stable_block_for( &self, hash: McBlockHash, - _reference_timestamp: sp_timestamp::Timestamp, + reference_timestamp: sp_timestamp::Timestamp, ) -> Result> { - Ok(Some(from_block_content(self.client.blocks_by_id(hash).await?)?)) + Ok(self + .inner + .get_stable_block_for(hash, Timestamp::new(reference_timestamp.as_millis())) + .await?) } async fn get_block_by_hash(&self, hash: McBlockHash) -> Result> { - Ok(Some(from_block_content(self.client.blocks_by_id(hash).await?)?)) + Ok(self.inner.get_block_by_hash(hash).await?) } } diff --git a/toolkit/sidechain/domain/src/lib.rs b/toolkit/sidechain/domain/src/lib.rs index 952f7fee5f..838bbe9a13 100644 --- a/toolkit/sidechain/domain/src/lib.rs +++ b/toolkit/sidechain/domain/src/lib.rs @@ -774,7 +774,7 @@ const EPOCH_NONCE_LEN: usize = 32; /// Because it is subject to Cardano's consensus mechanism and has strong cryptographic guarantees, /// this value can be used as a tamper-proof shared randomness seed by Partner Chain Toolkit components. #[derive(Default, Clone, Encode, Decode, DecodeWithMemTracking, PartialEq, Eq, TypeInfo)] -#[byte_string(debug, hex_serialize)] +#[byte_string(debug, hex_serialize, decode_hex)] pub struct EpochNonce(pub Vec); impl EpochNonce { From ba0b47d57b8236118f449e810b0d4f982f73cbe4 Mon Sep 17 00:00:00 2001 From: Krisztian Pinter Date: Mon, 10 Nov 2025 13:52:27 +0100 Subject: [PATCH 02/12] fixes --- demo/node/src/diff_sources.rs | 6 ++++-- dev/local-environment/setup.sh | 2 +- toolkit/data-sources/dolos/src/candidate.rs | 19 ++++++++----------- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/demo/node/src/diff_sources.rs b/demo/node/src/diff_sources.rs index 292d7588b9..621e07caef 100644 --- a/demo/node/src/diff_sources.rs +++ b/demo/node/src/diff_sources.rs @@ -120,12 +120,14 @@ impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImplDiff { epoch_number: McEpochNumber, committee_candidate_address: MainchainAddress, ) -> Result, Box> { - let reference = self + let mut reference = self .dbsync .get_candidates(epoch_number, committee_candidate_address.clone()) .await?; - let dolos_output = + let mut dolos_output = self.dolos.get_candidates(epoch_number, committee_candidate_address).await?; + reference.sort_by_key(|a| a.mainchain_pub_key().clone()); + dolos_output.sort_by_key(|a| a.mainchain_pub_key().clone()); if reference != dolos_output { println!( ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_candidates mismatch: dbs: {reference:?} dolos: {dolos_output:?}" diff --git a/dev/local-environment/setup.sh b/dev/local-environment/setup.sh index fee18cf16a..3e80cce01d 100755 --- a/dev/local-environment/setup.sh +++ b/dev/local-environment/setup.sh @@ -3,7 +3,7 @@ PARTNER_CHAINS_NODE_IMAGE="ghcr.io/input-output-hk/partner-chains/partner-chains-node-unstable:latest" CARDANO_IMAGE="ghcr.io/intersectmbo/cardano-node:10.5.1" DBSYNC_IMAGE="ghcr.io/intersectmbo/cardano-db-sync:13.6.0.5" -DOLOS_IMAGE="ghcr.io/txpipe/dolos:1.0.0-beta.8" +DOLOS_IMAGE="ghcr.io/txpipe/dolos:1.0.0-rc.1" OGMIOS_IMAGE="cardanosolutions/ogmios:v6.13.0" POSTGRES_IMAGE="postgres:17.2" TESTS_IMAGE="python:3.12-slim" diff --git a/toolkit/data-sources/dolos/src/candidate.rs b/toolkit/data-sources/dolos/src/candidate.rs index 1b21b67c34..1c7578cccc 100644 --- a/toolkit/data-sources/dolos/src/candidate.rs +++ b/toolkit/data-sources/dolos/src/candidate.rs @@ -110,7 +110,7 @@ impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImpl { let history = self.client.pools_history(&pool.pool_id).await?; Result::Ok(match history.into_iter().find(|h| h.epoch <= epoch.0 as i32) { Some(e) => Some(( - MainchainKeyHash::decode_hex(&pool.pool_id)?, // TODO is pool_id a pool hash? + MainchainKeyHash::decode_hex(&pool.pool_id)?, StakeDelegation(e.active_stake.parse::()?), )), None => None, @@ -265,9 +265,9 @@ impl AuthoritySelectionDataSourceImpl { output.clone().output_index ))?; let datum = cardano_serialization_lib::PlutusData::from_hex(&datum_str) - .map_err(|e| e.to_string())?; + .map_err(|e| format!("Failed to parse datum string: {e}"))?; let utxo_id = UtxoId { - tx_hash: output.tx_hash.as_bytes().try_into()?, + tx_hash: McTxHash::decode_hex(&output.tx_hash)?, index: UtxoIndex(output.tx_index.try_into()?), }; let register_validator_datum = RegisterValidatorDatum::try_from(datum) @@ -285,7 +285,7 @@ impl AuthoritySelectionDataSourceImpl { .into_iter() .map(|input| { Ok::>(UtxoId { - tx_hash: input.tx_hash.as_bytes().try_into()?, + tx_hash: McTxHash::decode_hex(&input.tx_hash)?, index: UtxoIndex(input.output_index.try_into()?), }) }) @@ -316,7 +316,7 @@ impl AuthoritySelectionDataSourceImpl { .filter_map(|r| match r { Ok(candidate) => Some(candidate.clone()), Err(msg) => { - log::error!("{msg}"); + log::error!("Failed to parse candidate: {msg}"); None }, }) @@ -359,13 +359,10 @@ impl AuthoritySelectionDataSourceImpl { let active_utxos = match registrations_block_for_epoch_opt { Some(registrations_block_for_epoch) => { let pred = |utxo: AddressUtxoContentInner| async move { + let block = self.client.blocks_by_id(utxo.block.clone()).await?; Ok::( - self.client - .blocks_by_id(utxo.block.clone()) - .await? - .height - .ok_or("committee candidate block height missing")? as u32 - >= registrations_block_for_epoch + block.height.ok_or("committee candidate block height missing")? as u32 + <= registrations_block_for_epoch .height .ok_or("last_block_for_epoch block height missing")? as u32, ) From da1e6351cacf5b0dacdcf1fb9da8e6ed7e06a0d6 Mon Sep 17 00:00:00 2001 From: Krisztian Pinter Date: Wed, 12 Nov 2025 15:52:57 +0100 Subject: [PATCH 03/12] add block caching --- toolkit/data-sources/dolos/src/block.rs | 68 +++++++++++-------------- 1 file changed, 29 insertions(+), 39 deletions(-) diff --git a/toolkit/data-sources/dolos/src/block.rs b/toolkit/data-sources/dolos/src/block.rs index b52dd03d32..b6b6b6c696 100644 --- a/toolkit/data-sources/dolos/src/block.rs +++ b/toolkit/data-sources/dolos/src/block.rs @@ -1,13 +1,8 @@ use crate::{ DataSourceError, Result, - client::{ - self, MiniBFClient, - api::MiniBFApi, - conversions::{self, from_block_content}, - }, + client::{MiniBFClient, api::MiniBFApi, conversions::from_block_content}, read_mc_epoch_config, }; -use blockfrost_openapi::models::block_content::BlockContent; use chrono::{DateTime, NaiveDateTime, TimeDelta}; use derive_new::new; use figment::{Figment, providers::Env}; @@ -51,14 +46,9 @@ pub struct BlockDataSourceImpl { impl BlockDataSourceImpl { /// Returns the latest _unstable_ Cardano block from the Db-Sync database pub async fn get_latest_block_info(&self) -> Result { - self.client - .blocks_latest() - .await - .and_then(conversions::from_block_content) - .map_err(|e| { - DataSourceError::ExpectedDataNotFound(format!("No latest block on chain. {e}",)) - .into() - }) + self.client.blocks_latest().await.and_then(from_block_content).map_err(|e| { + DataSourceError::ExpectedDataNotFound(format!("No latest block on chain. {e}",)).into() + }) } /// Returns the latest _stable_ Cardano block from the Db-Sync database that is within @@ -96,7 +86,7 @@ impl BlockDataSourceImpl { }; let block_opt = match from_cache { Some(block) => Some(block), - None => Some(conversions::from_block_content(self.client.blocks_by_id(hash).await?)?), + None => Some(from_block_content(self.client.blocks_by_id(hash).await?)?), }; Ok(block_opt) } @@ -185,7 +175,7 @@ impl BlockDataSourceImpl { loop { let block = match self.client.blocks_by_id(current_block_number).await { - Ok(b) => conversions::from_block_content(b)?, + Ok(b) => from_block_content(b)?, Err(_) => return Ok(None), }; @@ -263,9 +253,8 @@ impl BlockDataSourceImpl { hash: McBlockHash, reference_timestamp: NaiveDateTime, ) -> Result> { - let block = Some(conversions::from_block_content(self.client.blocks_by_id(hash).await?)?); - let latest_block = - Some(conversions::from_block_content(self.client.blocks_latest().await?)?); + let block = Some(from_block_content(self.client.blocks_by_id(hash).await?)?); + let latest_block = Some(from_block_content(self.client.blocks_latest().await?)?); Ok(block .zip(latest_block) .filter(|(block, latest_block)| { @@ -276,28 +265,29 @@ impl BlockDataSourceImpl { } /// Caches stable blocks for lookup by hash. - async fn fill_cache(&self, _from_block: &MainchainBlock) -> Result<()> { - // let from_block_no = from_block.block_no; - // let size = u32::from(self.cache_size); - // let latest_block = - // db_model::get_latest_block_info(&self.pool) - // .await? - // .ok_or(InternalDataSourceError( - // "No latest block when filling the caches.".to_string(), - // ))?; - // let stable_block_num = latest_block.block_no.saturating_sub(self.security_parameter); + async fn fill_cache(&self, from_block: &MainchainBlock) -> Result<()> { + let from_block_no = from_block.number; + let size = u32::from(self.cache_size); + let latest_block = from_block_content(self.client.blocks_latest().await?)?; + let stable_block_num = latest_block.number.saturating_sub(self.security_parameter); - // let to_block_no = from_block_no.saturating_add(size).min(stable_block_num); - // let blocks = if to_block_no > from_block_no { - // db_model::get_blocks_by_numbers(&self.pool, from_block_no, to_block_no).await? - // } else { - // vec![from_block.clone()] - // }; + let to_block_no = from_block_no.saturating_add(size).min(stable_block_num); + let blocks = if from_block_no < to_block_no { + let futures = (from_block_no.0..=to_block_no.0).map(|block_no| async move { + self.client + .blocks_by_id(McBlockNumber(block_no)) + .await + .and_then(from_block_content) + }); + futures::future::try_join_all(futures).await?.into_iter().collect() + } else { + vec![from_block.clone()] + }; - // if let Ok(mut cache) = self.stable_blocks_cache.lock() { - // cache.update(blocks); - // debug!("Cached blocks {} to {} for by hash lookups.", from_block_no.0, to_block_no.0); - // } + if let Ok(mut cache) = self.stable_blocks_cache.lock() { + cache.update(blocks); + debug!("Cached blocks {} to {} for by hash lookups.", from_block_no.0, to_block_no.0); + } Ok(()) } From 8961ad99994c89ba24eb0159810034c58113a21d Mon Sep 17 00:00:00 2001 From: Krisztian Pinter Date: Wed, 12 Nov 2025 15:53:09 +0100 Subject: [PATCH 04/12] remove diff sources --- demo/node/src/data_sources.rs | 14 +- demo/node/src/diff_sources.rs | 240 ---------------------------------- demo/node/src/lib.rs | 1 - demo/node/src/main.rs | 1 - 4 files changed, 1 insertion(+), 255 deletions(-) delete mode 100644 demo/node/src/diff_sources.rs diff --git a/demo/node/src/data_sources.rs b/demo/node/src/data_sources.rs index 523e44de7a..14a1649660 100644 --- a/demo/node/src/data_sources.rs +++ b/demo/node/src/data_sources.rs @@ -9,8 +9,6 @@ use sp_governed_map::GovernedMapDataSource; use sp_partner_chains_bridge::TokenBridgeDataSource; use std::{error::Error, sync::Arc}; -use crate::diff_sources; - pub const DATA_SOURCE_VAR: &str = "CARDANO_DATA_SOURCE"; #[derive(Clone, Debug, PartialEq)] @@ -18,7 +16,6 @@ pub enum DataSourceType { DbSync, Mock, Dolos, - Diff, } impl DataSourceType { @@ -37,7 +34,7 @@ impl std::str::FromStr for DataSourceType { match s.to_lowercase().as_str() { "db-sync" => Ok(DataSourceType::DbSync), "mock" => Ok(DataSourceType::Mock), - "dolos" => Ok(DataSourceType::Diff), // TODO restore this to dolos + "dolos" => Ok(DataSourceType::Dolos), _ => { Err(format!("Invalid data source type: {}. Valid options: db-sync, mock, dolos", s)) }, @@ -51,7 +48,6 @@ impl std::fmt::Display for DataSourceType { DataSourceType::DbSync => write!(f, "db-sync"), DataSourceType::Mock => write!(f, "mock"), DataSourceType::Dolos => write!(f, "dolos"), - DataSourceType::Diff => write!(f, "diff"), } } } @@ -88,14 +84,6 @@ pub(crate) async fn create_cached_data_sources( DataSourceType::Dolos => create_dolos_data_sources(metrics_opt).await.map_err(|err| { ServiceError::Application(format!("Failed to create dolos data sources: {err}").into()) }), - - DataSourceType::Diff => { - diff_sources::create_diff_data_sources(metrics_opt).await.map_err(|err| { - ServiceError::Application( - format!("Failed to create dolos data sources: {err}").into(), - ) - }) - }, } } diff --git a/demo/node/src/diff_sources.rs b/demo/node/src/diff_sources.rs deleted file mode 100644 index 621e07caef..0000000000 --- a/demo/node/src/diff_sources.rs +++ /dev/null @@ -1,240 +0,0 @@ -use authority_selection_inherents::AuthoritySelectionDataSource; -use pallet_sidechain_rpc::SidechainRpcDataSource; -use partner_chains_data_source_metrics::McFollowerMetrics; -use sidechain_domain::*; -use sidechain_mc_hash::McHashDataSource; -use std::{error::Error, sync::Arc}; - -use crate::data_sources::*; - -struct SidechainRpcDataSourceImplDiff { - dolos: Arc, - dbsync: Arc, -} - -#[async_trait::async_trait] -impl SidechainRpcDataSource for SidechainRpcDataSourceImplDiff { - async fn get_latest_block_info( - &self, - ) -> Result> { - let reference = self.dbsync.get_latest_block_info().await?; - let dolos_output = self.dolos.get_latest_block_info().await?; - if reference != dolos_output { - println!( - ">>>>>>>>>>>>>>>>>>>>>>>>>>>> SidechainRpcDataSource::get_latest_block_info mismatch: dbs: {reference:?} dolos: {dolos_output:?}" - ) - } - Ok(reference) - } -} - -struct McHashDataSourceImplDiff { - dolos: Arc, - dbsync: Arc, -} - -#[async_trait::async_trait] -impl McHashDataSource for McHashDataSourceImplDiff { - async fn get_latest_stable_block_for( - &self, - reference_timestamp: sp_timestamp::Timestamp, - ) -> Result, Box> { - let reference = self.dbsync.get_latest_stable_block_for(reference_timestamp).await?; - let dolos_output = self.dolos.get_latest_stable_block_for(reference_timestamp).await?; - if reference != dolos_output { - println!( - ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_latest_stable_block_for mismatch: dbs: {reference:?} dolos: {dolos_output:?}" - ) - } - Ok(reference) - } - - async fn get_stable_block_for( - &self, - hash: McBlockHash, - reference_timestamp: sp_timestamp::Timestamp, - ) -> Result, Box> { - let reference = self.dbsync.get_stable_block_for(hash.clone(), reference_timestamp).await?; - let dolos_output = self.dolos.get_stable_block_for(hash, reference_timestamp).await?; - if reference != dolos_output { - println!( - ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_stable_block_for mismatch: dbs: {reference:?} dolos: {dolos_output:?}" - ) - } - Ok(reference) - } - - async fn get_block_by_hash( - &self, - hash: McBlockHash, - ) -> Result, Box> { - let reference = self.dbsync.get_block_by_hash(hash.clone()).await?; - let dolos_output = self.dolos.get_block_by_hash(hash).await?; - if reference != dolos_output { - println!( - ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_block_by_hash mismatch: dbs: {reference:?} dolos: {dolos_output:?}" - ) - } - Ok(reference) - } -} - -struct AuthoritySelectionDataSourceImplDiff { - dolos: Arc, - dbsync: Arc, -} - -#[async_trait::async_trait] -impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImplDiff { - async fn get_ariadne_parameters( - &self, - epoch_number: McEpochNumber, - d_parameter_policy: PolicyId, - permissioned_candidate_policy: PolicyId, - ) -> Result< - authority_selection_inherents::AriadneParameters, - Box, - > { - let reference = self - .dbsync - .get_ariadne_parameters( - epoch_number, - d_parameter_policy.clone(), - permissioned_candidate_policy.clone(), - ) - .await?; - let dolos_output = self - .dolos - .get_ariadne_parameters(epoch_number, d_parameter_policy, permissioned_candidate_policy) - .await?; - if reference != dolos_output { - println!( - ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_ariadne_parameters mismatch: dbs: {reference:?} dolos: {dolos_output:?}" - ) - } - Ok(reference) - } - - async fn get_candidates( - &self, - epoch_number: McEpochNumber, - committee_candidate_address: MainchainAddress, - ) -> Result, Box> { - let mut reference = self - .dbsync - .get_candidates(epoch_number, committee_candidate_address.clone()) - .await?; - let mut dolos_output = - self.dolos.get_candidates(epoch_number, committee_candidate_address).await?; - reference.sort_by_key(|a| a.mainchain_pub_key().clone()); - dolos_output.sort_by_key(|a| a.mainchain_pub_key().clone()); - if reference != dolos_output { - println!( - ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_candidates mismatch: dbs: {reference:?} dolos: {dolos_output:?}" - ) - } - Ok(reference) - } - - async fn get_epoch_nonce( - &self, - epoch_number: McEpochNumber, - ) -> Result, Box> { - let reference = self.dbsync.get_epoch_nonce(epoch_number).await?; - let dolos_output = self.dolos.get_epoch_nonce(epoch_number).await?; - if reference != dolos_output { - println!( - ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::get_epoch_nonce mismatch: dbs: {reference:?} dolos: {dolos_output:?}" - ) - } - Ok(reference) - } - - async fn data_epoch( - &self, - for_epoch: McEpochNumber, - ) -> Result> { - let reference = self.dbsync.data_epoch(for_epoch).await?; - let dolos_output = self.dolos.data_epoch(for_epoch).await?; - if reference != dolos_output { - println!( - ">>>>>>>>>>>>>>>>>>>>>>>>>>>> McHashDataSource::data_epoch mismatch: dbs: {reference:?} dolos: {dolos_output:?}" - ) - } - Ok(reference) - } -} - -pub async fn create_diff_data_sources( - metrics_opt: Option, -) -> std::result::Result> { - let dolos_client = partner_chains_dolos_data_sources::get_connection_from_env()?; - let pool = partner_chains_db_sync_data_sources::get_connection_from_env().await?; - let block_dbsync = Arc::new( - partner_chains_db_sync_data_sources::BlockDataSourceImpl::new_from_env(pool.clone()) - .await?, - ); - let block_dolos = Arc::new( - partner_chains_dolos_data_sources::BlockDataSourceImpl::new_from_env(dolos_client.clone()) - .await?, - ); - Ok(DataSources { - sidechain_rpc: Arc::new(SidechainRpcDataSourceImplDiff { - dolos: Arc::new(partner_chains_dolos_data_sources::SidechainRpcDataSourceImpl::new( - dolos_client.clone(), - )), - dbsync: Arc::new(partner_chains_db_sync_data_sources::SidechainRpcDataSourceImpl::new( - block_dbsync.clone(), - metrics_opt.clone(), - )), - }), - mc_hash: Arc::new(McHashDataSourceImplDiff { - dolos: Arc::new(partner_chains_dolos_data_sources::McHashDataSourceImpl::new( - block_dolos.clone(), - )), - dbsync: Arc::new(partner_chains_db_sync_data_sources::McHashDataSourceImpl::new( - block_dbsync.clone(), - metrics_opt.clone(), - )), - }), - authority_selection: Arc::new(AuthoritySelectionDataSourceImplDiff { - dolos: Arc::new( - partner_chains_dolos_data_sources::AuthoritySelectionDataSourceImpl::new( - dolos_client.clone(), - ), - ), - dbsync: Arc::new( - partner_chains_db_sync_data_sources::CandidatesDataSourceImpl::new( - pool.clone(), - metrics_opt.clone(), - ) - .await? - .cached(CANDIDATES_FOR_EPOCH_CACHE_SIZE)?, - ), - }), - block_participation: Arc::new( - partner_chains_db_sync_data_sources::StakeDistributionDataSourceImpl::new( - pool.clone(), - metrics_opt.clone(), - STAKE_CACHE_SIZE, - ), - ), - governed_map: Arc::new( - partner_chains_db_sync_data_sources::GovernedMapDataSourceCachedImpl::new( - pool.clone(), - metrics_opt.clone(), - GOVERNED_MAP_CACHE_SIZE, - block_dbsync.clone(), - ) - .await?, - ), - bridge: Arc::new( - partner_chains_db_sync_data_sources::CachedTokenBridgeDataSourceImpl::new( - pool, - metrics_opt, - block_dbsync, - BRIDGE_TRANSFER_CACHE_LOOKAHEAD, - ), - ), - }) -} diff --git a/demo/node/src/lib.rs b/demo/node/src/lib.rs index 27f04da233..830a16d38b 100644 --- a/demo/node/src/lib.rs +++ b/demo/node/src/lib.rs @@ -3,7 +3,6 @@ pub mod chain_spec; mod data_sources; -mod diff_sources; mod inherent_data; pub mod rpc; pub mod service; diff --git a/demo/node/src/main.rs b/demo/node/src/main.rs index 9d9813e8a2..86890d4b3f 100644 --- a/demo/node/src/main.rs +++ b/demo/node/src/main.rs @@ -6,7 +6,6 @@ mod chain_spec; mod cli; mod command; mod data_sources; -mod diff_sources; mod inherent_data; mod rpc; mod service; From 131d7a1e84b6bc60d1b95e6f679848682e3927a8 Mon Sep 17 00:00:00 2001 From: Krisztian Pinter Date: Wed, 12 Nov 2025 16:40:16 +0100 Subject: [PATCH 05/12] fix docstrings --- toolkit/data-sources/db-sync/src/candidates/cached.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/toolkit/data-sources/db-sync/src/candidates/cached.rs b/toolkit/data-sources/db-sync/src/candidates/cached.rs index 9427e3e7bb..e2ebb886c0 100644 --- a/toolkit/data-sources/db-sync/src/candidates/cached.rs +++ b/toolkit/data-sources/db-sync/src/candidates/cached.rs @@ -97,7 +97,7 @@ impl CandidateDataSourceCacheConfig { } impl CandidateDataSourceCached { - /// TODO + /// Creates new instance of the data source pub fn new( inner: CandidatesDataSourceImpl, candidates_for_epoch_cache_size: usize, @@ -116,7 +116,7 @@ impl CandidateDataSourceCached { } } - /// TODO + /// Creates a new instance of the data source, reading configuration from the environment. pub fn new_from_env( inner: CandidatesDataSourceImpl, candidates_for_epoch_cache_size: usize, From 9f0404371d84bc6f499022de95853bd9132d9fab Mon Sep 17 00:00:00 2001 From: Krisztian Pinter Date: Thu, 13 Nov 2025 13:49:07 +0100 Subject: [PATCH 06/12] address comments --- toolkit/data-sources/dolos/src/block.rs | 72 +++++++++---------- toolkit/data-sources/dolos/src/client/api.rs | 11 ++- .../dolos/src/client/conversions.rs | 9 +-- .../data-sources/dolos/src/client/minibf.rs | 12 ++-- toolkit/data-sources/dolos/src/lib.rs | 2 +- 5 files changed, 48 insertions(+), 58 deletions(-) diff --git a/toolkit/data-sources/dolos/src/block.rs b/toolkit/data-sources/dolos/src/block.rs index b6b6b6c696..21981f3d75 100644 --- a/toolkit/data-sources/dolos/src/block.rs +++ b/toolkit/data-sources/dolos/src/block.rs @@ -6,6 +6,7 @@ use crate::{ use chrono::{DateTime, NaiveDateTime, TimeDelta}; use derive_new::new; use figment::{Figment, providers::Env}; +use futures::TryFutureExt; use log::{debug, info}; use serde::Deserialize; use sidechain_domain::mainchain_epoch::{MainchainEpochConfig, MainchainEpochDerivation}; @@ -34,8 +35,7 @@ pub struct BlockDataSourceImpl { mainchain_epoch_config: MainchainEpochConfig, /// Additional offset applied when selecting the latest stable Cardano block /// - /// This parameter should be 0 by default and should only be increased to 1 in networks - /// struggling with frequent block rejections due to Db-Sync or Cardano node lag. + /// This parameter should be 1 by default. block_stability_margin: u32, /// Number of contiguous Cardano blocks to be cached by this data source cache_size: u16, @@ -44,14 +44,14 @@ pub struct BlockDataSourceImpl { } impl BlockDataSourceImpl { - /// Returns the latest _unstable_ Cardano block from the Db-Sync database + /// Returns the latest _unstable_ Cardano block from Dolos pub async fn get_latest_block_info(&self) -> Result { - self.client.blocks_latest().await.and_then(from_block_content).map_err(|e| { + self.client.blocks_latest().await.map_err(|e| { DataSourceError::ExpectedDataNotFound(format!("No latest block on chain. {e}",)).into() - }) + }).and_then(from_block_content) } - /// Returns the latest _stable_ Cardano block from the Db-Sync database that is within + /// Returns the latest _stable_ Cardano block from Dolos that is within /// acceptable bounds from `reference_timestamp`, accounting for the additional stability /// offset configured by [block_stability_margin][Self::block_stability_margin]. pub async fn get_latest_stable_block_for( @@ -94,22 +94,14 @@ impl BlockDataSourceImpl { /// Configuration for [BlockDataSourceImpl] #[derive(Debug, Clone, Deserialize)] -pub struct DbSyncBlockDataSourceConfig { - /// Cardano security parameter, ie. the number of confirmations needed to stabilize a block - pub cardano_security_parameter: u32, - /// Expected fraction of Cardano slots that will have a block produced - /// - /// This value can be found in `shelley-genesis.json` file used by the Cardano node, - /// example: `"activeSlotsCoeff": 0.05`. - pub cardano_active_slots_coeff: f64, +pub struct DolosBlockDataSourceConfig { /// Additional offset applied when selecting the latest stable Cardano block /// - /// This parameter should be 0 by default and should only be increased to 1 in networks - /// struggling with frequent block rejections due to Db-Sync or Cardano node lag. + /// This parameter should be 1 by default. pub block_stability_margin: u32, } -impl DbSyncBlockDataSourceConfig { +impl DolosBlockDataSourceConfig { /// Reads the config from environment pub fn from_env() -> std::result::Result> { let config: Self = Figment::new() @@ -126,38 +118,39 @@ impl BlockDataSourceImpl { pub async fn new_from_env( client: MiniBFClient, ) -> std::result::Result> { - Ok(Self::from_config( + Self::from_config( client, - DbSyncBlockDataSourceConfig::from_env()?, + DolosBlockDataSourceConfig::from_env()?, &read_mc_epoch_config()?, - )) + ).await } /// Creates a new instance of [BlockDataSourceImpl], using passed configuration. - pub fn from_config( + pub async fn from_config( client: MiniBFClient, - DbSyncBlockDataSourceConfig { - cardano_security_parameter, - cardano_active_slots_coeff, + DolosBlockDataSourceConfig { block_stability_margin, - }: DbSyncBlockDataSourceConfig, + }: DolosBlockDataSourceConfig, mc_epoch_config: &MainchainEpochConfig, - ) -> BlockDataSourceImpl { - let k: f64 = cardano_security_parameter.into(); + ) -> Result { + let genesis = client.genesis().await?; + let active_slots_coeff = genesis.active_slots_coefficient; + let security_parameter = genesis.security_param as u32; + let k: f64 = security_parameter.into(); let slot_duration: f64 = mc_epoch_config.slot_duration_millis.millis() as f64; - let min_slot_boundary = (slot_duration * k / cardano_active_slots_coeff).round() as i64; + let min_slot_boundary = (slot_duration * k / active_slots_coeff).round() as i64; let max_slot_boundary = 3 * min_slot_boundary; let cache_size = 100; - BlockDataSourceImpl::new( - client, - cardano_security_parameter, - TimeDelta::milliseconds(min_slot_boundary), - TimeDelta::milliseconds(max_slot_boundary), - mc_epoch_config.clone(), - block_stability_margin, - cache_size, - BlocksCache::new_arc_mutex(), - ) + Ok(BlockDataSourceImpl::new( + client, + security_parameter, + TimeDelta::milliseconds(min_slot_boundary), + TimeDelta::milliseconds(max_slot_boundary), + mc_epoch_config.clone(), + block_stability_margin, + cache_size, + BlocksCache::new_arc_mutex(), + )) } async fn get_latest_block( &self, @@ -221,7 +214,7 @@ impl BlockDataSourceImpl { debug!("Block by hash: {hash} found in cache."); Ok(Some(From::from(block))) } else { - debug!("Block by hash: {hash}, not found in cache, serving from database."); + debug!("Block by hash: {hash}, not found in cache, serving from Dolos."); if let Some(block_by_hash) = self.get_stable_block_by_hash_from_db(hash, reference_timestamp).await? { @@ -276,6 +269,7 @@ impl BlockDataSourceImpl { let futures = (from_block_no.0..=to_block_no.0).map(|block_no| async move { self.client .blocks_by_id(McBlockNumber(block_no)) + .map_err(|e| e.into()) .await .and_then(from_block_content) }); diff --git a/toolkit/data-sources/dolos/src/client/api.rs b/toolkit/data-sources/dolos/src/client/api.rs index 3a0f63378d..99c2470cb5 100644 --- a/toolkit/data-sources/dolos/src/client/api.rs +++ b/toolkit/data-sources/dolos/src/client/api.rs @@ -1,12 +1,6 @@ use async_trait::async_trait; use blockfrost_openapi::models::{ - address_transactions_content_inner::AddressTransactionsContentInner, - address_utxo_content_inner::AddressUtxoContentInner, - asset_addresses_inner::AssetAddressesInner, asset_transactions_inner::AssetTransactionsInner, - block_content::BlockContent, epoch_param_content::EpochParamContent, - epoch_stake_pool_content_inner::EpochStakePoolContentInner, - pool_history_inner::PoolHistoryInner, pool_list_extended_inner::PoolListExtendedInner, - tx_content::TxContent, tx_content_utxo::TxContentUtxo, + address_transactions_content_inner::AddressTransactionsContentInner, address_utxo_content_inner::AddressUtxoContentInner, asset_addresses_inner::AssetAddressesInner, asset_transactions_inner::AssetTransactionsInner, block_content::BlockContent, epoch_param_content::EpochParamContent, epoch_stake_pool_content_inner::EpochStakePoolContentInner, genesis_content::GenesisContent, pool_history_inner::PoolHistoryInner, pool_list_extended_inner::PoolListExtendedInner, tx_content::TxContent, tx_content_utxo::TxContentUtxo }; use sidechain_domain::*; @@ -111,4 +105,7 @@ pub trait MiniBFApi { async fn transaction_by_hash(&self, tx_hash: McTxHash) -> Result; /// Return the inputs and UTXOs of the specific transaction. async fn transactions_utxos(&self, tx_hash: McTxHash) -> Result; + + /// Return the information about blockchain genesis. + async fn genesis(&self) -> Result; } diff --git a/toolkit/data-sources/dolos/src/client/conversions.rs b/toolkit/data-sources/dolos/src/client/conversions.rs index 18cc00106d..72747442e9 100644 --- a/toolkit/data-sources/dolos/src/client/conversions.rs +++ b/toolkit/data-sources/dolos/src/client/conversions.rs @@ -1,21 +1,22 @@ use blockfrost_openapi::models::block_content::BlockContent; use sidechain_domain::*; +use crate::{Result, DataSourceError}; -pub fn from_block_content(value: BlockContent) -> Result { +pub fn from_block_content(value: BlockContent) -> Result { Ok(MainchainBlock { number: value .height .map(|n| sidechain_domain::McBlockNumber(n as u32)) - .ok_or("number missing")?, + .ok_or(DataSourceError::InvalidData("number missing".to_string()))?, hash: McBlockHash::decode_hex(&value.hash)?, epoch: value .epoch .map(|n| sidechain_domain::McEpochNumber(n as u32)) - .ok_or("epoch missing")?, + .ok_or(DataSourceError::InvalidData("epoch missing".to_string()))?, slot: value .slot .map(|n| sidechain_domain::McSlotNumber(n as u64)) - .ok_or("slot missing")?, + .ok_or(DataSourceError::InvalidData("slot missing".to_string()))?, timestamp: value.time as u64, }) } diff --git a/toolkit/data-sources/dolos/src/client/minibf.rs b/toolkit/data-sources/dolos/src/client/minibf.rs index 8979c8acca..7f269db8ef 100644 --- a/toolkit/data-sources/dolos/src/client/minibf.rs +++ b/toolkit/data-sources/dolos/src/client/minibf.rs @@ -1,12 +1,6 @@ use async_trait::async_trait; use blockfrost_openapi::models::{ - address_transactions_content_inner::AddressTransactionsContentInner, - address_utxo_content_inner::AddressUtxoContentInner, - asset_addresses_inner::AssetAddressesInner, asset_transactions_inner::AssetTransactionsInner, - block_content::BlockContent, epoch_param_content::EpochParamContent, - epoch_stake_pool_content_inner::EpochStakePoolContentInner, - pool_history_inner::PoolHistoryInner, pool_list_extended_inner::PoolListExtendedInner, - tx_content::TxContent, tx_content_utxo::TxContentUtxo, + address_transactions_content_inner::AddressTransactionsContentInner, address_utxo_content_inner::AddressUtxoContentInner, asset_addresses_inner::AssetAddressesInner, asset_transactions_inner::AssetTransactionsInner, block_content::BlockContent, epoch_param_content::EpochParamContent, epoch_stake_pool_content_inner::EpochStakePoolContentInner, genesis_content::GenesisContent, pool_history_inner::PoolHistoryInner, pool_list_extended_inner::PoolListExtendedInner, tx_content::TxContent, tx_content_utxo::TxContentUtxo }; use serde::de::DeserializeOwned; use sidechain_domain::*; @@ -188,6 +182,10 @@ impl MiniBFApi for MiniBFClient { async fn transactions_utxos(&self, tx_hash: McTxHash) -> Result { self.request(&format!("txs/{tx_hash}/utxos")).await } + + async fn genesis(&self) -> Result { + self.request(&format!("genesis")).await + } } fn format_asset_id(asset_id: &AssetId) -> String { diff --git a/toolkit/data-sources/dolos/src/lib.rs b/toolkit/data-sources/dolos/src/lib.rs index fc97095746..7db15b3b1e 100644 --- a/toolkit/data-sources/dolos/src/lib.rs +++ b/toolkit/data-sources/dolos/src/lib.rs @@ -45,7 +45,7 @@ pub enum DataSourceError { /// Indicates that Dolos rejected a request as invalid #[error("Bad request: `{0}`.")] BadRequest(String), - /// Indicates that an internal error occured when querying Dolos + /// Indicates that an internal error occurred when querying Dolos #[error("Internal error of data source: `{0}`.")] InternalDataSourceError(String), /// Indicates that expected data was not found when querying Dolos From 8387adb32bbd461b0b7cee7f0f9e78b4f8dc0761 Mon Sep 17 00:00:00 2001 From: Krisztian Pinter Date: Thu, 13 Nov 2025 15:09:31 +0100 Subject: [PATCH 07/12] better errors --- toolkit/data-sources/dolos/src/block.rs | 43 +++++---- toolkit/data-sources/dolos/src/client/api.rs | 64 +++++++++---- .../data-sources/dolos/src/client/minibf.rs | 93 +++++++++++++------ toolkit/data-sources/dolos/src/lib.rs | 9 +- 4 files changed, 135 insertions(+), 74 deletions(-) diff --git a/toolkit/data-sources/dolos/src/block.rs b/toolkit/data-sources/dolos/src/block.rs index 21981f3d75..849d8c79de 100644 --- a/toolkit/data-sources/dolos/src/block.rs +++ b/toolkit/data-sources/dolos/src/block.rs @@ -6,7 +6,6 @@ use crate::{ use chrono::{DateTime, NaiveDateTime, TimeDelta}; use derive_new::new; use figment::{Figment, providers::Env}; -use futures::TryFutureExt; use log::{debug, info}; use serde::Deserialize; use sidechain_domain::mainchain_epoch::{MainchainEpochConfig, MainchainEpochDerivation}; @@ -46,9 +45,14 @@ pub struct BlockDataSourceImpl { impl BlockDataSourceImpl { /// Returns the latest _unstable_ Cardano block from Dolos pub async fn get_latest_block_info(&self) -> Result { - self.client.blocks_latest().await.map_err(|e| { - DataSourceError::ExpectedDataNotFound(format!("No latest block on chain. {e}",)).into() - }).and_then(from_block_content) + self.client + .blocks_latest() + .await + .map_err(|e| { + DataSourceError::ExpectedDataNotFound(format!("No latest block on chain. {e}",)) + .into() + }) + .and_then(from_block_content) } /// Returns the latest _stable_ Cardano block from Dolos that is within @@ -118,19 +122,14 @@ impl BlockDataSourceImpl { pub async fn new_from_env( client: MiniBFClient, ) -> std::result::Result> { - Self::from_config( - client, - DolosBlockDataSourceConfig::from_env()?, - &read_mc_epoch_config()?, - ).await + Self::from_config(client, DolosBlockDataSourceConfig::from_env()?, &read_mc_epoch_config()?) + .await } /// Creates a new instance of [BlockDataSourceImpl], using passed configuration. pub async fn from_config( client: MiniBFClient, - DolosBlockDataSourceConfig { - block_stability_margin, - }: DolosBlockDataSourceConfig, + DolosBlockDataSourceConfig { block_stability_margin }: DolosBlockDataSourceConfig, mc_epoch_config: &MainchainEpochConfig, ) -> Result { let genesis = client.genesis().await?; @@ -142,15 +141,15 @@ impl BlockDataSourceImpl { let max_slot_boundary = 3 * min_slot_boundary; let cache_size = 100; Ok(BlockDataSourceImpl::new( - client, - security_parameter, - TimeDelta::milliseconds(min_slot_boundary), - TimeDelta::milliseconds(max_slot_boundary), - mc_epoch_config.clone(), - block_stability_margin, - cache_size, - BlocksCache::new_arc_mutex(), - )) + client, + security_parameter, + TimeDelta::milliseconds(min_slot_boundary), + TimeDelta::milliseconds(max_slot_boundary), + mc_epoch_config.clone(), + block_stability_margin, + cache_size, + BlocksCache::new_arc_mutex(), + )) } async fn get_latest_block( &self, @@ -269,8 +268,8 @@ impl BlockDataSourceImpl { let futures = (from_block_no.0..=to_block_no.0).map(|block_no| async move { self.client .blocks_by_id(McBlockNumber(block_no)) - .map_err(|e| e.into()) .await + .map_err(|e| e.into()) .and_then(from_block_content) }); futures::future::try_join_all(futures).await?.into_iter().collect() diff --git a/toolkit/data-sources/dolos/src/client/api.rs b/toolkit/data-sources/dolos/src/client/api.rs index 99c2470cb5..cf0786a1b6 100644 --- a/toolkit/data-sources/dolos/src/client/api.rs +++ b/toolkit/data-sources/dolos/src/client/api.rs @@ -1,9 +1,17 @@ use async_trait::async_trait; use blockfrost_openapi::models::{ - address_transactions_content_inner::AddressTransactionsContentInner, address_utxo_content_inner::AddressUtxoContentInner, asset_addresses_inner::AssetAddressesInner, asset_transactions_inner::AssetTransactionsInner, block_content::BlockContent, epoch_param_content::EpochParamContent, epoch_stake_pool_content_inner::EpochStakePoolContentInner, genesis_content::GenesisContent, pool_history_inner::PoolHistoryInner, pool_list_extended_inner::PoolListExtendedInner, tx_content::TxContent, tx_content_utxo::TxContentUtxo + address_transactions_content_inner::AddressTransactionsContentInner, + address_utxo_content_inner::AddressUtxoContentInner, + asset_addresses_inner::AssetAddressesInner, asset_transactions_inner::AssetTransactionsInner, + block_content::BlockContent, epoch_param_content::EpochParamContent, + epoch_stake_pool_content_inner::EpochStakePoolContentInner, genesis_content::GenesisContent, + pool_history_inner::PoolHistoryInner, pool_list_extended_inner::PoolListExtendedInner, + tx_content::TxContent, tx_content_utxo::TxContentUtxo, }; use sidechain_domain::*; +use crate::DataSourceError; + /// Mainchain block id, either a block hash or a block number pub enum McBlockId { /// Domain type Mainchain block hash @@ -49,63 +57,79 @@ pub trait MiniBFApi { async fn addresses_utxos( &self, address: MainchainAddress, - ) -> Result, String>; + ) -> Result, DataSourceError>; /// Transactions on the address. async fn addresses_transactions( &self, address: MainchainAddress, - ) -> Result, String>; + ) -> Result, DataSourceError>; /// List of a specific asset transactions. async fn assets_transactions( &self, asset_id: AssetId, - ) -> Result, String>; + ) -> Result, DataSourceError>; /// List of a addresses containing a specific asset. - async fn assets_addresses(&self, asset_id: AssetId) - -> Result, String>; + async fn assets_addresses( + &self, + asset_id: AssetId, + ) -> Result, DataSourceError>; /// Return the latest block available to the backends, also known as the tip of the blockchain. - async fn blocks_latest(&self) -> Result; + async fn blocks_latest(&self) -> Result; /// Return the content of a requested block. - async fn blocks_by_id(&self, id: impl Into + Send) -> Result; + async fn blocks_by_id( + &self, + id: impl Into + Send, + ) -> Result; /// Return the content of a requested block for a specific slot. - async fn blocks_slot(&self, slot_number: McSlotNumber) -> Result; + async fn blocks_slot(&self, slot_number: McSlotNumber) + -> Result; /// Return the list of blocks following a specific block. async fn blocks_next( &self, hash: impl Into + Send, - ) -> Result, String>; + ) -> Result, DataSourceError>; /// Return the transactions within the block. - async fn blocks_txs(&self, id: impl Into + Send) -> Result, String>; + async fn blocks_txs( + &self, + id: impl Into + Send, + ) -> Result, DataSourceError>; /// Return the blocks minted for the epoch specified. - async fn epochs_blocks(&self, epoch_number: McEpochNumber) -> Result, String>; + async fn epochs_blocks( + &self, + epoch_number: McEpochNumber, + ) -> Result, DataSourceError>; /// Return the protocol parameters for the epoch specified. async fn epochs_parameters( &self, epoch_number: McEpochNumber, - ) -> Result; + ) -> Result; /// Return the active stake distribution for the epoch specified by stake pool. async fn epochs_stakes_by_pool( &self, epoch_number: McEpochNumber, pool_id: &str, - ) -> Result, String>; + ) -> Result, DataSourceError>; /// History of stake pool parameters over epochs. - async fn pools_history(&self, pool_id: &str) -> Result, String>; + async fn pools_history(&self, pool_id: &str) -> Result, DataSourceError>; /// List of registered stake pools with additional information. - async fn pools_extended(&self) -> Result, String>; + async fn pools_extended(&self) -> Result, DataSourceError>; /// Query JSON value of a datum by its hash. - async fn scripts_datum_hash(&self, datum_hash: &str) -> Result, String>; + async fn scripts_datum_hash( + &self, + datum_hash: &str, + ) -> Result, DataSourceError>; /// Return content of the requested transaction. - async fn transaction_by_hash(&self, tx_hash: McTxHash) -> Result; + async fn transaction_by_hash(&self, tx_hash: McTxHash) -> Result; /// Return the inputs and UTXOs of the specific transaction. - async fn transactions_utxos(&self, tx_hash: McTxHash) -> Result; + async fn transactions_utxos(&self, tx_hash: McTxHash) + -> Result; /// Return the information about blockchain genesis. - async fn genesis(&self) -> Result; + async fn genesis(&self) -> Result; } diff --git a/toolkit/data-sources/dolos/src/client/minibf.rs b/toolkit/data-sources/dolos/src/client/minibf.rs index 7f269db8ef..191a2da513 100644 --- a/toolkit/data-sources/dolos/src/client/minibf.rs +++ b/toolkit/data-sources/dolos/src/client/minibf.rs @@ -1,13 +1,22 @@ use async_trait::async_trait; use blockfrost_openapi::models::{ - address_transactions_content_inner::AddressTransactionsContentInner, address_utxo_content_inner::AddressUtxoContentInner, asset_addresses_inner::AssetAddressesInner, asset_transactions_inner::AssetTransactionsInner, block_content::BlockContent, epoch_param_content::EpochParamContent, epoch_stake_pool_content_inner::EpochStakePoolContentInner, genesis_content::GenesisContent, pool_history_inner::PoolHistoryInner, pool_list_extended_inner::PoolListExtendedInner, tx_content::TxContent, tx_content_utxo::TxContentUtxo + address_transactions_content_inner::AddressTransactionsContentInner, + address_utxo_content_inner::AddressUtxoContentInner, + asset_addresses_inner::AssetAddressesInner, asset_transactions_inner::AssetTransactionsInner, + block_content::BlockContent, epoch_param_content::EpochParamContent, + epoch_stake_pool_content_inner::EpochStakePoolContentInner, genesis_content::GenesisContent, + pool_history_inner::PoolHistoryInner, pool_list_extended_inner::PoolListExtendedInner, + tx_content::TxContent, tx_content_utxo::TxContentUtxo, }; use serde::de::DeserializeOwned; use sidechain_domain::*; use std::time::Duration; use ureq::Agent; -use crate::client::api::{McBlockId, MiniBFApi}; +use crate::{ + DataSourceError, + client::api::{McBlockId, MiniBFApi}, +}; /// Client implementing Dolos MiniBF #[derive(Clone)] @@ -25,15 +34,19 @@ impl MiniBFClient { async fn request( &self, method: &str, - ) -> Result { + ) -> Result { let req = format!("{}/{}", self.addr, method); log::trace!("Dolos request: {req:?}"); let resp = self .agent .get(req) .call() - .map_err(|e| e.to_string()) - .and_then(|mut r| r.body_mut().read_json().map_err(|e| e.to_string())); + .map_err(|e| DataSourceError::DolosCallError(e.to_string())) + .and_then(|mut r| { + r.body_mut() + .read_json() + .map_err(|e| DataSourceError::DolosResponseParseError(e.to_string())) + }); log::trace!("Dolos response: {resp:?}"); resp } @@ -42,7 +55,7 @@ impl MiniBFClient { &self, method: &str, pagination: Pagination, - ) -> Result, String> { + ) -> Result, DataSourceError> { let mut query_pairs = url::form_urlencoded::Serializer::new(String::new()); query_pairs.extend_pairs([ ("count", &pagination.count.to_string()), @@ -56,15 +69,19 @@ impl MiniBFClient { query_pairs.append_pair("to", &to); } let mut req_url = - url::Url::parse(&format!("{}/{}", self.addr, method)).map_err(|e| e.to_string())?; + url::Url::parse(&format!("{}/{}", self.addr, method)).expect("valid Dolos url"); req_url.set_query(Some(&query_pairs.finish())); log::trace!("Dolos request: {req_url:?}"); let resp = self .agent .get(req_url.as_str()) .call() - .map_err(|e| e.to_string()) - .and_then(|mut r| r.body_mut().read_json().map_err(|e| e.to_string())); + .map_err(|e| DataSourceError::DolosCallError(e.to_string())) + .and_then(|mut r| { + r.body_mut() + .read_json() + .map_err(|e| DataSourceError::DolosResponseParseError(e.to_string())) + }); log::trace!("Dolos response: {resp:?}"); resp } @@ -72,7 +89,7 @@ impl MiniBFClient { async fn paginated_request_all( &self, method: &str, - ) -> Result, String> { + ) -> Result, DataSourceError> { let mut pagination: Pagination = Pagination::default(); let mut have_all_pages = false; let mut res = Vec::new(); @@ -93,21 +110,21 @@ impl MiniBFApi for MiniBFClient { async fn addresses_utxos( &self, address: MainchainAddress, - ) -> Result, String> { + ) -> Result, DataSourceError> { self.paginated_request_all(&format!("addresses/{address}/utxos")).await } async fn addresses_transactions( &self, address: MainchainAddress, - ) -> Result, String> { + ) -> Result, DataSourceError> { self.paginated_request_all(&format!("addresses/{address}/transactions")).await } async fn assets_transactions( &self, asset_id: AssetId, - ) -> Result, String> { + ) -> Result, DataSourceError> { let asset_id_str = format_asset_id(&asset_id); self.paginated_request_all(&format!("assets/{asset_id_str}/transactions")).await } @@ -115,76 +132,94 @@ impl MiniBFApi for MiniBFClient { async fn assets_addresses( &self, asset_id: AssetId, - ) -> Result, String> { + ) -> Result, DataSourceError> { let asset_id_str = format_asset_id(&asset_id); self.paginated_request_all(&format!("assets/{asset_id_str}/addresses")).await } - async fn blocks_latest(&self) -> Result { + async fn blocks_latest(&self) -> Result { self.request("blocks/latest").await } - async fn blocks_by_id(&self, id: impl Into + Send) -> Result { + async fn blocks_by_id( + &self, + id: impl Into + Send, + ) -> Result { let id: McBlockId = id.into(); self.request(&format!("blocks/{id}")).await } - async fn blocks_slot(&self, slot_number: McSlotNumber) -> Result { + async fn blocks_slot( + &self, + slot_number: McSlotNumber, + ) -> Result { self.request(&format!("blocks/slot/{slot_number}")).await } async fn blocks_next( &self, id: impl Into + Send, - ) -> Result, String> { + ) -> Result, DataSourceError> { let id: McBlockId = id.into(); self.request(&format!("blocks/{id}/next")).await } - async fn blocks_txs(&self, id: impl Into + Send) -> Result, String> { + async fn blocks_txs( + &self, + id: impl Into + Send, + ) -> Result, DataSourceError> { let id: McBlockId = id.into(); self.request(&format!("blocks/{id}/txs")).await } - async fn epochs_blocks(&self, epoch_number: McEpochNumber) -> Result, String> { + async fn epochs_blocks( + &self, + epoch_number: McEpochNumber, + ) -> Result, DataSourceError> { self.paginated_request_all(&format!("epochs/{epoch_number}/blocks")).await } async fn epochs_parameters( &self, epoch_number: McEpochNumber, - ) -> Result { + ) -> Result { self.request(&format!("epochs/{epoch_number}/parameters")).await } async fn epochs_stakes_by_pool( &self, epoch_number: McEpochNumber, pool_id: &str, - ) -> Result, String> { + ) -> Result, DataSourceError> { self.paginated_request_all(&format!("epochs/{epoch_number}/stakes/{pool_id}")) .await } - async fn pools_history(&self, pool_id: &str) -> Result, String> { + async fn pools_history(&self, pool_id: &str) -> Result, DataSourceError> { self.paginated_request_all(&format!("pools/{pool_id}/history")).await } - async fn pools_extended(&self) -> Result, String> { + async fn pools_extended(&self) -> Result, DataSourceError> { self.paginated_request_all("pools/extended").await } - async fn scripts_datum_hash(&self, datum_hash: &str) -> Result, String> { + async fn scripts_datum_hash( + &self, + datum_hash: &str, + ) -> Result, DataSourceError> { self.request(&format!("scripts/datum/{datum_hash}")).await } - async fn transaction_by_hash(&self, tx_hash: McTxHash) -> Result { + async fn transaction_by_hash(&self, tx_hash: McTxHash) -> Result { self.request(&format!("txs/{tx_hash}")).await } - async fn transactions_utxos(&self, tx_hash: McTxHash) -> Result { + async fn transactions_utxos( + &self, + tx_hash: McTxHash, + ) -> Result { self.request(&format!("txs/{tx_hash}/utxos")).await } - async fn genesis(&self) -> Result { - self.request(&format!("genesis")).await + async fn genesis(&self) -> Result { + self.request("genesis").await } } diff --git a/toolkit/data-sources/dolos/src/lib.rs b/toolkit/data-sources/dolos/src/lib.rs index 7db15b3b1e..abc53610db 100644 --- a/toolkit/data-sources/dolos/src/lib.rs +++ b/toolkit/data-sources/dolos/src/lib.rs @@ -45,9 +45,12 @@ pub enum DataSourceError { /// Indicates that Dolos rejected a request as invalid #[error("Bad request: `{0}`.")] BadRequest(String), - /// Indicates that an internal error occurred when querying Dolos - #[error("Internal error of data source: `{0}`.")] - InternalDataSourceError(String), + /// Indicates that Dolos client produced an error while calling endpoint + #[error("Dolos client call error: `{0}`.")] + DolosCallError(String), + /// Indicates that Dolos client produced an error while parsing response + #[error("Dolos client response parse error: `{0}`.")] + DolosResponseParseError(String), /// Indicates that expected data was not found when querying Dolos #[error( "'{0}' not found. Possible causes: data source configuration error, Dolos not synced fully, or data not set on the main chain." From 1779b05ed166375e5278d6e917db5cb5165951dc Mon Sep 17 00:00:00 2001 From: Krisztian Pinter Date: Thu, 13 Nov 2025 15:11:25 +0100 Subject: [PATCH 08/12] upgrade to dolos rc2 --- dev/local-environment/setup.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev/local-environment/setup.sh b/dev/local-environment/setup.sh index 3e80cce01d..8b58af0439 100755 --- a/dev/local-environment/setup.sh +++ b/dev/local-environment/setup.sh @@ -3,7 +3,7 @@ PARTNER_CHAINS_NODE_IMAGE="ghcr.io/input-output-hk/partner-chains/partner-chains-node-unstable:latest" CARDANO_IMAGE="ghcr.io/intersectmbo/cardano-node:10.5.1" DBSYNC_IMAGE="ghcr.io/intersectmbo/cardano-db-sync:13.6.0.5" -DOLOS_IMAGE="ghcr.io/txpipe/dolos:1.0.0-rc.1" +DOLOS_IMAGE="ghcr.io/txpipe/dolos:1.0.0-rc.2" OGMIOS_IMAGE="cardanosolutions/ogmios:v6.13.0" POSTGRES_IMAGE="postgres:17.2" TESTS_IMAGE="python:3.12-slim" From d37358019fbd2c75c5f745b5326101eed5e7a0d3 Mon Sep 17 00:00:00 2001 From: Krisztian Pinter Date: Thu, 13 Nov 2025 15:19:23 +0100 Subject: [PATCH 09/12] formatting --- toolkit/data-sources/dolos/src/client/conversions.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/toolkit/data-sources/dolos/src/client/conversions.rs b/toolkit/data-sources/dolos/src/client/conversions.rs index 72747442e9..a31a21154e 100644 --- a/toolkit/data-sources/dolos/src/client/conversions.rs +++ b/toolkit/data-sources/dolos/src/client/conversions.rs @@ -1,6 +1,6 @@ +use crate::{DataSourceError, Result}; use blockfrost_openapi::models::block_content::BlockContent; use sidechain_domain::*; -use crate::{Result, DataSourceError}; pub fn from_block_content(value: BlockContent) -> Result { Ok(MainchainBlock { From f1f054467ffa5e1fc9e2c449bed5401769a9628c Mon Sep 17 00:00:00 2001 From: Krisztian Pinter Date: Fri, 14 Nov 2025 14:25:59 +0100 Subject: [PATCH 10/12] fix --- toolkit/data-sources/dolos/src/candidate.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/toolkit/data-sources/dolos/src/candidate.rs b/toolkit/data-sources/dolos/src/candidate.rs index 1c7578cccc..fc230ba378 100644 --- a/toolkit/data-sources/dolos/src/candidate.rs +++ b/toolkit/data-sources/dolos/src/candidate.rs @@ -108,7 +108,7 @@ impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImpl { let pools = self.client.pools_extended().await?; let pred = |pool: PoolListExtendedInner| async move { let history = self.client.pools_history(&pool.pool_id).await?; - Result::Ok(match history.into_iter().find(|h| h.epoch <= epoch.0 as i32) { + Result::Ok(match history.into_iter().find(|h| h.epoch == epoch.0 as i32) { Some(e) => Some(( MainchainKeyHash::decode_hex(&pool.pool_id)?, StakeDelegation(e.active_stake.parse::()?), From 6a419e6a5fb765d0776c7f208da4b0d115fe5367 Mon Sep 17 00:00:00 2001 From: Krisztian Pinter Date: Fri, 14 Nov 2025 15:44:22 +0100 Subject: [PATCH 11/12] api cleanup --- Cargo.lock | 1 + toolkit/data-sources/dolos/Cargo.toml | 1 + toolkit/data-sources/dolos/src/candidate.rs | 13 +++-- toolkit/data-sources/dolos/src/client/api.rs | 56 ++++++++++++------- .../data-sources/dolos/src/client/minibf.rs | 13 +++-- 5 files changed, 55 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3d74c3df0c..96e3f25d9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7579,6 +7579,7 @@ version = "1.8.0" dependencies = [ "async-trait", "authority-selection-inherents", + "bech32 0.11.0", "blockfrost-openapi", "cardano-serialization-lib", "chrono", diff --git a/toolkit/data-sources/dolos/Cargo.toml b/toolkit/data-sources/dolos/Cargo.toml index f9b76b27ab..cf4fb8a9f0 100644 --- a/toolkit/data-sources/dolos/Cargo.toml +++ b/toolkit/data-sources/dolos/Cargo.toml @@ -42,6 +42,7 @@ figment = { workspace = true } tokio = { workspace = true } derive-new = { workspace = true } chrono = { workspace = true } +bech32 = { workspace = true } [features] default = [] diff --git a/toolkit/data-sources/dolos/src/candidate.rs b/toolkit/data-sources/dolos/src/candidate.rs index fc230ba378..64edf3cb50 100644 --- a/toolkit/data-sources/dolos/src/candidate.rs +++ b/toolkit/data-sources/dolos/src/candidate.rs @@ -107,12 +107,10 @@ impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImpl { let candidates = self.get_registered_candidates(epoch, committee_candidate_address).await?; let pools = self.client.pools_extended().await?; let pred = |pool: PoolListExtendedInner| async move { - let history = self.client.pools_history(&pool.pool_id).await?; + let pool_id = mckeyhash_from_bech32(&pool.pool_id)?; + let history = self.client.pools_history(pool_id).await?; Result::Ok(match history.into_iter().find(|h| h.epoch == epoch.0 as i32) { - Some(e) => Some(( - MainchainKeyHash::decode_hex(&pool.pool_id)?, - StakeDelegation(e.active_stake.parse::()?), - )), + Some(e) => Some((pool_id, StakeDelegation(e.active_stake.parse::()?))), None => None, }) }; @@ -147,6 +145,11 @@ impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImpl { } } +fn mckeyhash_from_bech32(bech32_str: &str) -> Result { + let (_hrp, val) = bech32::decode(bech32_str).map_err(|e| e.to_string())?; + Ok(MainchainKeyHash(val.try_into().map_err(|_| "failed to convert vec to array")?)) +} + #[derive(Debug)] struct RegisteredCandidate { stake_pool_pub_key: StakePoolPublicKey, diff --git a/toolkit/data-sources/dolos/src/client/api.rs b/toolkit/data-sources/dolos/src/client/api.rs index cf0786a1b6..4513e3c418 100644 --- a/toolkit/data-sources/dolos/src/client/api.rs +++ b/toolkit/data-sources/dolos/src/client/api.rs @@ -13,40 +13,53 @@ use sidechain_domain::*; use crate::DataSourceError; /// Mainchain block id, either a block hash or a block number -pub enum McBlockId { - /// Domain type Mainchain block hash - McBlockHash(McBlockHash), - /// Domain type Mainchain block number - McBlockNumber(McBlockNumber), - /// Mainchain block hash returned as string by Blockfrost API - Raw(String), -} +pub struct McBlockId(String); impl From for McBlockId { fn from(value: McBlockHash) -> Self { - McBlockId::McBlockHash(value) + McBlockId(value.to_string()) } } impl From for McBlockId { fn from(value: McBlockNumber) -> Self { - McBlockId::McBlockNumber(value) + McBlockId(value.to_string()) } } impl From for McBlockId { fn from(value: String) -> Self { - McBlockId::Raw(value) + McBlockId(value) } } impl std::fmt::Display for McBlockId { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - McBlockId::McBlockHash(mc_block_hash) => mc_block_hash.fmt(f), - McBlockId::McBlockNumber(mc_block_number) => mc_block_number.fmt(f), - McBlockId::Raw(str) => str.fmt(f), - } + self.0.fmt(f) + } +} + +/// Mainchain pool id as a bech32 string +pub struct McPoolId(String); + +impl From for McPoolId { + fn from(value: MainchainKeyHash) -> Self { + let pool_id = + bech32::encode::(bech32::Hrp::parse_unchecked("pool"), &value.0) + .expect("MainchainKeyHash is valid"); + McPoolId(pool_id) + } +} + +impl From for McPoolId { + fn from(value: String) -> Self { + McPoolId(value) + } +} + +impl std::fmt::Display for McPoolId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) } } @@ -64,12 +77,12 @@ pub trait MiniBFApi { address: MainchainAddress, ) -> Result, DataSourceError>; - /// List of a specific asset transactions. + /// List of specific asset transactions. async fn assets_transactions( &self, asset_id: AssetId, ) -> Result, DataSourceError>; - /// List of a addresses containing a specific asset. + /// List of addresses containing a specific asset. async fn assets_addresses( &self, asset_id: AssetId, @@ -110,11 +123,14 @@ pub trait MiniBFApi { async fn epochs_stakes_by_pool( &self, epoch_number: McEpochNumber, - pool_id: &str, + pool_id: impl Into + Send, ) -> Result, DataSourceError>; /// History of stake pool parameters over epochs. - async fn pools_history(&self, pool_id: &str) -> Result, DataSourceError>; + async fn pools_history( + &self, + pool_id: impl Into + Send, + ) -> Result, DataSourceError>; /// List of registered stake pools with additional information. async fn pools_extended(&self) -> Result, DataSourceError>; diff --git a/toolkit/data-sources/dolos/src/client/minibf.rs b/toolkit/data-sources/dolos/src/client/minibf.rs index 191a2da513..910561fde4 100644 --- a/toolkit/data-sources/dolos/src/client/minibf.rs +++ b/toolkit/data-sources/dolos/src/client/minibf.rs @@ -15,7 +15,7 @@ use ureq::Agent; use crate::{ DataSourceError, - client::api::{McBlockId, MiniBFApi}, + client::api::{McBlockId, McPoolId, MiniBFApi}, }; /// Client implementing Dolos MiniBF @@ -187,13 +187,18 @@ impl MiniBFApi for MiniBFClient { async fn epochs_stakes_by_pool( &self, epoch_number: McEpochNumber, - pool_id: &str, + pool_id: impl Into + Send, ) -> Result, DataSourceError> { + let pool_id: McPoolId = pool_id.into(); self.paginated_request_all(&format!("epochs/{epoch_number}/stakes/{pool_id}")) .await } - async fn pools_history(&self, pool_id: &str) -> Result, DataSourceError> { + async fn pools_history( + &self, + pool_id: impl Into + Send, + ) -> Result, DataSourceError> { + let pool_id: McPoolId = pool_id.into(); self.paginated_request_all(&format!("pools/{pool_id}/history")).await } async fn pools_extended(&self) -> Result, DataSourceError> { @@ -223,7 +228,7 @@ impl MiniBFApi for MiniBFClient { } } -fn format_asset_id(asset_id: &AssetId) -> String { +pub fn format_asset_id(asset_id: &AssetId) -> String { let AssetId { policy_id, asset_name } = asset_id; format!("{}{}", &policy_id.to_hex_string()[2..], &asset_name.to_hex_string()[2..]) } From c0b450ea7ea5b27aaed0c1b596637172f0af9184 Mon Sep 17 00:00:00 2001 From: Krisztian Pinter Date: Thu, 6 Nov 2025 10:46:54 +0100 Subject: [PATCH 12/12] fixes --- toolkit/data-sources/dolos/src/lib.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/toolkit/data-sources/dolos/src/lib.rs b/toolkit/data-sources/dolos/src/lib.rs index abc53610db..f24211f6e8 100644 --- a/toolkit/data-sources/dolos/src/lib.rs +++ b/toolkit/data-sources/dolos/src/lib.rs @@ -68,6 +68,7 @@ pub enum DataSourceError { /// # Environment variables read: /// - `DOLOS_MINIBF_URL`: Dolos MiniBF client, eg. `localhost:3000` pub fn get_connection_from_env() -> Result { + log::warn!("Dolos data sources are still WIP and should not be used in production"); let config = ConnectionConfig::from_env()?; Ok(MiniBFClient::new(config.dolos_minibf_url.as_str(), std::time::Duration::from_secs(30))) } @@ -80,12 +81,12 @@ pub struct ConnectionConfig { } impl ConnectionConfig { - /// Reads Postgres connection config from the environment + /// Reads Dolos connection config from the environment pub fn from_env() -> Result { let config: Self = figment::Figment::new() .merge(figment::providers::Env::raw()) .extract() - .map_err(|e| format!("Failed to read postgres data source connection: {e}"))?; + .map_err(|e| format!("Failed to read Dolos data source connection: {e}"))?; Ok(config) } }