From 0f28a5a768e0b144c44d2fd23e2916934fe4aab0 Mon Sep 17 00:00:00 2001 From: Krisztian Pinter Date: Wed, 5 Nov 2025 16:58:29 +0100 Subject: [PATCH 1/3] stake distribution dolos data source --- .../dolos/src/stake_distribution.rs | 72 +++++++++++++++++-- 1 file changed, 65 insertions(+), 7 deletions(-) diff --git a/toolkit/data-sources/dolos/src/stake_distribution.rs b/toolkit/data-sources/dolos/src/stake_distribution.rs index d3003e3075..c507124345 100644 --- a/toolkit/data-sources/dolos/src/stake_distribution.rs +++ b/toolkit/data-sources/dolos/src/stake_distribution.rs @@ -1,11 +1,19 @@ +use crate::{ + Result, + client::{MiniBFClient, api::MiniBFApi}, +}; +use blockfrost_openapi::models::epoch_stake_pool_content_inner::EpochStakePoolContentInner; +use futures::StreamExt; use sidechain_domain::*; use sp_block_participation::inherent_data::BlockParticipationDataSource; -pub struct StakeDistributionDataSourceImpl; +pub struct StakeDistributionDataSourceImpl { + client: MiniBFClient, +} impl StakeDistributionDataSourceImpl { - pub fn new() -> Self { - Self {} + pub fn new(client: MiniBFClient) -> Self { + Self { client } } } @@ -13,9 +21,59 @@ impl StakeDistributionDataSourceImpl { impl BlockParticipationDataSource for StakeDistributionDataSourceImpl { async fn get_stake_pool_delegation_distribution_for_pools( &self, - _epoch: McEpochNumber, - _pool_hashes: &[MainchainKeyHash], - ) -> Result> { - Err("not implemented".into()) + epoch_number: McEpochNumber, + pool_hashes: &[MainchainKeyHash], + ) -> Result { + let pool_futures = futures::stream::iter(pool_hashes) + .map(|pool_id| async { + self.client + .epochs_stakes_by_pool(epoch_number, *pool_id) + .await + .map(|ss| ss.iter().map(|s| (pool_id.clone(), s.clone())).collect::>()) + }) + .collect::>() + .await; + let pools = futures::future::try_join_all(pool_futures) + .await? + .into_iter() + .flatten() + .collect::>(); + Ok(rows_to_distribution(pools)) + } +} + +fn rows_to_distribution( + rows: Vec<(sidechain_domain::MainchainKeyHash, EpochStakePoolContentInner)>, +) -> StakeDistribution { + let mut res = BTreeMap::::new(); + for (pool_id, stake) in rows { + match get_delegator_key(&stake) { + Ok(delegator_key) => { + let pool = res.entry(pool_id).or_default(); + let stake_amount = stake.amount.parse().expect("valid stake amount"); + pool.delegators + .entry(delegator_key) + .or_insert(DelegatorStakeAmount(stake_amount)); + pool.total_stake.0 += stake_amount; + }, + Err(e) => { + log::warn!("Failed to parse EpochStakePoolContentInner: {}", e) + }, + } + } + StakeDistribution(res) +} + +fn get_delegator_key(row: &EpochStakePoolContentInner) -> Result { + let (_, stake_address_hash_raw) = bech32::decode(&row.stake_address)?; + match &stake_address_hash_raw[..] { + [0xe0 | 0xe1, rest @ ..] => Ok(DelegatorKey::StakeKeyHash( + rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"), + )), + [0xf0 | 0xf1, rest @ ..] => Ok(DelegatorKey::ScriptKeyHash { + hash_raw: rest.try_into().expect("infallible: stake_address_hash_raw is 29 bytes"), + script_hash: [0; 28], // TODO how to get this? + }), + _ => Err(format!("invalid stake address hash: {}", row.stake_address).into()), } } From b94fdbd830a85865a182f34f394750a9ad54f5d1 Mon Sep 17 00:00:00 2001 From: ladamesny Date: Fri, 21 Nov 2025 04:13:35 -0500 Subject: [PATCH 2/3] ETCM-12351: Implement governed-map data source --- demo/node/src/data_sources.rs | 10 +- .../data-sources/dolos/src/governed_map.rs | 125 ++++++++++++++++-- toolkit/data-sources/dolos/src/lib.rs | 4 + 3 files changed, 118 insertions(+), 21 deletions(-) diff --git a/demo/node/src/data_sources.rs b/demo/node/src/data_sources.rs index 14a1649660..5b077780f3 100644 --- a/demo/node/src/data_sources.rs +++ b/demo/node/src/data_sources.rs @@ -137,13 +137,9 @@ pub async fn create_dolos_data_sources( ), ), governed_map: Arc::new( - partner_chains_db_sync_data_sources::GovernedMapDataSourceCachedImpl::new( - pool.clone(), - metrics_opt.clone(), - GOVERNED_MAP_CACHE_SIZE, - block_dbsync.clone(), - ) - .await?, + partner_chains_dolos_data_sources::GovernedMapDataSourceImpl::new( + dolos_client.clone(), + ), ), bridge: Arc::new( partner_chains_db_sync_data_sources::CachedTokenBridgeDataSourceImpl::new( diff --git a/toolkit/data-sources/dolos/src/governed_map.rs b/toolkit/data-sources/dolos/src/governed_map.rs index 86a81ec4ba..f5e21972a9 100644 --- a/toolkit/data-sources/dolos/src/governed_map.rs +++ b/toolkit/data-sources/dolos/src/governed_map.rs @@ -1,28 +1,125 @@ -use crate::Result; +use crate::{client::{MiniBFClient, api::MiniBFApi}, Result, DataSourceError}; use async_trait::async_trait; +use cardano_serialization_lib::PlutusData; +use partner_chains_plutus_data::governed_map::GovernedMapDatum; use sidechain_domain::byte_string::ByteString; use sidechain_domain::*; use sp_governed_map::{GovernedMapDataSource, MainChainScriptsV1}; +use std::collections::BTreeMap; -#[derive(Debug, Default)] -pub struct GovernedMapDataSourceImpl {} +pub struct GovernedMapDataSourceImpl { + client: MiniBFClient, +} + +impl GovernedMapDataSourceImpl { + pub fn new(client: MiniBFClient) -> Self { + Self { client } + } +} #[async_trait] impl GovernedMapDataSource for GovernedMapDataSourceImpl { - async fn get_mapping_changes( + async fn get_state_at_block( &self, - _since_mc_block: Option, - _up_to_mc_block: McBlockHash, - _scripts: MainChainScriptsV1, - ) -> Result)>> { - Err("not implemented".into()) + mc_block: McBlockHash, + main_chain_scripts: MainChainScriptsV1, + ) -> Result> { + // Get the block to ensure it exists and get its number + let block = self.client.blocks_by_id(mc_block.clone()).await?; + let block_number = McBlockNumber(block.height.unwrap_or_default().try_into().unwrap_or(0u32)); + + // Get all UTXOs at the governed map validator address + let utxos = self.client.addresses_utxos(main_chain_scripts.validator_address.clone()).await?; + + // Filter UTXOs that: + // 1. Contain the governed map asset + // 2. Were created before or at the target block + let asset_unit = format_asset_unit(&main_chain_scripts.asset_policy_id); + let mut mappings = BTreeMap::new(); + + for utxo in utxos { + // Check if this UTXO was created before or at target block + let tx_hash = McTxHash::from_hex_unsafe(&utxo.tx_hash); + let tx = self.client.transaction_by_hash(tx_hash).await?; + let utxo_block_height = tx.block_height as u32; + + if utxo_block_height > block_number.0 { + continue; + } + + // Check if UTXO contains the governed map asset + let has_asset = utxo.amount.iter().any(|a| a.unit == asset_unit); + if !has_asset { + continue; + } + + // Parse the datum + if let Some(datum_hex) = &utxo.inline_datum { + match PlutusData::from_hex(datum_hex) { + Ok(plutus_data) => { + match GovernedMapDatum::try_from(plutus_data) { + Ok(GovernedMapDatum { key, value }) => { + mappings.insert(key, value); + }, + Err(err) => { + log::warn!("Failed to parse GovernedMapDatum: {}", err); + }, + } + }, + Err(err) => { + log::warn!("Failed to parse PlutusData from hex: {}", err); + }, + } + } + } + + Ok(mappings) } - async fn get_state_at_block( + async fn get_mapping_changes( &self, - _mc_block: McBlockHash, - _main_chain_scripts: MainChainScriptsV1, - ) -> Result> { - Err("not implemented".into()) + since_mc_block: Option, + up_to_mc_block: McBlockHash, + scripts: MainChainScriptsV1, + ) -> Result)>> { + // Get current state at up_to_mc_block + let current_mappings = self.get_state_at_block(up_to_mc_block, scripts.clone()).await?; + + // If no since_mc_block, return all current mappings as additions + let Some(since_mc_block) = since_mc_block else { + let changes = current_mappings + .into_iter() + .map(|(key, value)| (key, Some(value))) + .collect(); + return Ok(changes); + }; + + // Get previous state at since_mc_block + let previous_mappings = self.get_state_at_block(since_mc_block, scripts).await?; + + // Calculate changes + let mut changes = Vec::new(); + + // Find additions and modifications + for (key, value) in current_mappings.iter() { + if previous_mappings.get(key) != Some(value) { + changes.push((key.clone(), Some(value.clone()))); + } + } + + // Find deletions + for key in previous_mappings.keys() { + if !current_mappings.contains_key(key) { + changes.push((key.clone(), None)); + } + } + + Ok(changes) } } + +fn format_asset_unit(policy_id: &PolicyId) -> String { + // Asset unit format in blockfrost is policy_id + asset_name (hex) + // For empty asset names, it's just the policy_id without "0x" prefix + policy_id.to_hex_string()[2..].to_string() +} diff --git a/toolkit/data-sources/dolos/src/lib.rs b/toolkit/data-sources/dolos/src/lib.rs index f24211f6e8..608acd4237 100644 --- a/toolkit/data-sources/dolos/src/lib.rs +++ b/toolkit/data-sources/dolos/src/lib.rs @@ -28,8 +28,12 @@ mod bridge; #[cfg(feature = "bridge")] pub use bridge::TokenBridgeDataSourceImpl; +#[cfg(feature = "block-source")] mod block; +#[cfg(feature = "block-source")] pub use block::BlockDataSourceImpl; + +#[cfg(feature = "block-source")] use sidechain_domain::mainchain_epoch::MainchainEpochConfig; use crate::client::MiniBFClient; From 9754ae676efad31d6f63ae690e7634f1c6f337ed Mon Sep 17 00:00:00 2001 From: ladamesny Date: Fri, 21 Nov 2025 11:07:31 -0500 Subject: [PATCH 3/3] ETCM-12351: clean up for stake distribution --- toolkit/data-sources/dolos/src/governed_map.rs | 2 +- toolkit/data-sources/dolos/src/stake_distribution.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/toolkit/data-sources/dolos/src/governed_map.rs b/toolkit/data-sources/dolos/src/governed_map.rs index f5e21972a9..9e51b33390 100644 --- a/toolkit/data-sources/dolos/src/governed_map.rs +++ b/toolkit/data-sources/dolos/src/governed_map.rs @@ -1,4 +1,4 @@ -use crate::{client::{MiniBFClient, api::MiniBFApi}, Result, DataSourceError}; +use crate::{client::{MiniBFClient, api::MiniBFApi}, Result}; use async_trait::async_trait; use cardano_serialization_lib::PlutusData; use partner_chains_plutus_data::governed_map::GovernedMapDatum; diff --git a/toolkit/data-sources/dolos/src/stake_distribution.rs b/toolkit/data-sources/dolos/src/stake_distribution.rs index c507124345..93ff7ffb46 100644 --- a/toolkit/data-sources/dolos/src/stake_distribution.rs +++ b/toolkit/data-sources/dolos/src/stake_distribution.rs @@ -29,7 +29,7 @@ impl BlockParticipationDataSource for StakeDistributionDataSourceImpl { self.client .epochs_stakes_by_pool(epoch_number, *pool_id) .await - .map(|ss| ss.iter().map(|s| (pool_id.clone(), s.clone())).collect::>()) + .map(|ss| ss.iter().map(|s| (*pool_id, s.clone())).collect::>()) }) .collect::>() .await;