diff --git a/Cargo.lock b/Cargo.lock
index 6fc9bb257f..96e3f25d9b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7579,8 +7579,11 @@ version = "1.8.0"
 dependencies = [
  "async-trait",
  "authority-selection-inherents",
+ "bech32 0.11.0",
  "blockfrost-openapi",
  "cardano-serialization-lib",
+ "chrono",
+ "derive-new",
  "figment",
  "futures",
  "itertools 0.14.0",
diff --git a/Cargo.toml b/Cargo.toml
index 760def5156..40a957fd2b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -188,6 +188,7 @@ fork-tree = { version = "13.0.1" }
 ureq = { version = "3.1.2", default-features = false }
 url = { version = "2.5.7", default-features = false }
 blockfrost-openapi = { version = "0.1.75", default-features = false }
+chrono = { version = "0.4.31", default-features = false }
 
 # substrate dependencies
 frame-benchmarking = { default-features = false, git = "https://github.com/paritytech/polkadot-sdk.git", tag = "polkadot-stable2509" }
diff --git a/demo/node/src/data_sources.rs b/demo/node/src/data_sources.rs
index faec588e96..14a1649660 100644
--- a/demo/node/src/data_sources.rs
+++ b/demo/node/src/data_sources.rs
@@ -107,10 +107,14 @@ pub async fn create_dolos_data_sources(
 ) -> std::result::Result<DataSources, Box<dyn std::error::Error + Send + Sync + 'static>> {
 	let dolos_client = partner_chains_dolos_data_sources::get_connection_from_env()?;
 	let pool = partner_chains_db_sync_data_sources::get_connection_from_env().await?;
-	let block = Arc::new(
+	let block_dbsync = Arc::new(
 		partner_chains_db_sync_data_sources::BlockDataSourceImpl::new_from_env(pool.clone())
 			.await?,
 	);
+	let block_dolos = Arc::new(
+		partner_chains_dolos_data_sources::BlockDataSourceImpl::new_from_env(dolos_client.clone())
+			.await?,
+	);
 	Ok(DataSources {
 		sidechain_rpc: Arc::new(
 			partner_chains_dolos_data_sources::SidechainRpcDataSourceImpl::new(
@@ -118,7 +122,7 @@ pub async fn create_dolos_data_sources(
 			),
 		),
 		mc_hash: Arc::new(partner_chains_dolos_data_sources::McHashDataSourceImpl::new(
-			dolos_client.clone(),
+			block_dolos.clone(),
 		)),
 		authority_selection: Arc::new(
 			partner_chains_dolos_data_sources::AuthoritySelectionDataSourceImpl::new(
@@ -137,7 +141,7 @@ pub async fn create_dolos_data_sources(
 				pool.clone(),
 				metrics_opt.clone(),
 				GOVERNED_MAP_CACHE_SIZE,
-				block.clone(),
+				block_dbsync.clone(),
 			)
 			.await?,
 		),
@@ -145,7 +149,7 @@ pub async fn create_dolos_data_sources(
 			partner_chains_db_sync_data_sources::CachedTokenBridgeDataSourceImpl::new(
 				pool,
 				metrics_opt,
-				block,
+				block_dbsync,
 				BRIDGE_TRANSFER_CACHE_LOOKAHEAD,
 			),
 		),
diff --git a/dev/local-environment/setup.sh b/dev/local-environment/setup.sh
index fee18cf16a..8b58af0439 100755
--- a/dev/local-environment/setup.sh
+++ b/dev/local-environment/setup.sh
@@ -3,7 +3,7 @@
 PARTNER_CHAINS_NODE_IMAGE="ghcr.io/input-output-hk/partner-chains/partner-chains-node-unstable:latest"
 CARDANO_IMAGE="ghcr.io/intersectmbo/cardano-node:10.5.1"
 DBSYNC_IMAGE="ghcr.io/intersectmbo/cardano-db-sync:13.6.0.5"
-DOLOS_IMAGE="ghcr.io/txpipe/dolos:1.0.0-beta.8"
+DOLOS_IMAGE="ghcr.io/txpipe/dolos:1.0.0-rc.2"
 OGMIOS_IMAGE="cardanosolutions/ogmios:v6.13.0"
 POSTGRES_IMAGE="postgres:17.2"
 TESTS_IMAGE="python:3.12-slim"
diff --git a/toolkit/data-sources/db-sync/Cargo.toml b/toolkit/data-sources/db-sync/Cargo.toml
index 80d63fbc0a..671c7b194d 100644
--- a/toolkit/data-sources/db-sync/Cargo.toml
+++ b/toolkit/data-sources/db-sync/Cargo.toml
@@ -16,7 +16,7 @@ sqlx = { workspace = true }
 db-sync-sqlx = { workspace = true }
 tokio = { workspace = true, features = ["full"] }
 futures = { workspace = true }
-chrono = "0.4.31"
+chrono = { workspace = true }
 hex = { workspace = true }
 hex-literal = { workspace = true }
 itertools = { workspace = true }
diff --git a/toolkit/data-sources/db-sync/src/candidates/cached.rs b/toolkit/data-sources/db-sync/src/candidates/cached.rs
index 6e0e61e18a..e2ebb886c0 100644
--- a/toolkit/data-sources/db-sync/src/candidates/cached.rs
+++ b/toolkit/data-sources/db-sync/src/candidates/cached.rs
@@ -20,6 +20,7 @@ pub type ArcMut<T> = Arc<Mutex<T>>;
 type AriadneParametersCacheKey = (McEpochNumber, PolicyId, PolicyId);
 type CandidatesCacheKey = (McEpochNumber, String);
 
+/// Cached candidate data source
 pub struct CandidateDataSourceCached {
	inner: CandidatesDataSourceImpl,
	get_ariadne_parameters_for_epoch_cache:
@@ -96,6 +97,7 @@ impl CandidateDataSourceCacheConfig {
 }
 
 impl CandidateDataSourceCached {
+	/// Creates a new instance of the data source
 	pub fn new(
 		inner: CandidatesDataSourceImpl,
 		candidates_for_epoch_cache_size: usize,
@@ -114,6 +116,7 @@ impl CandidateDataSourceCached {
 		}
 	}
 
+	/// Creates a new instance of the data source, reading configuration from the environment.
 	pub fn new_from_env(
 		inner: CandidatesDataSourceImpl,
 		candidates_for_epoch_cache_size: usize,
diff --git a/toolkit/data-sources/db-sync/src/candidates/mod.rs b/toolkit/data-sources/db-sync/src/candidates/mod.rs
index 6196efe040..d59b421530 100644
--- a/toolkit/data-sources/db-sync/src/candidates/mod.rs
+++ b/toolkit/data-sources/db-sync/src/candidates/mod.rs
@@ -17,7 +17,7 @@ use sqlx::PgPool;
 use std::collections::HashMap;
 use std::error::Error;
 
-mod cached;
+pub mod cached;
 
 #[cfg(test)]
 mod tests;
diff --git a/toolkit/data-sources/db-sync/src/lib.rs b/toolkit/data-sources/db-sync/src/lib.rs
index 4c88d59db4..15446f09e4 100644
--- a/toolkit/data-sources/db-sync/src/lib.rs
+++ b/toolkit/data-sources/db-sync/src/lib.rs
@@ -103,7 +103,7 @@ pub use crate::block::{BlockDataSourceImpl, DbSyncBlockDataSourceConfig};
 #[cfg(feature = "bridge")]
 pub use crate::bridge::{TokenBridgeDataSourceImpl, cache::CachedTokenBridgeDataSourceImpl};
 #[cfg(feature = "candidate-source")]
-pub use crate::candidates::CandidatesDataSourceImpl;
+pub use crate::candidates::{CandidatesDataSourceImpl, cached::CandidateDataSourceCached};
 #[cfg(feature = "governed-map")]
 pub use crate::governed_map::{GovernedMapDataSourceCachedImpl, GovernedMapDataSourceImpl};
 #[cfg(feature = "mc-hash")]
diff --git a/toolkit/data-sources/dolos/Cargo.toml b/toolkit/data-sources/dolos/Cargo.toml
index b91d158b4c..cf4fb8a9f0 100644
--- a/toolkit/data-sources/dolos/Cargo.toml
+++ b/toolkit/data-sources/dolos/Cargo.toml
@@ -40,6 +40,9 @@ cardano-serialization-lib = { workspace = true }
 itertools = { workspace = true }
 figment = { workspace = true }
 tokio = { workspace = true }
+derive-new = { workspace = true }
+chrono = { workspace = true }
+bech32 = { workspace = true }
 
 [features]
 default = []
diff --git a/toolkit/data-sources/dolos/src/block.rs b/toolkit/data-sources/dolos/src/block.rs
new file mode 100644
index 0000000000..849d8c79de
--- /dev/null
+++ b/toolkit/data-sources/dolos/src/block.rs
@@ -0,0 +1,333 @@
+use crate::{
+	DataSourceError, Result,
+	client::{MiniBFClient, api::MiniBFApi, conversions::from_block_content},
+	read_mc_epoch_config,
+};
+use chrono::{DateTime, NaiveDateTime, TimeDelta};
+use derive_new::new;
+use figment::{Figment, providers::Env};
+use log::{debug, info};
+use serde::Deserialize;
+use sidechain_domain::mainchain_epoch::{MainchainEpochConfig, MainchainEpochDerivation};
+use sidechain_domain::*;
+use sp_timestamp::Timestamp;
+use std::{
+	error::Error,
+	sync::{Arc, Mutex},
+};
+
+#[derive(new)]
+pub struct BlockDataSourceImpl {
+	/// MiniBF client
+	client: MiniBFClient,
+	/// Cardano security parameter
+	///
+	/// This parameter controls how many confirmations (blocks on top) are required by
+	/// the Cardano node to consider a block to be stable. This is a network-wide parameter.
+	security_parameter: u32,
+	/// Minimal age of a block, relative to a given reference timestamp, for it to be
+	/// considered a valid stable block. Must be equal to `security parameter / active slot coefficient`.
+	min_slot_boundary_as_seconds: TimeDelta,
+	/// Maximal age of a block to be considered a valid stable block; a characteristic of
+	/// Ouroboros Praos, equal to `3 * security parameter / active slot coefficient`.
+	max_slot_boundary_as_seconds: TimeDelta,
+	/// Cardano main chain epoch configuration
+	mainchain_epoch_config: MainchainEpochConfig,
+	/// Additional offset applied when selecting the latest stable Cardano block
+	///
+	/// This parameter should be 1 by default.
+	block_stability_margin: u32,
+	/// Number of contiguous Cardano blocks to be cached by this data source
+	cache_size: u16,
+	/// Internal block cache
+	stable_blocks_cache: Arc<Mutex<BlocksCache>>,
+}
+
+impl BlockDataSourceImpl {
+	/// Returns the latest _unstable_ Cardano block from Dolos
+	pub async fn get_latest_block_info(&self) -> Result<MainchainBlock> {
+		self.client
+			.blocks_latest()
+			.await
+			.map_err(|e| {
+				DataSourceError::ExpectedDataNotFound(format!("No latest block on chain. {e}"))
+					.into()
+			})
+			.and_then(from_block_content)
+	}
+
+	/// Returns the latest _stable_ Cardano block from Dolos that is within
+	/// acceptable bounds from `reference_timestamp`, accounting for the additional stability
+	/// offset configured by [block_stability_margin][Self::block_stability_margin].
+	pub async fn get_latest_stable_block_for(
+		&self,
+		reference_timestamp: Timestamp,
+	) -> Result<Option<MainchainBlock>> {
+		let reference_timestamp = BlockDataSourceImpl::timestamp_to_db_type(reference_timestamp)?;
+		let latest = self.get_latest_block_info().await?;
+		let offset = self.security_parameter + self.block_stability_margin;
+		let stable = latest.number.saturating_sub(offset).into();
+		let block = self.get_latest_block(stable, reference_timestamp).await?;
+		Ok(block)
+	}
+
+	/// Finds a block by its `hash`, verifies that it is stable with respect to
+	/// `reference_timestamp`, and returns its info
+	pub async fn get_stable_block_for(
+		&self,
+		hash: McBlockHash,
+		reference_timestamp: Timestamp,
+	) -> Result<Option<MainchainBlock>> {
+		let reference_timestamp = BlockDataSourceImpl::timestamp_to_db_type(reference_timestamp)?;
+		self.get_stable_block_by_hash(hash, reference_timestamp).await
+	}
+
+	/// Finds a block by its `hash` and returns its info
+	pub async fn get_block_by_hash(&self, hash: McBlockHash) -> Result<Option<MainchainBlock>> {
+		let from_cache = if let Ok(cache) = self.stable_blocks_cache.lock() {
+			cache.find_by_hash(hash.clone())
+		} else {
+			None
+		};
+		let block_opt = match from_cache {
+			Some(block) => Some(block),
+			None => Some(from_block_content(self.client.blocks_by_id(hash).await?)?),
+		};
+		Ok(block_opt)
+	}
+}
+
+/// Configuration for [BlockDataSourceImpl]
+#[derive(Debug, Clone, Deserialize)]
+pub struct DolosBlockDataSourceConfig {
+	/// Additional offset applied when selecting the latest stable Cardano block
+	///
+	/// This parameter should be 1 by default.
+	pub block_stability_margin: u32,
+}
+
+impl DolosBlockDataSourceConfig {
+	/// Reads the config from the environment
+	pub fn from_env() -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
+		let config: Self = Figment::new()
+			.merge(Env::raw())
+			.extract()
+			.map_err(|e| format!("Failed to read block data source config: {e}"))?;
+		info!("Using block data source configuration: {config:?}");
+		Ok(config)
+	}
+}
+
+impl BlockDataSourceImpl {
+	/// Creates a new instance of [BlockDataSourceImpl], reading configuration from the environment.
+	pub async fn new_from_env(
+		client: MiniBFClient,
+	) -> std::result::Result<Self, Box<dyn Error + Send + Sync + 'static>> {
+		Self::from_config(client, DolosBlockDataSourceConfig::from_env()?, &read_mc_epoch_config()?)
+			.await
+	}
+
+	/// Creates a new instance of [BlockDataSourceImpl], using the passed configuration.
+	pub async fn from_config(
+		client: MiniBFClient,
+		DolosBlockDataSourceConfig { block_stability_margin }: DolosBlockDataSourceConfig,
+		mc_epoch_config: &MainchainEpochConfig,
+	) -> Result<Self> {
+		let genesis = client.genesis().await?;
+		let active_slots_coeff = genesis.active_slots_coefficient;
+		let security_parameter = genesis.security_param as u32;
+		let k: f64 = security_parameter.into();
+		let slot_duration: f64 = mc_epoch_config.slot_duration_millis.millis() as f64;
+		let min_slot_boundary = (slot_duration * k / active_slots_coeff).round() as i64;
+		let max_slot_boundary = 3 * min_slot_boundary;
+		let cache_size = 100;
+		Ok(BlockDataSourceImpl::new(
+			client,
+			security_parameter,
+			TimeDelta::milliseconds(min_slot_boundary),
+			TimeDelta::milliseconds(max_slot_boundary),
+			mc_epoch_config.clone(),
+			block_stability_margin,
+			cache_size,
+			BlocksCache::new_arc_mutex(),
+		))
+	}
+
+	async fn get_latest_block(
+		&self,
+		max_block: McBlockNumber,
+		reference_timestamp: NaiveDateTime,
+	) -> Result<Option<MainchainBlock>> {
+		let min_time_naive = self.min_block_allowed_time(reference_timestamp);
+		let min_time = convert_naive_datetime(min_time_naive);
+		let min_slot = self.date_time_to_slot(min_time_naive)?;
+		let max_time_naive = self.max_allowed_block_time(reference_timestamp);
+		let max_time = convert_naive_datetime(max_time_naive);
+		let max_slot = self.date_time_to_slot(max_time_naive)?;
+
+		let mut current_block_number = max_block;
+
+		loop {
+			let block = match self.client.blocks_by_id(current_block_number).await {
+				Ok(b) => from_block_content(b)?,
+				Err(_) => return Ok(None),
+			};
+
+			let is_time_match = block.timestamp >= min_time && block.timestamp <= max_time;
+			let is_slot_match = block.slot >= min_slot && block.slot <= max_slot;
+
+			if is_time_match && is_slot_match {
+				return Ok(Some(block));
+			}
+
+			if block.timestamp < min_time || block.slot < min_slot || block.number.0 == 0 {
+				return Ok(None);
+			}
+
+			current_block_number = block.number.saturating_sub(1u32);
+		}
+	}
+
+	fn min_block_allowed_time(&self, reference_timestamp: NaiveDateTime) -> NaiveDateTime {
+		reference_timestamp - self.max_slot_boundary_as_seconds
+	}
+
+	fn max_allowed_block_time(&self, reference_timestamp: NaiveDateTime) -> NaiveDateTime {
+		reference_timestamp - self.min_slot_boundary_as_seconds
+	}
+
+	/// Rules for block selection and verification mandate that the timestamp of the block
+	/// falls in a given range, calculated from the reference timestamp, which is either
+	/// the PC current time or a PC block timestamp.
+	fn is_block_time_valid(&self, block: &MainchainBlock, timestamp: NaiveDateTime) -> bool {
+		convert_naive_datetime(self.min_block_allowed_time(timestamp)) <= block.timestamp
+			&& block.timestamp <= convert_naive_datetime(self.max_allowed_block_time(timestamp))
+	}
+
+	async fn get_stable_block_by_hash(
+		&self,
+		hash: McBlockHash,
+		reference_timestamp: NaiveDateTime,
+	) -> Result<Option<MainchainBlock>> {
+		if let Some(block) =
+			self.get_stable_block_by_hash_from_cache(hash.clone(), reference_timestamp)
+		{
+			debug!("Block by hash: {hash} found in cache.");
+			Ok(Some(From::from(block)))
+		} else {
+			debug!("Block by hash: {hash} not found in cache, serving from Dolos.");
+			if let Some(block_by_hash) =
+				self.get_stable_block_by_hash_from_db(hash, reference_timestamp).await?
+			{
+				self.fill_cache(&block_by_hash).await?;
+				Ok(Some(MainchainBlock::from(block_by_hash)))
+			} else {
+				Ok(None)
+			}
+		}
+	}
+
+	/// Returns the block with the given hash from the cache, if it is valid in reference
+	/// to the given timestamp
+	fn get_stable_block_by_hash_from_cache(
+		&self,
+		hash: McBlockHash,
+		reference_timestamp: NaiveDateTime,
+	) -> Option<MainchainBlock> {
+		if let Ok(cache) = self.stable_blocks_cache.lock() {
+			cache
+				.find_by_hash(hash)
+				.filter(|block| self.is_block_time_valid(block, reference_timestamp))
+		} else {
+			None
+		}
+	}
+
+	/// Fetches the block with the given hash from Dolos and returns it if it is stable
+	/// and valid in reference to the given timestamp
+	async fn get_stable_block_by_hash_from_db(
+		&self,
+		hash: McBlockHash,
+		reference_timestamp: NaiveDateTime,
+	) -> Result<Option<MainchainBlock>> {
+		let block = Some(from_block_content(self.client.blocks_by_id(hash).await?)?);
+		let latest_block = Some(from_block_content(self.client.blocks_latest().await?)?);
+		Ok(block
+			.zip(latest_block)
+			.filter(|(block, latest_block)| {
+				block.number.saturating_add(self.security_parameter) <= latest_block.number
+					&& self.is_block_time_valid(block, reference_timestamp)
+			})
+			.map(|(block, _)| block))
+	}
+
+	/// Caches stable blocks for lookup by hash.
+	async fn fill_cache(&self, from_block: &MainchainBlock) -> Result<()> {
+		let from_block_no = from_block.number;
+		let size = u32::from(self.cache_size);
+		let latest_block = from_block_content(self.client.blocks_latest().await?)?;
+		let stable_block_num = latest_block.number.saturating_sub(self.security_parameter);
+
+		let to_block_no = from_block_no.saturating_add(size).min(stable_block_num);
+		let blocks = if from_block_no < to_block_no {
+			let futures = (from_block_no.0..=to_block_no.0).map(|block_no| async move {
+				self.client
+					.blocks_by_id(McBlockNumber(block_no))
+					.await
+					.map_err(|e| e.into())
+					.and_then(from_block_content)
+			});
+			futures::future::try_join_all(futures).await?.into_iter().collect()
+		} else {
+			vec![from_block.clone()]
+		};
+
+		if let Ok(mut cache) = self.stable_blocks_cache.lock() {
+			cache.update(blocks);
+			debug!("Cached blocks {} to {} for by-hash lookups.", from_block_no.0, to_block_no.0);
+		}
+		Ok(())
+	}
+
+	fn date_time_to_slot(&self, dt: NaiveDateTime) -> Result<McSlotNumber> {
+		let millis: u64 = dt.and_utc().timestamp_millis().try_into().map_err(|_| {
+			DataSourceError::BadRequest(format!("Datetime out of range: {dt:?}"))
+		})?;
+		let ts = sidechain_domain::mainchain_epoch::Timestamp::from_unix_millis(millis);
+		let slot = self
+			.mainchain_epoch_config
+			.timestamp_to_mainchain_slot_number(ts)
+			.unwrap_or(self.mainchain_epoch_config.first_slot_number);
+		Ok(McSlotNumber(slot))
+	}
+
+	fn timestamp_to_db_type(timestamp: Timestamp) -> Result<NaiveDateTime> {
+		let millis: Option<i64> = timestamp.as_millis().try_into().ok();
+		let dt = millis
+			.and_then(DateTime::from_timestamp_millis)
+			.ok_or(DataSourceError::BadRequest(format!("Timestamp out of range: {timestamp:?}")))?;
+		Ok(NaiveDateTime::new(dt.date_naive(), dt.time()))
+	}
+}
+
+fn convert_naive_datetime(d: NaiveDateTime) -> u64 {
+	d.and_utc().timestamp().try_into().expect("i64 timestamp is valid u64")
+}
+
+/// Helper structure for caching stable blocks.
+#[derive(new)]
+pub(crate) struct BlocksCache {
+	/// Contiguous main chain blocks. All blocks should be stable. Used to query by hash.
+	#[new(default)]
+	from_last_by_hash: Vec<MainchainBlock>,
+}
+
+impl BlocksCache {
+	fn find_by_hash(&self, hash: McBlockHash) -> Option<MainchainBlock> {
+		self.from_last_by_hash.iter().find(|b| b.hash == hash).cloned()
+	}
+
+	pub fn update(&mut self, from_last_by_hash: Vec<MainchainBlock>) {
+		self.from_last_by_hash = from_last_by_hash;
+	}
+
+	pub fn new_arc_mutex() -> Arc<Mutex<Self>> {
+		Arc::new(Mutex::new(Self::new()))
+	}
+}
diff --git a/toolkit/data-sources/dolos/src/candidate.rs b/toolkit/data-sources/dolos/src/candidate.rs
index 02e6254d02..64edf3cb50 100644
--- a/toolkit/data-sources/dolos/src/candidate.rs
+++ b/toolkit/data-sources/dolos/src/candidate.rs
@@ -107,12 +107,10 @@ impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImpl {
 		let candidates = self.get_registered_candidates(epoch, committee_candidate_address).await?;
 		let pools = self.client.pools_extended().await?;
 		let pred = |pool: PoolListExtendedInner| async move {
-			let history = self.client.pools_history(&pool.pool_id).await?;
+			let pool_id = mckeyhash_from_bech32(&pool.pool_id)?;
+			let history = self.client.pools_history(pool_id).await?;
 			Result::Ok(match history.into_iter().find(|h| h.epoch == epoch.0 as i32) {
-				Some(e) => Some((
-					MainchainKeyHash(pool.pool_id.as_bytes().try_into()?), // TODO is pool_id a pool hash?
-					StakeDelegation(e.active_stake.parse::<u64>()?),
-				)),
+				Some(e) => Some((pool_id, StakeDelegation(e.active_stake.parse::<u64>()?))),
 				None => None,
 			})
 		};
@@ -139,7 +137,7 @@ impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImpl {
 	async fn get_epoch_nonce(&self, epoch_number: McEpochNumber) -> Result<Option<EpochNonce>> {
 		let epoch = self.get_epoch_of_data_storage(epoch_number)?;
 		let nonce: String = self.client.epochs_parameters(epoch).await?.nonce;
-		Ok(Some(EpochNonce(nonce.into())))
+		Ok(Some(EpochNonce::decode_hex(&nonce)?))
 	}
 
 	async fn data_epoch(&self, for_epoch: McEpochNumber) -> Result<McEpochNumber> {
@@ -147,6 +145,11 @@ impl AuthoritySelectionDataSource for AuthoritySelectionDataSourceImpl {
 	}
 }
 
+fn mckeyhash_from_bech32(bech32_str: &str) -> Result<MainchainKeyHash> {
+	let (_hrp, val) = bech32::decode(bech32_str).map_err(|e| e.to_string())?;
+	Ok(MainchainKeyHash(val.try_into().map_err(|_| "failed to convert vec to array")?))
+}
+
 #[derive(Debug)]
 struct RegisteredCandidate {
 	stake_pool_pub_key: StakePoolPublicKey,
@@ -265,9 +268,9 @@ impl AuthoritySelectionDataSourceImpl {
 			output.clone().output_index
 		))?;
 		let datum = cardano_serialization_lib::PlutusData::from_hex(&datum_str)
-			.map_err(|e| e.to_string())?;
+			.map_err(|e| format!("Failed to parse datum string: {e}"))?;
 		let utxo_id = UtxoId {
-			tx_hash: output.tx_hash.as_bytes().try_into()?,
+			tx_hash: McTxHash::decode_hex(&output.tx_hash)?,
 			index: UtxoIndex(output.tx_index.try_into()?),
 		};
 		let register_validator_datum = RegisterValidatorDatum::try_from(datum)
@@ -285,7 +288,7 @@ impl AuthoritySelectionDataSourceImpl {
 			.into_iter()
 			.map(|input| {
 				Ok::<UtxoId, Box<dyn std::error::Error + Send + Sync>>(UtxoId {
-					tx_hash: input.tx_hash.as_bytes().try_into()?,
+					tx_hash: McTxHash::decode_hex(&input.tx_hash)?,
 					index: UtxoIndex(input.output_index.try_into()?),
 				})
 			})
@@ -316,7 +319,7 @@ impl AuthoritySelectionDataSourceImpl {
 			.filter_map(|r| match r {
 				Ok(candidate) => Some(candidate.clone()),
 				Err(msg) => {
-					log::error!("{msg}");
+					log::error!("Failed to parse candidate: {msg}");
 					None
 				},
 			})
@@ -359,13 +362,10 @@ impl AuthoritySelectionDataSourceImpl {
 		let active_utxos = match registrations_block_for_epoch_opt {
 			Some(registrations_block_for_epoch) => {
 				let pred = |utxo: AddressUtxoContentInner| async move {
+					let block = self.client.blocks_by_id(utxo.block.clone()).await?;
 					Ok::<bool, Box<dyn std::error::Error + Send + Sync>>(
-						self.client
-							.blocks_by_id(utxo.block.clone())
-							.await?
-							.height
-							.ok_or("committee candidate block height missing")? as u32
-							>= registrations_block_for_epoch
+						block.height.ok_or("committee candidate block height missing")? as u32
+							<= registrations_block_for_epoch
 								.height
 								.ok_or("last_block_for_epoch block height missing")? as u32,
 					)
diff --git a/toolkit/data-sources/dolos/src/client/api.rs b/toolkit/data-sources/dolos/src/client/api.rs
index 3a0f63378d..4513e3c418 100644
--- a/toolkit/data-sources/dolos/src/client/api.rs
+++ b/toolkit/data-sources/dolos/src/client/api.rs
@@ -4,47 +4,62 @@ use blockfrost_openapi::models::{
 	address_utxo_content_inner::AddressUtxoContentInner,
 	asset_addresses_inner::AssetAddressesInner, asset_transactions_inner::AssetTransactionsInner,
 	block_content::BlockContent, epoch_param_content::EpochParamContent,
-	epoch_stake_pool_content_inner::EpochStakePoolContentInner,
+	epoch_stake_pool_content_inner::EpochStakePoolContentInner, genesis_content::GenesisContent,
 	pool_history_inner::PoolHistoryInner, pool_list_extended_inner::PoolListExtendedInner,
 	tx_content::TxContent, tx_content_utxo::TxContentUtxo,
 };
 use sidechain_domain::*;
 
+use crate::DataSourceError;
+
 /// Mainchain block id, either a block hash or a block number
-pub enum McBlockId {
-	/// Domain type Mainchain block hash
-	McBlockHash(McBlockHash),
-	/// Domain type Mainchain block number
-	McBlockNumber(McBlockNumber),
-	/// Mainchain block hash returned as string by Blockfrost API
-	Raw(String),
-}
+pub struct McBlockId(String);
 
 impl From<McBlockHash> for McBlockId {
 	fn from(value: McBlockHash) -> Self {
-		McBlockId::McBlockHash(value)
+		McBlockId(value.to_string())
 	}
 }
 
 impl From<McBlockNumber> for McBlockId {
 	fn from(value: McBlockNumber) -> Self {
-		McBlockId::McBlockNumber(value)
+		McBlockId(value.to_string())
 	}
 }
 
 impl From<String> for McBlockId {
 	fn from(value: String) -> Self {
-		McBlockId::Raw(value)
+		McBlockId(value)
 	}
 }
 
 impl std::fmt::Display for McBlockId {
 	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-		match self {
-			McBlockId::McBlockHash(mc_block_hash) => mc_block_hash.fmt(f),
-			McBlockId::McBlockNumber(mc_block_number) => mc_block_number.fmt(f),
-			McBlockId::Raw(str) => str.fmt(f),
-		}
+		self.0.fmt(f)
 	}
 }
 
+/// Mainchain pool id as a bech32 string
+pub struct McPoolId(String);
+
+impl From<MainchainKeyHash> for McPoolId {
+	fn from(value: MainchainKeyHash) -> Self {
+		let pool_id =
+			bech32::encode::<bech32::Bech32>(bech32::Hrp::parse_unchecked("pool"), &value.0)
+				.expect("MainchainKeyHash is valid");
+		McPoolId(pool_id)
+	}
+}
+
+impl From<String> for McPoolId {
+	fn from(value: String) -> Self {
+		McPoolId(value)
+	}
+}
+
+impl std::fmt::Display for McPoolId {
+	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+		self.0.fmt(f)
+	}
+}
@@ -55,60 +70,82 @@ pub trait MiniBFApi {
 	/// UTXOs of the address.
 	async fn addresses_utxos(
 		&self,
 		address: MainchainAddress,
-	) -> Result<Vec<AddressUtxoContentInner>, String>;
+	) -> Result<Vec<AddressUtxoContentInner>, DataSourceError>;
 
 	/// Transactions on the address.
 	async fn addresses_transactions(
 		&self,
 		address: MainchainAddress,
-	) -> Result<Vec<AddressTransactionsContentInner>, String>;
+	) -> Result<Vec<AddressTransactionsContentInner>, DataSourceError>;
 
-	/// List of a specific asset transactions.
+	/// List of transactions of a specific asset.
 	async fn assets_transactions(
 		&self,
 		asset_id: AssetId,
-	) -> Result<Vec<AssetTransactionsInner>, String>;
-	/// List of a addresses containing a specific asset.
-	async fn assets_addresses(&self, asset_id: AssetId)
-	-> Result<Vec<AssetAddressesInner>, String>;
+	) -> Result<Vec<AssetTransactionsInner>, DataSourceError>;
+	/// List of addresses containing a specific asset.
+	async fn assets_addresses(
+		&self,
+		asset_id: AssetId,
+	) -> Result<Vec<AssetAddressesInner>, DataSourceError>;
 
 	/// Return the latest block available to the backends, also known as the tip of the blockchain.
-	async fn blocks_latest(&self) -> Result<BlockContent, String>;
+	async fn blocks_latest(&self) -> Result<BlockContent, DataSourceError>;
 	/// Return the content of a requested block.
-	async fn blocks_by_id(&self, id: impl Into<McBlockId> + Send) -> Result<BlockContent, String>;
+	async fn blocks_by_id(
+		&self,
+		id: impl Into<McBlockId> + Send,
+	) -> Result<BlockContent, DataSourceError>;
 	/// Return the content of a requested block for a specific slot.
-	async fn blocks_slot(&self, slot_number: McSlotNumber) -> Result<BlockContent, String>;
+	async fn blocks_slot(&self, slot_number: McSlotNumber)
+	-> Result<BlockContent, DataSourceError>;
 	/// Return the list of blocks following a specific block.
 	async fn blocks_next(
 		&self,
 		hash: impl Into<McBlockId> + Send,
-	) -> Result<Vec<BlockContent>, String>;
+	) -> Result<Vec<BlockContent>, DataSourceError>;
 	/// Return the transactions within the block.
-	async fn blocks_txs(&self, id: impl Into<McBlockId> + Send) -> Result<Vec<String>, String>;
+	async fn blocks_txs(
+		&self,
+		id: impl Into<McBlockId> + Send,
+	) -> Result<Vec<String>, DataSourceError>;
 
 	/// Return the blocks minted for the epoch specified.
-	async fn epochs_blocks(&self, epoch_number: McEpochNumber) -> Result<Vec<String>, String>;
+	async fn epochs_blocks(
+		&self,
+		epoch_number: McEpochNumber,
+	) -> Result<Vec<String>, DataSourceError>;
 	/// Return the protocol parameters for the epoch specified.
 	async fn epochs_parameters(
 		&self,
 		epoch_number: McEpochNumber,
-	) -> Result<EpochParamContent, String>;
+	) -> Result<EpochParamContent, DataSourceError>;
 	/// Return the active stake distribution for the epoch specified by stake pool.
 	async fn epochs_stakes_by_pool(
 		&self,
 		epoch_number: McEpochNumber,
-		pool_id: &str,
-	) -> Result<Vec<EpochStakePoolContentInner>, String>;
+		pool_id: impl Into<McPoolId> + Send,
+	) -> Result<Vec<EpochStakePoolContentInner>, DataSourceError>;
 
 	/// History of stake pool parameters over epochs.
-	async fn pools_history(&self, pool_id: &str) -> Result<Vec<PoolHistoryInner>, String>;
+	async fn pools_history(
+		&self,
+		pool_id: impl Into<McPoolId> + Send,
+	) -> Result<Vec<PoolHistoryInner>, DataSourceError>;
 	/// List of registered stake pools with additional information.
-	async fn pools_extended(&self) -> Result<Vec<PoolListExtendedInner>, String>;
+	async fn pools_extended(&self) -> Result<Vec<PoolListExtendedInner>, DataSourceError>;
 
 	/// Query JSON value of a datum by its hash.
-	async fn scripts_datum_hash(&self, datum_hash: &str) -> Result<Option<String>, String>;
+	async fn scripts_datum_hash(
+		&self,
+		datum_hash: &str,
+	) -> Result<Option<String>, DataSourceError>;
 
 	/// Return content of the requested transaction.
-	async fn transaction_by_hash(&self, tx_hash: McTxHash) -> Result<TxContent, String>;
+	async fn transaction_by_hash(&self, tx_hash: McTxHash) -> Result<TxContent, DataSourceError>;
 	/// Return the inputs and UTXOs of the specific transaction.
-	async fn transactions_utxos(&self, tx_hash: McTxHash) -> Result<TxContentUtxo, String>;
+	async fn transactions_utxos(&self, tx_hash: McTxHash)
+	-> Result<TxContentUtxo, DataSourceError>;
+
+	/// Return the information about blockchain genesis.
+	async fn genesis(&self) -> Result<GenesisContent, DataSourceError>;
 }
diff --git a/toolkit/data-sources/dolos/src/client/conversions.rs b/toolkit/data-sources/dolos/src/client/conversions.rs
index 18cc00106d..a31a21154e 100644
--- a/toolkit/data-sources/dolos/src/client/conversions.rs
+++ b/toolkit/data-sources/dolos/src/client/conversions.rs
@@ -1,21 +1,22 @@
+use crate::{DataSourceError, Result};
 use blockfrost_openapi::models::block_content::BlockContent;
 use sidechain_domain::*;
 
-pub fn from_block_content(value: BlockContent) -> Result<MainchainBlock, String> {
+pub fn from_block_content(value: BlockContent) -> Result<MainchainBlock> {
 	Ok(MainchainBlock {
 		number: value
 			.height
 			.map(|n| sidechain_domain::McBlockNumber(n as u32))
-			.ok_or("number missing")?,
+			.ok_or(DataSourceError::InvalidData("number missing".to_string()))?,
 		hash: McBlockHash::decode_hex(&value.hash)?,
 		epoch: value
 			.epoch
 			.map(|n| sidechain_domain::McEpochNumber(n as u32))
-			.ok_or("epoch missing")?,
+			.ok_or(DataSourceError::InvalidData("epoch missing".to_string()))?,
 		slot: value
 			.slot
 			.map(|n| sidechain_domain::McSlotNumber(n as u64))
-			.ok_or("slot missing")?,
+			.ok_or(DataSourceError::InvalidData("slot missing".to_string()))?,
 		timestamp: value.time as u64,
 	})
 }
diff --git a/toolkit/data-sources/dolos/src/client/minibf.rs b/toolkit/data-sources/dolos/src/client/minibf.rs
index 8979c8acca..910561fde4 100644
--- a/toolkit/data-sources/dolos/src/client/minibf.rs
+++ b/toolkit/data-sources/dolos/src/client/minibf.rs
@@ -4,7 +4,7 @@ use blockfrost_openapi::models::{
 	address_utxo_content_inner::AddressUtxoContentInner,
 	asset_addresses_inner::AssetAddressesInner, asset_transactions_inner::AssetTransactionsInner,
 	block_content::BlockContent, epoch_param_content::EpochParamContent,
-	epoch_stake_pool_content_inner::EpochStakePoolContentInner,
+	epoch_stake_pool_content_inner::EpochStakePoolContentInner, genesis_content::GenesisContent,
 	pool_history_inner::PoolHistoryInner, pool_list_extended_inner::PoolListExtendedInner,
 	tx_content::TxContent, tx_content_utxo::TxContentUtxo,
 };
@@ -13,7 +13,10 @@ use sidechain_domain::*;
 use std::time::Duration;
 use ureq::Agent;
 
-use crate::client::api::{McBlockId, MiniBFApi};
+use crate::{
+	DataSourceError,
+	client::api::{McBlockId, McPoolId, MiniBFApi},
+};
 
 /// Client implementing Dolos MiniBF
 #[derive(Clone)]
@@ -31,15 +34,19 @@ impl MiniBFClient {
 	async fn request<T: serde::de::DeserializeOwned + std::fmt::Debug>(
 		&self,
 		method: &str,
-	) -> Result<T, String> {
+	) -> Result<T, DataSourceError> {
 		let req = format!("{}/{}", self.addr, method);
 		log::trace!("Dolos request: {req:?}");
 		let resp = self
 			.agent
 			.get(req)
 			.call()
-			.map_err(|e| e.to_string())
-			.and_then(|mut r| r.body_mut().read_json().map_err(|e| e.to_string()));
+			.map_err(|e| DataSourceError::DolosCallError(e.to_string()))
+			.and_then(|mut r| {
+				r.body_mut()
+					.read_json()
+					.map_err(|e| DataSourceError::DolosResponseParseError(e.to_string()))
+			});
 		log::trace!("Dolos response: {resp:?}");
 		resp
 	}
@@ -48,7 +55,7 @@ impl MiniBFClient {
 	async fn paginated_request<T: serde::de::DeserializeOwned + std::fmt::Debug>(
 		&self,
 		method: &str,
 		pagination: Pagination,
-	) -> Result<Vec<T>, String> {
+	) -> Result<Vec<T>, DataSourceError> {
 		let mut query_pairs = url::form_urlencoded::Serializer::new(String::new());
 		query_pairs.extend_pairs([
 			("count", &pagination.count.to_string()),
 			("page", &pagination.page.to_string()),
 		]);
 		if let Some(from) = pagination.from {
 			query_pairs.append_pair("from", &from);
 		}
 		if let Some(to) = pagination.to {
 			query_pairs.append_pair("to", &to);
 		}
 		let mut req_url =
-			url::Url::parse(&format!("{}/{}", self.addr, method)).map_err(|e| e.to_string())?;
+			url::Url::parse(&format!("{}/{}", self.addr, method)).expect("valid Dolos url");
 		req_url.set_query(Some(&query_pairs.finish()));
 		log::trace!("Dolos request: {req_url:?}");
 		let resp = self
 			.agent
 			.get(req_url.as_str())
 			.call()
-			.map_err(|e| e.to_string())
-			.and_then(|mut r| r.body_mut().read_json().map_err(|e| e.to_string()));
+			.map_err(|e| DataSourceError::DolosCallError(e.to_string()))
+			.and_then(|mut r| {
+				r.body_mut()
+					.read_json()
+					.map_err(|e| DataSourceError::DolosResponseParseError(e.to_string()))
+			});
 		log::trace!("Dolos response: {resp:?}");
 		resp
 	}
@@ -78,7 +89,7 @@ impl MiniBFClient {
 	async fn paginated_request_all<T: serde::de::DeserializeOwned + std::fmt::Debug>(
 		&self,
 		method: &str,
-	) -> Result<Vec<T>, String> {
+	) -> Result<Vec<T>, DataSourceError> {
 		let mut pagination: Pagination = Pagination::default();
 		let mut have_all_pages = false;
 		let mut res = Vec::new();
@@ -99,21 +110,21 @@ impl MiniBFApi for MiniBFClient {
 	async fn addresses_utxos(
 		&self,
 		address: MainchainAddress,
-	) -> Result<Vec<AddressUtxoContentInner>, String> {
+	) -> Result<Vec<AddressUtxoContentInner>, DataSourceError> {
 		self.paginated_request_all(&format!("addresses/{address}/utxos")).await
 	}
 
 	async fn addresses_transactions(
 		&self,
 		address: MainchainAddress,
-	) -> Result<Vec<AddressTransactionsContentInner>, String> {
+	) -> Result<Vec<AddressTransactionsContentInner>, DataSourceError> {
 		self.paginated_request_all(&format!("addresses/{address}/transactions")).await
 	}
 
 	async fn assets_transactions(
 		&self,
 		asset_id: AssetId,
-	) -> Result<Vec<AssetTransactionsInner>, String> {
+	) -> Result<Vec<AssetTransactionsInner>, DataSourceError> {
 		let asset_id_str = format_asset_id(&asset_id);
 		self.paginated_request_all(&format!("assets/{asset_id_str}/transactions")).await
 	}
@@ -121,76 +132,103 @@ impl MiniBFApi for MiniBFClient {
 	async fn assets_addresses(
 		&self,
 		asset_id: AssetId,
-	) -> Result<Vec<AssetAddressesInner>, String> {
+	) -> Result<Vec<AssetAddressesInner>, DataSourceError> {
 		let asset_id_str = format_asset_id(&asset_id);
 		self.paginated_request_all(&format!("assets/{asset_id_str}/addresses")).await
 	}
 
-	async fn blocks_latest(&self) -> Result<BlockContent, String> {
+	async fn blocks_latest(&self) -> Result<BlockContent, DataSourceError> {
 		self.request("blocks/latest").await
 	}
 
-	async fn blocks_by_id(&self, id: impl Into<McBlockId> + Send) -> Result<BlockContent, String> {
+	async fn blocks_by_id(
+		&self,
+		id: impl Into<McBlockId> + Send,
+	) -> Result<BlockContent, DataSourceError> {
 		let id: McBlockId = id.into();
 		self.request(&format!("blocks/{id}")).await
 	}
 
-	async fn blocks_slot(&self, slot_number: McSlotNumber) -> Result<BlockContent, String> {
+	async fn blocks_slot(
+		&self,
+		slot_number: McSlotNumber,
+	) -> Result<BlockContent, DataSourceError> {
 		self.request(&format!("blocks/slot/{slot_number}")).await
 	}
 
 	async fn blocks_next(
 		&self,
 		id: impl Into<McBlockId> + Send,
-	) -> Result<Vec<BlockContent>, String> {
+	) -> Result<Vec<BlockContent>, DataSourceError> {
 		let id: McBlockId = id.into();
 		self.request(&format!("blocks/{id}/next")).await
 	}
 
-	async fn blocks_txs(&self, id: impl Into<McBlockId> + Send) -> Result<Vec<String>, String> {
+	async fn blocks_txs(
+		&self,
+		id: impl Into<McBlockId> + Send,
+	) -> Result<Vec<String>, DataSourceError> {
 		let id: McBlockId = id.into();
 		self.request(&format!("blocks/{id}/txs")).await
 	}
 
-	async fn epochs_blocks(&self, epoch_number: McEpochNumber) -> Result<Vec<String>, String> {
+	async fn epochs_blocks(
+		&self,
+		epoch_number: McEpochNumber,
+	) -> Result<Vec<String>, DataSourceError> {
 		self.paginated_request_all(&format!("epochs/{epoch_number}/blocks")).await
 	}
 
 	async fn epochs_parameters(
 		&self,
 		epoch_number: McEpochNumber,
-	) -> Result<EpochParamContent, String> {
+	) -> Result<EpochParamContent, DataSourceError> {
 		self.request(&format!("epochs/{epoch_number}/parameters")).await
 	}
 
 	async fn epochs_stakes_by_pool(
 		&self,
 		epoch_number: McEpochNumber,
-		pool_id: &str,
-	) -> Result<Vec<EpochStakePoolContentInner>, String> {
+		pool_id: impl Into<McPoolId> + Send,
+	) -> Result<Vec<EpochStakePoolContentInner>, DataSourceError> {
+		let pool_id: McPoolId = pool_id.into();
 		self.paginated_request_all(&format!("epochs/{epoch_number}/stakes/{pool_id}"))
 			.await
 	}
 
-	async fn pools_history(&self, pool_id: &str) -> Result<Vec<PoolHistoryInner>, String> {
+	async fn pools_history(
+		&self,
+		pool_id: impl Into<McPoolId> + Send,
+	) -> Result<Vec<PoolHistoryInner>, DataSourceError> {
+		let pool_id: McPoolId = pool_id.into();
 		self.paginated_request_all(&format!("pools/{pool_id}/history")).await
 	}
 
-	async fn pools_extended(&self) -> Result<Vec<PoolListExtendedInner>, String> {
+	async fn pools_extended(&self) -> Result<Vec<PoolListExtendedInner>, DataSourceError> {
 		self.paginated_request_all("pools/extended").await
 	}
 
-	async fn scripts_datum_hash(&self, datum_hash: &str) -> Result<Option<String>, String> {
+	async fn scripts_datum_hash(
+		&self,
+		datum_hash: &str,
+	) -> Result<Option<String>, DataSourceError> {
 		self.request(&format!("scripts/datum/{datum_hash}")).await
 	}
 
-	async fn transaction_by_hash(&self, tx_hash: McTxHash) -> Result<TxContent, String> {
+	async fn transaction_by_hash(&self, tx_hash: McTxHash) -> Result<TxContent, DataSourceError> {
 		self.request(&format!("txs/{tx_hash}")).await
 	}
 
-	async fn transactions_utxos(&self, tx_hash: McTxHash) -> Result<TxContentUtxo, String> {
+	async fn transactions_utxos(
+		&self,
+		tx_hash: McTxHash,
+	) -> Result<TxContentUtxo, DataSourceError> {
 		self.request(&format!("txs/{tx_hash}/utxos")).await
 	}
+
+	async fn genesis(&self) -> Result<GenesisContent, DataSourceError> {
+		self.request("genesis").await
+	}
 }
 
-fn format_asset_id(asset_id: &AssetId) -> String {
+pub fn format_asset_id(asset_id: &AssetId) -> String {
 	let AssetId { policy_id, asset_name } = asset_id;
 	format!("{}{}", &policy_id.to_hex_string()[2..], &asset_name.to_hex_string()[2..])
 }
diff --git a/toolkit/data-sources/dolos/src/lib.rs b/toolkit/data-sources/dolos/src/lib.rs
index 25598ea8b6..f24211f6e8 100644
--- a/toolkit/data-sources/dolos/src/lib.rs
+++ b/toolkit/data-sources/dolos/src/lib.rs
@@ -28,6 +28,10 @@ mod bridge;
 #[cfg(feature = "bridge")]
 pub use bridge::TokenBridgeDataSourceImpl;
 
+mod block;
+pub use block::BlockDataSourceImpl;
+use sidechain_domain::mainchain_epoch::MainchainEpochConfig;
+
 use crate::client::MiniBFClient;
 
 pub mod client;
@@ -41,9 +45,12 @@ pub enum DataSourceError {
 	/// Indicates that Dolos rejected a request as invalid
 	#[error("Bad request: `{0}`.")]
 	BadRequest(String),
-	/// Indicates that an internal error occured when querying Dolos
-	#[error("Internal error of data source: `{0}`.")]
-	InternalDataSourceError(String),
+	/// Indicates that the Dolos client produced an error while calling an endpoint
+	#[error("Dolos client call error: `{0}`.")]
+	DolosCallError(String),
+	/// Indicates that the Dolos client produced an error while parsing a response
+	#[error("Dolos client response parse error: `{0}`.")]
+	DolosResponseParseError(String),
 	/// Indicates that expected data was not found when querying Dolos
 	#[error(
 		"'{0}' not found. Possible causes: data source configuration error, Dolos not synced fully, or data not set on the main chain."
@@ -61,6 +68,7 @@ pub enum DataSourceError {
 /// # Environment variables read:
 /// - `DOLOS_MINIBF_URL`: Dolos MiniBF client, eg. `localhost:3000`
 pub fn get_connection_from_env() -> Result<MiniBFClient> {
+	log::warn!("Dolos data sources are still WIP and should not be used in production");
 	let config = ConnectionConfig::from_env()?;
 	Ok(MiniBFClient::new(config.dolos_minibf_url.as_str(), std::time::Duration::from_secs(30)))
 }
@@ -73,12 +81,21 @@ pub struct ConnectionConfig {
 }
 
 impl ConnectionConfig {
-	/// Reads Postgres connection config from the environment
+	/// Reads Dolos connection config from the environment
 	pub fn from_env() -> Result<Self> {
 		let config: Self = figment::Figment::new()
 			.merge(figment::providers::Env::raw())
 			.extract()
-			.map_err(|e| format!("Failed to read postgres data source connection: {e}"))?;
+			.map_err(|e| format!("Failed to read Dolos data source connection: {e}"))?;
 		Ok(config)
 	}
 }
+
+/// Reads the Cardano main chain epoch configuration from the environment.
+///
+/// See documentation of [MainchainEpochConfig::read_from_env] for the list of environment variables read.
+#[cfg(feature = "block-source")]
+pub fn read_mc_epoch_config() -> Result<MainchainEpochConfig> {
+	Ok(MainchainEpochConfig::read_from_env()
+		.map_err(|e| format!("Failed to read main chain config: {}", e))?)
+}
diff --git a/toolkit/data-sources/dolos/src/mc_hash.rs b/toolkit/data-sources/dolos/src/mc_hash.rs
index 55bf1b01d7..9df431ddf7 100644
--- a/toolkit/data-sources/dolos/src/mc_hash.rs
+++ b/toolkit/data-sources/dolos/src/mc_hash.rs
@@ -1,17 +1,18 @@
-use crate::{
-	Result,
-	client::{MiniBFClient, api::MiniBFApi, conversions::from_block_content},
-};
+use crate::Result;
+use crate::block::BlockDataSourceImpl;
 use async_trait::async_trait;
 use sidechain_domain::*;
+use sp_timestamp::Timestamp;
+use std::sync::Arc;
 
 pub struct McHashDataSourceImpl {
-	client: MiniBFClient,
+	/// [BlockDataSourceImpl] instance shared with other data sources for cache reuse.
+	inner: Arc<BlockDataSourceImpl>,
 }
 
 impl McHashDataSourceImpl {
-	pub fn new(client: MiniBFClient) -> Self {
-		Self { client }
+	pub fn new(inner: Arc<BlockDataSourceImpl>) -> Self {
+		Self { inner }
 	}
 }
@@ -19,20 +20,26 @@ impl McHashDataSourceImpl {
 #[async_trait]
 impl sidechain_mc_hash::McHashDataSource for McHashDataSourceImpl {
 	async fn get_latest_stable_block_for(
 		&self,
-		_reference_timestamp: sp_timestamp::Timestamp,
+		reference_timestamp: sp_timestamp::Timestamp,
 	) -> Result<Option<MainchainBlock>> {
-		Ok(Some(from_block_content(self.client.blocks_latest().await?)?))
+		Ok(self
+			.inner
+			.get_latest_stable_block_for(Timestamp::new(reference_timestamp.as_millis()))
+			.await?)
 	}
 
 	async fn get_stable_block_for(
 		&self,
 		hash: McBlockHash,
-		_reference_timestamp: sp_timestamp::Timestamp,
+		reference_timestamp: sp_timestamp::Timestamp,
 	) -> Result<Option<MainchainBlock>> {
-		Ok(Some(from_block_content(self.client.blocks_by_id(hash).await?)?))
+		Ok(self
+			.inner
+			.get_stable_block_for(hash, Timestamp::new(reference_timestamp.as_millis()))
+			.await?)
 	}
 
 	async fn get_block_by_hash(&self, hash: McBlockHash) -> Result<Option<MainchainBlock>> {
-		Ok(Some(from_block_content(self.client.blocks_by_id(hash).await?)?))
+		Ok(self.inner.get_block_by_hash(hash).await?)
 	}
 }
diff --git a/toolkit/sidechain/domain/src/lib.rs b/toolkit/sidechain/domain/src/lib.rs
index 952f7fee5f..838bbe9a13 100644
--- a/toolkit/sidechain/domain/src/lib.rs
+++ b/toolkit/sidechain/domain/src/lib.rs
@@ -774,7 +774,7 @@ const EPOCH_NONCE_LEN: usize = 32;
 /// Because it is subject to Cardano's consensus mechanism and has strong cryptographic guarantees,
 /// this value can be used as a tamper-proof shared randomness seed by Partner Chain Toolkit components.
 #[derive(Default, Clone, Encode, Decode, DecodeWithMemTracking, PartialEq, Eq, TypeInfo)]
-#[byte_string(debug, hex_serialize)]
+#[byte_string(debug, hex_serialize, decode_hex)]
 pub struct EpochNonce(pub Vec<u8>);
 
 impl EpochNonce {
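
For orientation, here is a minimal sketch (not part of the patch) of how a consumer wires up the new Dolos-backed block data source, mirroring the `create_dolos_data_sources` change above. The constructor names come from this diff; the exact environment variables (`DOLOS_MINIBF_URL`, `BLOCK_STABILITY_MARGIN`, plus the main chain epoch settings read by `MainchainEpochConfig::read_from_env`) are assumptions based on the figment `Env::raw()` config readers introduced here.

use std::sync::Arc;

// Sketch only: a single BlockDataSourceImpl is created once and shared behind
// an Arc, so the mc_hash data source (and any other consumer) reuses one
// stable-block cache instead of issuing duplicate MiniBF requests.
async fn wire_dolos_mc_hash() -> Result<
	partner_chains_dolos_data_sources::McHashDataSourceImpl,
	Box<dyn std::error::Error + Send + Sync>,
> {
	// Reads DOLOS_MINIBF_URL from the environment (assumed variable name).
	let dolos_client = partner_chains_dolos_data_sources::get_connection_from_env()?;
	// Reads BLOCK_STABILITY_MARGIN and the Cardano epoch configuration.
	let block_dolos = Arc::new(
		partner_chains_dolos_data_sources::BlockDataSourceImpl::new_from_env(dolos_client).await?,
	);
	Ok(partner_chains_dolos_data_sources::McHashDataSourceImpl::new(block_dolos))
}

The demo node applies the same pattern on the db-sync side, keeping the governed map and token bridge data sources on the shared `block_dbsync` handle.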