diff --git a/catalyst-gateway/bin/src/db/types/slot.rs b/catalyst-gateway/bin/src/db/types/slot.rs index 17e35945e283..b6f01d068b56 100644 --- a/catalyst-gateway/bin/src/db/types/slot.rs +++ b/catalyst-gateway/bin/src/db/types/slot.rs @@ -7,6 +7,8 @@ use scylla::_macro_internal::{ SerializeValue, TypeCheckError, WrittenCellProof, }; +use crate::service::common::types::cardano::slot_no::SlotNo; + /// A `Slot` wrapper that can be stored to and load from a database.\ #[allow(clippy::module_name_repetitions)] #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -42,6 +44,12 @@ impl From for BigInt { } } +impl From for DbSlot { + fn from(value: SlotNo) -> Self { + Self(value.into()) + } +} + impl SerializeValue for DbSlot { fn serialize<'b>( &self, typ: &ColumnType, writer: CellWriter<'b>, diff --git a/catalyst-gateway/bin/src/service/api/cardano/staking/assets_get.rs b/catalyst-gateway/bin/src/service/api/cardano/staking/assets_get.rs index fd07f5a4f94e..d0389c263b6c 100644 --- a/catalyst-gateway/bin/src/service/api/cardano/staking/assets_get.rs +++ b/catalyst-gateway/bin/src/service/api/cardano/staking/assets_get.rs @@ -1,10 +1,14 @@ //! Implementation of the GET `../assets` endpoint -use std::collections::HashMap; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; use cardano_blockchain_types::{Slot, StakeAddress, TransactionId, TxnIndex}; -use futures::StreamExt; +use futures::TryStreamExt; use poem_openapi::{payload::Json, ApiResponse}; +use tracing::debug; use crate::{ db::index::{ @@ -23,7 +27,7 @@ use crate::{ service::common::{ objects::cardano::{ network::Network, - stake_info::{FullStakeInfo, StakeInfo, StakedNativeTokenInfo}, + stake_info::{FullStakeInfo, StakeInfo, StakedTxoAssetInfo}, }, responses::WithErrorResponses, types::{ @@ -33,11 +37,9 @@ use crate::{ headers::retry_after::RetryAfterOption, }, }, + settings::Settings, }; -/// A `TxoInfo` by transaction ID map. -type TxosByTxn = HashMap>; - /// Endpoint responses. #[derive(ApiResponse)] pub(crate) enum Responses { @@ -57,10 +59,16 @@ pub(crate) enum Responses { pub(crate) type AllResponses = WithErrorResponses; /// # GET `/staked_ada` -#[allow(clippy::unused_async, clippy::no_effect_underscore_binding)] pub(crate) async fn endpoint( - stake_address: Cip19StakeAddress, _provided_network: Option, slot_num: Option, + stake_address: Cip19StakeAddress, provided_network: Option, slot_num: Option, ) -> AllResponses { + if let Some(provided_network) = provided_network { + if cardano_blockchain_types::Network::from(provided_network) != Settings::cardano_network() + { + return Responses::NotFound.into(); + } + } + let Some(persistent_session) = CassandraSession::get(true) else { tracing::error!("Failed to acquire persistent db session"); return AllResponses::service_unavailable( @@ -77,8 +85,8 @@ pub(crate) async fn endpoint( }; let (persistent_res, volatile_res) = futures::join!( - calculate_stake_info(&persistent_session, stake_address.clone(), slot_num), - calculate_stake_info(&volatile_session, stake_address, slot_num) + calculate_stake_info(persistent_session, stake_address.clone(), slot_num), + calculate_stake_info(volatile_session, stake_address, slot_num) ); let persistent_stake_info = match persistent_res { Ok(stake_info) => stake_info, @@ -114,8 +122,6 @@ struct TxoAssetInfo { struct TxoInfo { /// TXO value. value: num_bigint::BigInt, - /// TXO transaction hash. - txn_hash: TransactionId, /// TXO transaction index within the slot. txn_index: TxnIndex, /// TXO index. @@ -124,8 +130,6 @@ struct TxoInfo { slot_no: Slot, /// Whether the TXO was spent. spent_slot_no: Option, - /// TXO assets. - assets: HashMap, Vec>, } /// Calculate the stake info for a given stake address. @@ -133,189 +137,181 @@ struct TxoInfo { /// This function also updates the spent column if it detects that a TXO was spent /// between lookups. async fn calculate_stake_info( - session: &CassandraSession, stake_address: Cip19StakeAddress, slot_num: Option, + session: Arc, stake_address: Cip19StakeAddress, slot_num: Option, ) -> anyhow::Result> { let address: StakeAddress = stake_address.try_into()?; - let mut txos_by_txn = get_txo_by_txn(session, &address, slot_num).await?; - if txos_by_txn.is_empty() { + let adjusted_slot_num = slot_num.unwrap_or(SlotNo::MAXIMUM); + + let (mut txos, txo_assets) = futures::try_join!( + get_txo(&session, &address, adjusted_slot_num), + get_txo_assets(&session, &address, adjusted_slot_num) + )?; + if txos.is_empty() { return Ok(None); } - check_and_set_spent(session, &mut txos_by_txn).await?; - // TODO: This could be executed in the background, it does not actually matter if it - // succeeds. This is just an optimization step to reduce the need to query spent - // TXO's. - update_spent(session, &address, &txos_by_txn).await?; + let params = update_spent(&session, &address, &mut txos).await?; - let stake_info = build_stake_info(txos_by_txn)?; + // Sets TXOs as spent in the database in the background. + tokio::spawn(async move { + if let Err(err) = UpdateTxoSpentQuery::execute(&session, params).await { + tracing::error!("Failed to update TXO spent info, err: {err}"); + } + }); + + let stake_info = build_stake_info(txos, txo_assets, adjusted_slot_num)?; Ok(Some(stake_info)) } -/// Returns a map of TXO infos by transaction hash for the given stake address. -async fn get_txo_by_txn( - session: &CassandraSession, stake_address: &StakeAddress, slot_num: Option, -) -> anyhow::Result { - let adjusted_slot_num: u64 = slot_num.map_or(u64::MAX, Into::into); +/// `TxoInfo` map type alias +type TxoMap = HashMap<(TransactionId, i16), TxoInfo>; - let mut txo_map = HashMap::new(); - let mut txos_iter = GetTxoByStakeAddressQuery::execute( +/// Returns a map of TXO infos for the given stake address. +async fn get_txo( + session: &CassandraSession, stake_address: &StakeAddress, slot_num: SlotNo, +) -> anyhow::Result { + let txos_stream = GetTxoByStakeAddressQuery::execute( session, - GetTxoByStakeAddressQueryParams::new(stake_address.clone(), adjusted_slot_num.into()), + GetTxoByStakeAddressQueryParams::new(stake_address.clone(), slot_num.into()), ) .await?; - // Aggregate TXO info. - while let Some(row_res) = txos_iter.next().await { - let row = row_res?; - - // Filter out already known spent TXOs. - if row.spent_slot.is_some() { - continue; - } + let txo_map = txos_stream + .map_err(Into::::into) + .try_fold(HashMap::new(), |mut txo_map, row| { + async move { + let key = (row.txn_id.into(), row.txo.into()); + txo_map.insert(key, TxoInfo { + value: row.value, + txn_index: row.txn_index.into(), + txo: row.txo.into(), + slot_no: row.slot_no.into(), + spent_slot_no: row.spent_slot.map(Into::into), + }); + Ok(txo_map) + } + }) + .await?; + Ok(txo_map) +} - let key = (row.slot_no, row.txn_index, row.txo); - txo_map.insert(key, TxoInfo { - value: row.value, - txn_hash: row.txn_id.into(), - txn_index: row.txn_index.into(), - txo: row.txo.into(), - slot_no: row.slot_no.into(), - spent_slot_no: None, - assets: HashMap::new(), - }); - } +/// TXO Assets map type alias +type TxoAssetsMap = HashMap<(Slot, TxnIndex, i16), TxoAssetInfo>; - // Augment TXO info with asset info. - let mut assets_txos_iter = GetAssetsByStakeAddressQuery::execute( +/// Returns a map of txo asset infos for the given stake address. +async fn get_txo_assets( + session: &CassandraSession, stake_address: &StakeAddress, slot_num: SlotNo, +) -> anyhow::Result { + let assets_txos_stream = GetAssetsByStakeAddressQuery::execute( session, - GetAssetsByStakeAddressParams::new(stake_address.clone(), adjusted_slot_num.into()), + GetAssetsByStakeAddressParams::new(stake_address.clone(), slot_num.into()), ) .await?; - while let Some(row_res) = assets_txos_iter.next().await { - let row = row_res?; - - let txo_info_key = (row.slot_no, row.txn_index, row.txo); - let Some(txo_info) = txo_map.get_mut(&txo_info_key) else { - continue; - }; - - let entry = txo_info - .assets - .entry(row.policy_id.clone()) - .or_insert_with(Vec::new); - - match entry.iter_mut().find(|item| item.id == row.policy_id) { - Some(item) => item.amount += row.value, - None => { - entry.push(TxoAssetInfo { + let tokens_map = assets_txos_stream + .map_err(Into::::into) + .try_fold(HashMap::new(), |mut tokens_map, row| { + async move { + let key = (row.slot_no.into(), row.txn_index.into(), row.txo.into()); + tokens_map.insert(key, TxoAssetInfo { id: row.policy_id, name: row.asset_name.into(), amount: row.value, }); - }, - } - } - - let mut txos_by_txn = HashMap::new(); - for txo_info in txo_map.into_values() { - let txn_map = txos_by_txn - .entry(txo_info.txn_hash) - .or_insert(HashMap::new()); - txn_map.insert(txo_info.txo, txo_info); - } - - Ok(txos_by_txn) + Ok(tokens_map) + } + }) + .await?; + Ok(tokens_map) } /// Checks if the given TXOs were spent and mark then as such. -async fn check_and_set_spent( - session: &CassandraSession, txos_by_txn: &mut TxosByTxn, -) -> anyhow::Result<()> { - let txn_hashes = txos_by_txn.keys().copied().collect::>(); +async fn update_spent( + session: &CassandraSession, stake_address: &StakeAddress, txos: &mut TxoMap, +) -> anyhow::Result> { + let txn_hashes = txos + .iter() + .filter(|(_, txo)| txo.spent_slot_no.is_none()) + .map(|((tx_id, _), _)| *tx_id) + .collect::>() + .into_iter() + .collect::>(); + + let mut params = Vec::new(); for chunk in txn_hashes.chunks(100) { - let mut txi_iter = GetTxiByTxnHashesQuery::execute( + let mut txi_stream = GetTxiByTxnHashesQuery::execute( session, GetTxiByTxnHashesQueryParams::new(chunk.to_vec()), ) .await?; - while let Some(row_res) = txi_iter.next().await { - let row = row_res?; - - if let Some(txn_map) = txos_by_txn.get_mut(&row.txn_id.into()) { - if let Some(txo_info) = txn_map.get_mut(&row.txo.into()) { - txo_info.spent_slot_no = Some(row.slot_no.into()); - } - } - } - } - - Ok(()) -} - -/// Sets TXOs as spent in the database if they are marked as spent in the map. -async fn update_spent( - session: &CassandraSession, stake_address: &StakeAddress, txos_by_txn: &TxosByTxn, -) -> anyhow::Result<()> { - let mut params = Vec::new(); - for txn_map in txos_by_txn.values() { - for txo_info in txn_map.values() { - if txo_info.spent_slot_no.is_none() { - continue; - } - - if let Some(spent_slot) = txo_info.spent_slot_no { + while let Some(row) = txi_stream.try_next().await? { + let key = (row.txn_id.into(), row.txo.into()); + if let Some(txo_info) = txos.get_mut(&key) { params.push(UpdateTxoSpentQueryParams { stake_address: stake_address.clone().into(), txn_index: txo_info.txn_index.into(), txo: txo_info.txo.into(), slot_no: txo_info.slot_no.into(), - spent_slot: spent_slot.into(), + spent_slot: row.slot_no, }); + + txo_info.spent_slot_no = Some(row.slot_no.into()); } } } - UpdateTxoSpentQuery::execute(session, params).await?; - - Ok(()) + Ok(params) } /// Builds an instance of [`StakeInfo`] based on the TXOs given. -fn build_stake_info(txos_by_txn: TxosByTxn) -> anyhow::Result { +fn build_stake_info( + txos: TxoMap, mut tokens: TxoAssetsMap, slot_num: SlotNo, +) -> anyhow::Result { + let slot_num = slot_num.into(); let mut stake_info = StakeInfo::default(); - for txn_map in txos_by_txn.into_values() { - for txo_info in txn_map.into_values() { - if txo_info.spent_slot_no.is_none() { - let value = u64::try_from(txo_info.value)?; - stake_info.ada_amount = stake_info - .ada_amount - .checked_add(value) - .ok_or_else(|| { - anyhow::anyhow!( - "Total stake amount overflow: {} + {value}", - stake_info.ada_amount - ) - })? - .into(); - - for asset in txo_info.assets.into_values().flatten() { - stake_info.native_tokens.push(StakedNativeTokenInfo { - policy_hash: asset.id.try_into()?, - asset_name: asset.name, - amount: asset.amount.try_into()?, - }); - } + for txo_info in txos.into_values() { + // Filter out spent TXOs. + if let Some(spent_slot) = txo_info.spent_slot_no { + if spent_slot <= slot_num { + continue; + } + } - let slot_no = txo_info.slot_no.into(); - if stake_info.slot_number < slot_no { - stake_info.slot_number = slot_no; - } + let value = u64::try_from(txo_info.value)?; + stake_info.ada_amount = stake_info + .ada_amount + .checked_add(value) + .ok_or_else(|| { + anyhow::anyhow!( + "Total stake amount overflow: {} + {value}", + stake_info.ada_amount + ) + })? + .into(); + + let key = (txo_info.slot_no, txo_info.txn_index, txo_info.txo); + if let Some(native_token) = tokens.remove(&key) { + match native_token.amount.try_into() { + Ok(amount) => { + stake_info.assets.push(StakedTxoAssetInfo { + policy_hash: native_token.id.try_into()?, + asset_name: native_token.name, + amount, + }); + }, + Err(e) => { + debug!("Invalid TXO Asset for {key:?}: {e}"); + }, } } + + let slot_no = txo_info.slot_no.into(); + if stake_info.slot_number < slot_no { + stake_info.slot_number = slot_no; + } } Ok(stake_info) diff --git a/catalyst-gateway/bin/src/service/api/cardano/staking/mod.rs b/catalyst-gateway/bin/src/service/api/cardano/staking/mod.rs index fe5cba050175..397e14f88f10 100644 --- a/catalyst-gateway/bin/src/service/api/cardano/staking/mod.rs +++ b/catalyst-gateway/bin/src/service/api/cardano/staking/mod.rs @@ -51,6 +51,11 @@ impl Api { /// No Authorization required, but Token permitted. _auth: NoneOrRBAC, ) -> assets_get::AllResponses { - assets_get::endpoint(stake_address.0, network.0, SlotNo::into_option(asat.0)).await + Box::pin(assets_get::endpoint( + stake_address.0, + network.0, + SlotNo::into_option(asat.0), + )) + .await } } diff --git a/catalyst-gateway/bin/src/service/common/objects/cardano/stake_info.rs b/catalyst-gateway/bin/src/service/common/objects/cardano/stake_info.rs index e18cf53472ba..f9d629087993 100644 --- a/catalyst-gateway/bin/src/service/common/objects/cardano/stake_info.rs +++ b/catalyst-gateway/bin/src/service/common/objects/cardano/stake_info.rs @@ -14,10 +14,10 @@ use crate::service::common::types::{ }, }; -/// User's staked native token info. +/// User's staked txo asset info. #[derive(Object, Debug, Clone)] #[oai(example)] -pub(crate) struct StakedNativeTokenInfo { +pub(crate) struct StakedTxoAssetInfo { /// Token policy hash. pub(crate) policy_hash: HexEncodedHash28, /// Token policies Asset Name. @@ -26,7 +26,7 @@ pub(crate) struct StakedNativeTokenInfo { pub(crate) amount: AssetValue, } -impl Example for StakedNativeTokenInfo { +impl Example for StakedTxoAssetInfo { fn example() -> Self { Self { policy_hash: Example::example(), @@ -38,17 +38,17 @@ impl Example for StakedNativeTokenInfo { // List of User's Staked Native Token Info impl_array_types!( - StakedNativeTokenInfoList, - StakedNativeTokenInfo, + StakedAssetInfoList, + StakedTxoAssetInfo, Some(poem_openapi::registry::MetaSchema { example: Self::example().to_json(), max_items: Some(1000), - items: Some(Box::new(StakedNativeTokenInfo::schema_ref())), + items: Some(Box::new(StakedTxoAssetInfo::schema_ref())), ..poem_openapi::registry::MetaSchema::ANY }) ); -impl Example for StakedNativeTokenInfoList { +impl Example for StakedAssetInfoList { fn example() -> Self { Self(vec![Example::example()]) } @@ -64,8 +64,8 @@ pub(crate) struct StakeInfo { /// Block's slot number which contains the latest unspent UTXO. pub(crate) slot_number: SlotNo, - /// Native token infos. - pub(crate) native_tokens: StakedNativeTokenInfoList, + /// TXO assets infos. + pub(crate) assets: StakedAssetInfoList, } impl Example for StakeInfo { @@ -73,7 +73,7 @@ impl Example for StakeInfo { Self { slot_number: SlotNo::example(), ada_amount: AdaValue::example(), - native_tokens: Vec::new().into(), + assets: Vec::new().into(), } } } diff --git a/catalyst-gateway/bin/src/service/common/types/cardano/slot_no.rs b/catalyst-gateway/bin/src/service/common/types/cardano/slot_no.rs index 482747ef362f..393753a70339 100644 --- a/catalyst-gateway/bin/src/service/common/types/cardano/slot_no.rs +++ b/catalyst-gateway/bin/src/service/common/types/cardano/slot_no.rs @@ -17,10 +17,6 @@ const TITLE: &str = "Cardano Blockchain Slot Number"; const DESCRIPTION: &str = "The Slot Number of a Cardano Block on the chain."; /// Example. pub(crate) const EXAMPLE: u64 = 1_234_567; -/// Minimum. -const MINIMUM: u64 = 0; -/// Maximum. -const MAXIMUM: u64 = u64::MAX / 2; /// Schema. #[allow(clippy::cast_precision_loss)] @@ -29,8 +25,8 @@ static SCHEMA: LazyLock = LazyLock::new(|| { title: Some(TITLE.to_owned()), description: Some(DESCRIPTION), example: Some(EXAMPLE.into()), - maximum: Some(MAXIMUM as f64), - minimum: Some(MINIMUM as f64), + maximum: Some(SlotNo::MAXIMUM.0 as f64), + minimum: Some(SlotNo::MINIMUM.0 as f64), ..MetaSchema::ANY } }); @@ -40,9 +36,14 @@ static SCHEMA: LazyLock = LazyLock::new(|| { pub(crate) struct SlotNo(u64); impl SlotNo { + /// Maximum. + pub(crate) const MAXIMUM: SlotNo = SlotNo(u64::MAX / 2); + /// Minimum. + pub(crate) const MINIMUM: SlotNo = SlotNo(0); + /// Is the Slot Number valid? fn is_valid(value: u64) -> bool { - (MINIMUM..=MAXIMUM).contains(&value) + (Self::MINIMUM.0..=Self::MAXIMUM.0).contains(&value) } /// Generic conversion of `Option` to `Option`.