diff --git a/Cargo.lock b/Cargo.lock index 5ffbd2d1..7b8a39bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,14 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "acropolis_cardano" +version = "0.1.0" +dependencies = [ + "acropolis_common", + "anyhow", +] + [[package]] name = "acropolis_codec" version = "0.1.0" @@ -37,7 +45,7 @@ dependencies = [ "futures", "hex", "memmap2", - "minicbor 0.26.5", + "minicbor 0.25.1", "num-rational", "num-traits", "rayon", @@ -82,7 +90,7 @@ dependencies = [ "caryatid_sdk", "config", "fjall", - "minicbor 0.26.5", + "minicbor 0.25.1", "tempfile", "tokio", "tracing", @@ -165,7 +173,8 @@ dependencies = [ "config", "fjall", "hex", - "minicbor 0.26.5", + "minicbor 0.25.1", + "pallas 0.33.0", "pallas-traverse 0.33.0", "tempfile", "tokio", @@ -274,7 +283,7 @@ dependencies = [ "config", "fjall", "hex", - "minicbor 0.26.5", + "minicbor 0.25.1", "rayon", "tokio", "tracing", @@ -289,7 +298,7 @@ dependencies = [ "caryatid_sdk", "config", "fjall", - "minicbor 0.26.5", + "minicbor 0.25.1", "tempfile", "tokio", "tracing", @@ -350,6 +359,7 @@ dependencies = [ name = "acropolis_module_rest_blockfrost" version = "0.1.0" dependencies = [ + "acropolis_cardano", "acropolis_common", "anyhow", "blake2 0.10.6", @@ -3564,16 +3574,6 @@ dependencies = [ "minicbor-derive 0.15.3", ] -[[package]] -name = "minicbor" -version = "0.26.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a309f581ade7597820083bc275075c4c6986e57e53f8d26f88507cfefc8c987" -dependencies = [ - "half 2.7.1", - "minicbor-derive 0.16.2", -] - [[package]] name = "minicbor" version = "2.1.1" @@ -3594,17 +3594,6 @@ dependencies = [ "syn 2.0.109", ] -[[package]] -name = "minicbor-derive" -version = "0.16.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9882ef5c56df184b8ffc107fc6c61e33ee3a654b021961d790a78571bb9d67a" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.109", -] - [[package]] name = "minicbor-derive" version = "0.18.2" diff --git a/Cargo.toml b/Cargo.toml index 736a8233..394e19a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ [workspace] members = [ # Global message and common definitions + "cardano", "codec", "common", @@ -51,6 +52,7 @@ config = "0.15.11" dashmap = "6.1.0" hex = "0.4" imbl = { version = "5.0.0", features = ["serde"] } +minicbor = { version = "0.25.1", features = ["alloc", "std", "derive"] } opentelemetry = { version = "0.30.0", features = ["trace"] } opentelemetry-otlp = { version = "0.30.0", features = ["grpc-tonic", "trace", "tls"] } opentelemetry_sdk = { version = "0.30.0", features = ["rt-tokio"] } diff --git a/cardano/Cargo.toml b/cardano/Cargo.toml new file mode 100644 index 00000000..5f656dbf --- /dev/null +++ b/cardano/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "acropolis_cardano" +version = "0.1.0" +edition = "2024" + +[dependencies] +acropolis_common = { version = "0.3.0", path = "../common" } +anyhow.workspace = true diff --git a/cardano/src/lib.rs b/cardano/src/lib.rs new file mode 100644 index 00000000..37f08066 --- /dev/null +++ b/cardano/src/lib.rs @@ -0,0 +1 @@ +pub mod transaction; diff --git a/cardano/src/transaction.rs b/cardano/src/transaction.rs new file mode 100644 index 00000000..f1747369 --- /dev/null +++ b/cardano/src/transaction.rs @@ -0,0 +1,31 @@ +use acropolis_common::{Lovelace, protocol_params::ProtocolParams}; +use anyhow::{Error, anyhow}; + +pub fn calculate_transaction_fee( + recorded_fee: &Option, + inputs: &[Lovelace], + outputs: &[Lovelace], +) -> Lovelace { + match recorded_fee { + Some(fee) => *fee, + None => inputs.iter().sum::() - outputs.iter().sum::(), + } +} + +pub fn calculate_deposit( + pool_update_count: u64, + stake_cert_count: u64, + params: &ProtocolParams, +) -> Result { + match ¶ms.shelley { + Some(shelley) => Ok(stake_cert_count * shelley.protocol_params.key_deposit + + pool_update_count * shelley.protocol_params.pool_deposit), + None => { + if pool_update_count > 0 || stake_cert_count > 0 { + Err(anyhow!("No Shelley params, but deposits present")) + } else { + Ok(0) + } + } + } +} diff --git a/codec/src/map_parameters.rs b/codec/src/map_parameters.rs index 65d32b0d..150609d3 100644 --- a/codec/src/map_parameters.rs +++ b/codec/src/map_parameters.rs @@ -19,7 +19,10 @@ use acropolis_common::{ *, }; use pallas_primitives::conway::PseudoScript; -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + net::{Ipv4Addr, Ipv6Addr}, +}; /// Map Pallas Network to our NetworkId pub fn map_network(network: addresses::Network) -> Result { @@ -204,7 +207,7 @@ fn map_constitution(constitution: &conway::Constitution) -> Constitution { } /// Map a Pallas Relay to ours -fn map_relay(relay: &PallasRelay) -> Relay { +pub fn map_relay(relay: &PallasRelay) -> Relay { match relay { PallasRelay::SingleHostAddr(port, ipv4, ipv6) => Relay::SingleHostAddr(SingleHostAddr { port: match port { @@ -212,11 +215,11 @@ fn map_relay(relay: &PallasRelay) -> Relay { _ => None, }, ipv4: match ipv4 { - Nullable::Some(ipv4) => ipv4.try_into().ok(), + Nullable::Some(ipv4) => <[u8; 4]>::try_from(ipv4).ok().map(Ipv4Addr::from), _ => None, }, ipv6: match ipv6 { - Nullable::Some(ipv6) => ipv6.try_into().ok(), + Nullable::Some(ipv6) => <[u8; 16]>::try_from(ipv6).ok().map(Ipv6Addr::from), _ => None, }, }), @@ -236,6 +239,53 @@ fn map_relay(relay: &PallasRelay) -> Relay { // // Certificates // +#[allow(clippy::too_many_arguments)] +pub fn to_pool_reg( + operator: &pallas_primitives::PoolKeyhash, + vrf_keyhash: &pallas_primitives::VrfKeyhash, + pledge: &pallas_primitives::Coin, + cost: &pallas_primitives::Coin, + margin: &pallas_primitives::UnitInterval, + reward_account: &pallas_primitives::RewardAccount, + pool_owners: &[pallas_primitives::AddrKeyhash], + relays: &[pallas_primitives::Relay], + pool_metadata: &Nullable, + network_id: NetworkId, + force_reward_network_id: bool, +) -> Result { + Ok(PoolRegistration { + operator: to_pool_id(operator), + vrf_key_hash: to_vrf_key(vrf_keyhash), + pledge: *pledge, + cost: *cost, + margin: Ratio { + numerator: margin.numerator, + denominator: margin.denominator, + }, + reward_account: if force_reward_network_id { + StakeAddress::new( + StakeAddress::from_binary(reward_account)?.credential, + network_id.clone(), + ) + } else { + StakeAddress::from_binary(reward_account)? + }, + pool_owners: pool_owners + .iter() + .map(|v| { + StakeAddress::new(StakeCredential::AddrKeyHash(to_hash(v)), network_id.clone()) + }) + .collect(), + relays: relays.iter().map(map_relay).collect(), + pool_metadata: match pool_metadata { + Nullable::Some(md) => Some(PoolMetadata { + url: md.url.clone(), + hash: md.hash.to_vec(), + }), + _ => None, + }, + }) +} /// Derive our TxCertificate from a Pallas Certificate pub fn map_certificate( @@ -277,34 +327,19 @@ pub fn map_certificate( relays, pool_metadata, } => Ok(TxCertificateWithPos { - cert: TxCertificate::PoolRegistration(PoolRegistration { - operator: to_pool_id(operator), - vrf_key_hash: to_vrf_key(vrf_keyhash), - pledge: *pledge, - cost: *cost, - margin: Ratio { - numerator: margin.numerator, - denominator: margin.denominator, - }, - reward_account: StakeAddress::from_binary(reward_account)?, - pool_owners: pool_owners - .iter() - .map(|v| { - StakeAddress::new( - StakeCredential::AddrKeyHash(to_hash(v)), - network_id.clone(), - ) - }) - .collect(), - relays: relays.iter().map(map_relay).collect(), - pool_metadata: match pool_metadata { - Nullable::Some(md) => Some(PoolMetadata { - url: md.url.clone(), - hash: md.hash.to_vec(), - }), - _ => None, - }, - }), + cert: TxCertificate::PoolRegistration(to_pool_reg( + operator, + vrf_keyhash, + pledge, + cost, + margin, + reward_account, + pool_owners, + relays, + pool_metadata, + network_id, + false, + )?), tx_identifier, cert_index: cert_index as u64, }), @@ -397,41 +432,23 @@ pub fn map_certificate( relays, pool_metadata, } => Ok(TxCertificateWithPos { - cert: TxCertificate::PoolRegistration(PoolRegistration { - operator: to_pool_id(operator), - vrf_key_hash: to_vrf_key(vrf_keyhash), - pledge: *pledge, - cost: *cost, - margin: Ratio { - numerator: margin.numerator, - denominator: margin.denominator, - }, + cert: TxCertificate::PoolRegistration(to_pool_reg( + operator, + vrf_keyhash, + pledge, + cost, + margin, + reward_account, + pool_owners, + relays, + pool_metadata, + network_id, // Force networkId - in mainnet epoch 208, one SPO (c63dab6d780a) uses // an e0 (testnet!) address, and this then fails to match their actual // reward account (e1). Feels like this should have been // a validation failure, but clearly wasn't! - reward_account: StakeAddress::new( - StakeAddress::from_binary(reward_account)?.credential, - network_id.clone(), - ), - pool_owners: pool_owners - .into_iter() - .map(|v| { - StakeAddress::new( - StakeCredential::AddrKeyHash(to_hash(v)), - network_id.clone(), - ) - }) - .collect(), - relays: relays.iter().map(map_relay).collect(), - pool_metadata: match pool_metadata { - Nullable::Some(md) => Some(PoolMetadata { - url: md.url.clone(), - hash: md.hash.to_vec(), - }), - _ => None, - }, - }), + true, + )?), tx_identifier, cert_index: cert_index as u64, }), @@ -1119,3 +1136,19 @@ pub fn map_reference_script(script: &Option) -> Option< None => None, } } + +pub fn map_metadata(metadata: &pallas_primitives::Metadatum) -> Metadata { + match metadata { + pallas_primitives::Metadatum::Int(pallas_primitives::Int(i)) => { + Metadata::Int(MetadataInt(*i)) + } + pallas_primitives::Metadatum::Bytes(b) => Metadata::Bytes(b.to_vec()), + pallas_primitives::Metadatum::Text(s) => Metadata::Text(s.clone()), + pallas_primitives::Metadatum::Array(a) => { + Metadata::Array(a.iter().map(map_metadata).collect()) + } + pallas_primitives::Metadatum::Map(m) => { + Metadata::Map(m.iter().map(|(k, v)| (map_metadata(k), map_metadata(v))).collect()) + } + } +} diff --git a/common/Cargo.toml b/common/Cargo.toml index fc5e948d..57444966 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -33,7 +33,7 @@ tempfile = "3" tokio = { workspace = true } tracing = { workspace = true } futures = "0.3.31" -minicbor = { version = "0.26.0", features = ["std", "half", "derive"] } +minicbor = { workspace = true, features = ["std", "half", "derive"] } num-traits = "0.2" dashmap = { workspace = true } rayon = "1.11.0" diff --git a/common/src/lib.rs b/common/src/lib.rs index daa6dda7..cc5c9ba9 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -11,6 +11,7 @@ pub mod hash; pub mod ledger_state; pub mod math; pub mod messages; +pub mod metadata; pub mod params; pub mod protocol_params; pub mod queries; @@ -28,4 +29,5 @@ pub mod validation; // Flattened re-exports pub use self::address::*; +pub use self::metadata::*; pub use self::types::*; diff --git a/common/src/metadata.rs b/common/src/metadata.rs new file mode 100644 index 00000000..62ba4ef0 --- /dev/null +++ b/common/src/metadata.rs @@ -0,0 +1,40 @@ +use minicbor::data::Int; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use std::fmt; + +#[derive(Debug, Clone)] +pub struct MetadataInt(pub Int); + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub enum Metadata { + Int(MetadataInt), + Bytes(Vec), + Text(String), + Array(Vec), + Map(Vec<(Metadata, Metadata)>), +} + +impl Serialize for MetadataInt { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_i128(self.0.into()) + } +} + +impl fmt::Display for MetadataInt { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +impl<'a> Deserialize<'a> for MetadataInt { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'a>, + { + // TODO if this is ever used, i64 may not be enough! + Ok(MetadataInt(Int::from(i64::deserialize(deserializer)?))) + } +} diff --git a/common/src/queries/transactions.rs b/common/src/queries/transactions.rs index ec1283c4..88a9cf9b 100644 --- a/common/src/queries/transactions.rs +++ b/common/src/queries/transactions.rs @@ -1,22 +1,32 @@ +use crate::{ + BlockHash, InstantaneousRewardSource, Lovelace, Metadata, NativeAsset, PoolId, + PoolRegistration, StakeAddress, TxHash, +}; + +pub const DEFAULT_TRANSACTIONS_QUERY_TOPIC: (&str, &str) = ( + "transactions-state-query-topic", + "cardano.query.transactions", +); use crate::queries::errors::QueryError; #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum TransactionsStateQuery { - GetTransactionInfo, + GetTransactionInfo { tx_hash: TxHash }, GetTransactionUTxOs, - GetTransactionStakeCertificates, - GetTransactionDelegationCertificates, - GetTransactionWithdrawals, - GetTransactionMIRs, - GetTransactionPoolUpdateCertificates, - GetTransactionPoolRetirementCertificates, - GetTransactionMetadata, + GetTransactionStakeCertificates { tx_hash: TxHash }, + GetTransactionDelegationCertificates { tx_hash: TxHash }, + GetTransactionWithdrawals { tx_hash: TxHash }, + GetTransactionMIRs { tx_hash: TxHash }, + GetTransactionPoolUpdateCertificates { tx_hash: TxHash }, + GetTransactionPoolRetirementCertificates { tx_hash: TxHash }, + GetTransactionMetadata { tx_hash: TxHash }, GetTransactionMetadataCBOR, GetTransactionRedeemers, GetTransactionRequiredSigners, GetTransactionCBOR, } +#[allow(clippy::large_enum_variant)] #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum TransactionsStateQueryResponse { TransactionInfo(TransactionInfo), @@ -36,37 +46,137 @@ pub enum TransactionsStateQueryResponse { } #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct TransactionInfo {} +pub enum TransactionOutputAmount { + Lovelace(Lovelace), + Asset(NativeAsset), +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TransactionInfo { + pub hash: TxHash, + pub block_hash: BlockHash, + pub block_number: u64, + pub block_time: u64, + pub epoch: u64, + pub slot: u64, + pub index: u64, + pub output_amounts: Vec, + pub recorded_fee: Option, + pub size: u64, + pub invalid_before: Option, + pub invalid_after: Option, + pub utxo_count: u64, + pub withdrawal_count: u64, + pub mir_cert_count: u64, + pub delegation_count: u64, + pub stake_cert_count: u64, + pub pool_update_count: u64, + pub pool_retire_count: u64, + pub asset_mint_or_burn_count: u64, + pub redeemer_count: u64, + pub valid_contract: bool, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct TransactionUTxOs {} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct TransactionStakeCertificates {} +pub struct TransactionStakeCertificate { + pub index: u64, + pub address: StakeAddress, + pub registration: bool, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TransactionStakeCertificates { + pub certificates: Vec, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TransactionDelegationCertificate { + pub index: u64, + pub address: StakeAddress, + pub pool_id: PoolId, + pub active_epoch: u64, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct TransactionDelegationCertificates {} +pub struct TransactionDelegationCertificates { + pub certificates: Vec, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct TransactionWithdrawals {} +pub struct TransactionWithdrawal { + pub address: StakeAddress, + pub amount: Lovelace, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TransactionWithdrawals { + pub withdrawals: Vec, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ScriptDatumJSON {} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct TransactionMIRs {} +pub struct TransactionMIR { + pub cert_index: u64, + pub pot: InstantaneousRewardSource, + pub address: StakeAddress, + pub amount: Lovelace, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TransactionMIRs { + pub mirs: Vec, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TransactionPoolUpdateCertificate { + pub cert_index: u64, + pub pool_reg: PoolRegistration, + pub active_epoch: u64, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TransactionPoolUpdateCertificates { + pub pool_updates: Vec, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TransactionPoolRetirementCertificate { + pub cert_index: u64, + pub pool_id: PoolId, + pub retirement_epoch: u64, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct TransactionPoolUpdateCertificates {} +pub struct TransactionPoolRetirementCertificates { + pub pool_retirements: Vec, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct TransactionPoolRetirementCertificates {} +pub struct TransactionMetadataItem { + pub label: String, + pub json_metadata: Metadata, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct TransactionMetadata {} +pub struct TransactionMetadata { + pub metadata: Vec, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct TransactionMetadataCBOR {} +pub struct TransactionMetadataItemCBOR { + pub label: String, + pub metadata: Vec, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TransactionMetadataCBOR { + pub metadata: Vec, +} #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct TransactionRedeemers {} diff --git a/common/src/queries/utils.rs b/common/src/queries/utils.rs index 897e61a2..f7ebffc2 100644 --- a/common/src/queries/utils.rs +++ b/common/src/queries/utils.rs @@ -1,6 +1,6 @@ use caryatid_sdk::Context; use serde::Serialize; -use std::sync::Arc; +use std::{future::Future, sync::Arc}; use crate::messages::{Message, RESTResponse}; use crate::queries::errors::QueryError; @@ -21,6 +21,24 @@ where extractor(message) } +pub async fn query_state_async( + context: &Arc>, + topic: &str, + request_msg: Arc, + extractor: F, +) -> Result +where + F: FnOnce(Message) -> Fut, + Fut: Future>, +{ + // build message to query + let raw_msg = context.message_bus.request(topic, request_msg).await?; + + let message = Arc::try_unwrap(raw_msg).unwrap_or_else(|arc| (*arc).clone()); + + extractor(message).await +} + pub async fn rest_query_state( context: &Arc>, topic: &str, @@ -43,3 +61,27 @@ where let json = serde_json::to_string_pretty(&data)?; Ok(RESTResponse::with_json(200, &json)) } + +pub async fn rest_query_state_async( + context: &Arc>, + topic: &str, + request_msg: Arc, + extractor: F, +) -> Result +where + F: FnOnce(Message) -> Fut, + Fut: Future>>, + T: Serialize, +{ + let data = query_state_async(context, topic, request_msg, async |response| { + extractor(response).await.ok_or_else(|| { + QueryError::internal_error(format!( + "Unexpected response message type while calling {topic}" + )) + })? + }) + .await?; + + let json = serde_json::to_string_pretty(&data)?; + Ok(RESTResponse::with_json(200, &json)) +} diff --git a/common/src/types.rs b/common/src/types.rs index d7c90545..a1a80700 100644 --- a/common/src/types.rs +++ b/common/src/types.rs @@ -22,6 +22,7 @@ use std::{ collections::{HashMap, HashSet}, fmt, fmt::{Display, Formatter}, + net::{Ipv4Addr, Ipv6Addr}, ops::{AddAssign, Neg}, str::FromStr, }; @@ -775,6 +776,16 @@ pub enum Pot { Deposits, } +impl fmt::Display for Pot { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Pot::Reserves => write!(f, "reserves"), + Pot::Treasury => write!(f, "treasury"), + Pot::Deposits => write!(f, "deposits"), + } + } +} + /// Pot Delta - internal change of pot values at genesis / era boundaries #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct PotDelta { @@ -910,10 +921,10 @@ pub struct SingleHostAddr { pub port: Option, /// Optional IPv4 address - pub ipv4: Option<[u8; 4]>, + pub ipv4: Option, /// Optional IPv6 address - pub ipv6: Option<[u8; 16]>, + pub ipv6: Option, } /// Relay hostname @@ -1133,6 +1144,15 @@ pub enum InstantaneousRewardSource { Treasury, } +impl fmt::Display for InstantaneousRewardSource { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + InstantaneousRewardSource::Reserves => write!(f, "reserves"), + InstantaneousRewardSource::Treasury => write!(f, "treasury"), + } + } +} + /// Target of a MIR #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub enum InstantaneousRewardTarget { diff --git a/modules/address_state/Cargo.toml b/modules/address_state/Cargo.toml index 980ed609..762c2b33 100644 --- a/modules/address_state/Cargo.toml +++ b/modules/address_state/Cargo.toml @@ -16,7 +16,7 @@ caryatid_sdk = { workspace = true } anyhow = { workspace = true } config = { workspace = true } fjall = "2.7.0" -minicbor = { version = "0.26.0", features = ["std", "derive"] } +minicbor = { workspace = true, features = ["std", "derive"] } tokio = { workspace = true } tracing = { workspace = true } diff --git a/modules/chain_store/Cargo.toml b/modules/chain_store/Cargo.toml index dbee7ae5..a7d2ccb7 100644 --- a/modules/chain_store/Cargo.toml +++ b/modules/chain_store/Cargo.toml @@ -14,10 +14,11 @@ anyhow = "1.0" config = "0.15.11" fjall = "2.7.0" hex = "0.4" -minicbor = { version = "0.26.0", features = ["std", "half", "derive"] } +minicbor = { workspace = true, features = ["std", "half", "derive"] } pallas-traverse = { workspace = true } tracing = "0.1.40" tokio.workspace = true +pallas.workspace = true [dev-dependencies] tempfile = "3" diff --git a/modules/chain_store/src/chain_store.rs b/modules/chain_store/src/chain_store.rs index 2dabfff4..9db194cd 100644 --- a/modules/chain_store/src/chain_store.rs +++ b/modules/chain_store/src/chain_store.rs @@ -1,12 +1,25 @@ mod stores; -use crate::stores::{fjall::FjallStore, Block, Store}; -use acropolis_codec::{block::map_to_block_issuer, map_parameters}; +use crate::stores::{fjall::FjallStore, Block, Store, Tx}; +use acropolis_codec::{ + block::map_to_block_issuer, + map_parameters, + map_parameters::{map_metadata, map_stake_address, to_pool_id, to_pool_reg}, +}; use acropolis_common::queries::blocks::TransactionHashesAndTimeStamps; use acropolis_common::queries::errors::QueryError; use acropolis_common::{ crypto::keyhash_224, messages::{CardanoMessage, Message, StateQuery, StateQueryResponse}, + queries::transactions::{ + TransactionDelegationCertificate, TransactionDelegationCertificates, TransactionInfo, + TransactionMIR, TransactionMIRs, TransactionMetadata, TransactionMetadataItem, + TransactionOutputAmount, TransactionPoolRetirementCertificate, + TransactionPoolRetirementCertificates, TransactionPoolUpdateCertificate, + TransactionPoolUpdateCertificates, TransactionStakeCertificate, + TransactionStakeCertificates, TransactionWithdrawal, TransactionWithdrawals, + TransactionsStateQuery, TransactionsStateQueryResponse, DEFAULT_TRANSACTIONS_QUERY_TOPIC, + }, queries::{ blocks::{ BlockHashes, BlockInfo, BlockInvolvedAddress, BlockInvolvedAddresses, BlockKey, @@ -17,11 +30,14 @@ use acropolis_common::{ misc::Order, }, state_history::{StateHistory, StateHistoryStore}, - BechOrdAddress, BlockHash, GenesisDelegate, HeavyDelegate, PoolId, TxHash, + AssetName, BechOrdAddress, BlockHash, GenesisDelegate, HeavyDelegate, + InstantaneousRewardSource, NativeAsset, NetworkId, PoolId, StakeAddress, TxHash, }; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use caryatid_sdk::{module, Context}; use config::Config; +use pallas::ledger::primitives::{alonzo, conway}; +use pallas_traverse::{MultiEraCert, MultiEraMeta}; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use tokio::sync::Mutex; @@ -48,6 +64,11 @@ impl ChainStore { let block_queries_topic = config .get_string(DEFAULT_BLOCKS_QUERY_TOPIC.0) .unwrap_or(DEFAULT_BLOCKS_QUERY_TOPIC.1.to_string()); + let txs_queries_topic = config + .get_string(DEFAULT_TRANSACTIONS_QUERY_TOPIC.0) + .unwrap_or(DEFAULT_TRANSACTIONS_QUERY_TOPIC.1.to_string()); + let network_id: NetworkId = + config.get_string("network-id").unwrap_or("mainnet".to_string()).into(); let store_type = config.get_string("store").unwrap_or(DEFAULT_STORE.to_string()); let store: Arc = match store_type.as_str() { @@ -89,6 +110,30 @@ impl ChainStore { } }); + let query_store = store.clone(); + context.handle(&txs_queries_topic, move |req| { + let query_store = query_store.clone(); + let network_id = network_id.clone(); + async move { + let Message::StateQuery(StateQuery::Transactions(query)) = req.as_ref() else { + return Arc::new(Message::StateQueryResponse( + StateQueryResponse::Transactions(TransactionsStateQueryResponse::Error( + QueryError::internal_error("Invalid message for txs-state"), + )), + )); + }; + let res = + Self::handle_txs_query(&query_store, query, network_id).unwrap_or_else(|err| { + TransactionsStateQueryResponse::Error(QueryError::internal_error( + err.to_string(), + )) + }); + Arc::new(Message::StateQueryResponse( + StateQueryResponse::Transactions(res), + )) + } + }); + let mut new_blocks_subscription = context.subscribe(&new_blocks_topic).await?; let mut params_subscription = context.subscribe(¶ms_topic).await?; context.run(async move { @@ -733,6 +778,493 @@ impl ChainStore { Ok(BlockInvolvedAddresses { addresses }) } + fn to_tx_info(tx: &Tx) -> Result { + let block = pallas_traverse::MultiEraBlock::decode(&tx.block.bytes)?; + let txs = block.txs(); + let Some(tx_decoded) = txs.get(tx.index as usize) else { + return Err(anyhow!("Transaction not found in block for given index")); + }; + let mut output_amounts = Vec::new(); + for output in tx_decoded.outputs() { + let value = output.value(); + let lovelace_amount = value.coin(); + if lovelace_amount != 0 { + output_amounts.push(TransactionOutputAmount::Lovelace(lovelace_amount)); + } + for policy in value.assets() { + for asset in policy.assets() { + if asset.is_output() { + output_amounts.push(TransactionOutputAmount::Asset(NativeAsset { + name: AssetName::new(asset.name()).ok_or(anyhow!("Bad asset name"))?, + amount: asset.output_coin().ok_or(anyhow!("No output amount"))?, + })); + } + } + } + } + let mut mir_cert_count = 0; + let mut delegation_count = 0; + let mut stake_cert_count = 0; + let mut pool_update_count = 0; + let mut pool_retire_count = 0; + for cert in tx_decoded.certs() { + match cert { + MultiEraCert::AlonzoCompatible(cert) => match cert.as_ref().as_ref() { + alonzo::Certificate::PoolRegistration { .. } => { + pool_update_count += 1; + } + alonzo::Certificate::PoolRetirement { .. } => pool_retire_count += 1, + alonzo::Certificate::MoveInstantaneousRewardsCert { .. } => mir_cert_count += 1, + alonzo::Certificate::StakeRegistration { .. } => { + stake_cert_count += 1; + } + alonzo::Certificate::StakeDelegation { .. } => delegation_count += 1, + _ => (), + }, + MultiEraCert::Conway(cert) => match cert.as_ref().as_ref() { + conway::Certificate::PoolRegistration { .. } => { + pool_update_count += 1; + } + conway::Certificate::PoolRetirement { .. } => pool_retire_count += 1, + conway::Certificate::StakeRegistration { .. } => { + stake_cert_count += 1; + } + conway::Certificate::StakeDelegation { .. } => delegation_count += 1, + _ => (), + }, + _ => (), + } + } + Ok(TransactionInfo { + hash: TxHash::from(*tx_decoded.hash()), + block_hash: BlockHash::from(*block.hash()), + block_number: block.number(), + block_time: tx.block.extra.timestamp, + epoch: tx.block.extra.epoch, + slot: block.slot(), + index: tx.index, + output_amounts, + recorded_fee: tx_decoded.fee(), + // TODO reporting too many bytes (140) + size: tx_decoded.size() as u64, + invalid_before: tx_decoded.validity_start(), + // TODO + invalid_after: None, + utxo_count: (tx_decoded.requires().len() + tx_decoded.produces().len()) as u64, + withdrawal_count: tx_decoded.withdrawals_sorted_set().len() as u64, + mir_cert_count, + delegation_count, + stake_cert_count, + pool_update_count, + pool_retire_count, + asset_mint_or_burn_count: tx_decoded + .mints() + .iter() + .map(|p| p.assets().len()) + .sum::() as u64, + redeemer_count: tx_decoded.redeemers().len() as u64, + valid_contract: tx_decoded.is_valid(), + }) + } + + fn to_tx_stakes(tx: &Tx, network_id: NetworkId) -> Result> { + let block = pallas_traverse::MultiEraBlock::decode(&tx.block.bytes)?; + let txs = block.txs(); + let Some(tx_decoded) = txs.get(tx.index as usize) else { + return Err(anyhow!("Transaction not found in block for given index")); + }; + let mut certs = Vec::new(); + for (index, cert) in tx_decoded.certs().iter().enumerate() { + match cert { + MultiEraCert::AlonzoCompatible(cert) => match cert.as_ref().as_ref() { + alonzo::Certificate::StakeRegistration(cred) => { + certs.push(TransactionStakeCertificate { + index: index as u64, + address: map_stake_address(cred, network_id.clone()), + registration: true, + }); + } + alonzo::Certificate::StakeDeregistration(cred) => { + certs.push(TransactionStakeCertificate { + index: index as u64, + address: map_stake_address(cred, network_id.clone()), + registration: false, + }); + } + _ => (), + }, + MultiEraCert::Conway(cert) => match cert.as_ref().as_ref() { + conway::Certificate::StakeRegistration(cred) => { + certs.push(TransactionStakeCertificate { + index: index as u64, + address: map_stake_address(cred, network_id.clone()), + registration: true, + }); + } + conway::Certificate::StakeDeregistration(cred) => { + certs.push(TransactionStakeCertificate { + index: index as u64, + address: map_stake_address(cred, network_id.clone()), + registration: false, + }); + } + _ => (), + }, + _ => (), + } + } + Ok(certs) + } + + fn to_tx_delegations( + tx: &Tx, + network_id: NetworkId, + ) -> Result> { + let block = pallas_traverse::MultiEraBlock::decode(&tx.block.bytes)?; + let txs = block.txs(); + let Some(tx_decoded) = txs.get(tx.index as usize) else { + return Err(anyhow!("Transaction not found in block for given index")); + }; + let mut certs = Vec::new(); + for (index, cert) in tx_decoded.certs().iter().enumerate() { + match cert { + MultiEraCert::AlonzoCompatible(cert) => { + if let alonzo::Certificate::StakeDelegation(cred, pool_key_hash) = + cert.as_ref().as_ref() + { + certs.push(TransactionDelegationCertificate { + index: index as u64, + address: map_stake_address(cred, network_id.clone()), + pool_id: to_pool_id(pool_key_hash), + active_epoch: tx.block.extra.epoch + 1, + }); + } + } + MultiEraCert::Conway(cert) => { + if let conway::Certificate::StakeDelegation(cred, pool_key_hash) = + cert.as_ref().as_ref() + { + certs.push(TransactionDelegationCertificate { + index: index as u64, + address: map_stake_address(cred, network_id.clone()), + pool_id: to_pool_id(pool_key_hash), + active_epoch: tx.block.extra.epoch + 1, + }); + } + } + _ => (), + } + } + Ok(certs) + } + + fn to_tx_withdrawals(tx: &Tx) -> Result> { + let block = pallas_traverse::MultiEraBlock::decode(&tx.block.bytes)?; + let txs = block.txs(); + let Some(tx_decoded) = txs.get(tx.index as usize) else { + return Err(anyhow!("Transaction not found in block for given index")); + }; + let mut withdrawals = Vec::new(); + for (address, amount) in tx_decoded.withdrawals_sorted_set() { + withdrawals.push(TransactionWithdrawal { + address: StakeAddress::from_binary(address)?, + amount, + }); + } + Ok(withdrawals) + } + + fn to_tx_mirs(tx: &Tx, network_id: NetworkId) -> Result> { + let block = pallas_traverse::MultiEraBlock::decode(&tx.block.bytes)?; + let txs = block.txs(); + let Some(tx_decoded) = txs.get(tx.index as usize) else { + return Err(anyhow!("Transaction not found in block for given index")); + }; + let mut certs = Vec::new(); + for (cert_index, cert) in tx_decoded.certs().iter().enumerate() { + if let MultiEraCert::AlonzoCompatible(cert) = cert { + if let alonzo::Certificate::MoveInstantaneousRewardsCert(cert) = + cert.as_ref().as_ref() + { + match &cert.target { + alonzo::InstantaneousRewardTarget::StakeCredentials(creds) => { + for (cred, amount) in creds.clone().to_vec() { + certs.push(TransactionMIR { + cert_index: cert_index as u64, + pot: match cert.source { + alonzo::InstantaneousRewardSource::Reserves => { + InstantaneousRewardSource::Reserves + } + alonzo::InstantaneousRewardSource::Treasury => { + InstantaneousRewardSource::Treasury + } + }, + address: map_stake_address(&cred, network_id.clone()), + amount: amount as u64, + }); + } + } + alonzo::InstantaneousRewardTarget::OtherAccountingPot(_coin) => { + // TODO + } + } + } + } + } + Ok(certs) + } + + fn to_tx_pool_updates( + tx: &Tx, + network_id: NetworkId, + ) -> Result> { + let block = pallas_traverse::MultiEraBlock::decode(&tx.block.bytes)?; + let txs = block.txs(); + let Some(tx_decoded) = txs.get(tx.index as usize) else { + return Err(anyhow!("Transaction not found in block for given index")); + }; + let mut certs = Vec::new(); + for (cert_index, cert) in tx_decoded.certs().iter().enumerate() { + match cert { + MultiEraCert::AlonzoCompatible(cert) => { + if let alonzo::Certificate::PoolRegistration { + operator, + vrf_keyhash, + pledge, + cost, + margin, + reward_account, + pool_owners, + relays, + pool_metadata, + } = cert.as_ref().as_ref() + { + certs.push(TransactionPoolUpdateCertificate { + cert_index: cert_index as u64, + pool_reg: to_pool_reg( + operator, + vrf_keyhash, + pledge, + cost, + margin, + reward_account, + pool_owners, + relays, + pool_metadata, + network_id.clone(), + false, + )?, + active_epoch: tx.block.extra.epoch + 1, + }); + } + } + MultiEraCert::Conway(cert) => { + if let conway::Certificate::PoolRegistration { + operator, + vrf_keyhash, + pledge, + cost, + margin, + reward_account, + pool_owners, + relays, + pool_metadata, + } = cert.as_ref().as_ref() + { + certs.push(TransactionPoolUpdateCertificate { + cert_index: cert_index as u64, + pool_reg: to_pool_reg( + operator, + vrf_keyhash, + pledge, + cost, + margin, + reward_account, + pool_owners, + relays, + pool_metadata, + network_id.clone(), + false, + )?, + active_epoch: tx.block.extra.epoch + 1, + }); + } + } + _ => (), + } + } + Ok(certs) + } + + fn to_tx_pool_retirements(tx: &Tx) -> Result> { + let block = pallas_traverse::MultiEraBlock::decode(&tx.block.bytes)?; + let txs = block.txs(); + let Some(tx_decoded) = txs.get(tx.index as usize) else { + return Err(anyhow!("Transaction not found in block for given index")); + }; + let mut certs = Vec::new(); + for (cert_index, cert) in tx_decoded.certs().iter().enumerate() { + match cert { + MultiEraCert::AlonzoCompatible(cert) => { + if let alonzo::Certificate::PoolRetirement(operator, epoch) = + cert.as_ref().as_ref() + { + certs.push(TransactionPoolRetirementCertificate { + cert_index: cert_index as u64, + pool_id: to_pool_id(operator), + retirement_epoch: *epoch, + }); + } + } + MultiEraCert::Conway(cert) => { + if let conway::Certificate::PoolRetirement(operator, epoch) = + cert.as_ref().as_ref() + { + certs.push(TransactionPoolRetirementCertificate { + cert_index: cert_index as u64, + pool_id: to_pool_id(operator), + retirement_epoch: *epoch, + }); + } + } + _ => (), + } + } + Ok(certs) + } + + fn to_tx_metadata(tx: &Tx) -> Result> { + let block = pallas_traverse::MultiEraBlock::decode(&tx.block.bytes)?; + let txs = block.txs(); + let Some(tx_decoded) = txs.get(tx.index as usize) else { + return Err(anyhow!("Transaction not found in block for given index")); + }; + let mut items = Vec::new(); + if let MultiEraMeta::AlonzoCompatible(metadata) = tx_decoded.metadata() { + for (label, datum) in &metadata.clone().to_vec() { + items.push(TransactionMetadataItem { + label: label.to_string(), + json_metadata: map_metadata(datum), + }); + } + } + Ok(items) + } + + fn handle_txs_query( + store: &Arc, + query: &TransactionsStateQuery, + network_id: NetworkId, + ) -> Result { + match query { + TransactionsStateQuery::GetTransactionInfo { tx_hash } => { + let Some(tx) = store.get_tx_by_hash(tx_hash.as_ref())? else { + return Ok(TransactionsStateQueryResponse::Error( + QueryError::not_found("Transaction not found"), + )); + }; + Ok(TransactionsStateQueryResponse::TransactionInfo( + Self::to_tx_info(&tx)?, + )) + } + TransactionsStateQuery::GetTransactionStakeCertificates { tx_hash } => { + let Some(tx) = store.get_tx_by_hash(tx_hash.as_ref())? else { + return Ok(TransactionsStateQueryResponse::Error( + QueryError::not_found("Transaction not found"), + )); + }; + Ok( + TransactionsStateQueryResponse::TransactionStakeCertificates( + TransactionStakeCertificates { + certificates: Self::to_tx_stakes(&tx, network_id)?, + }, + ), + ) + } + TransactionsStateQuery::GetTransactionDelegationCertificates { tx_hash } => { + let Some(tx) = store.get_tx_by_hash(tx_hash.as_ref())? else { + return Ok(TransactionsStateQueryResponse::Error( + QueryError::not_found("Transaction not found"), + )); + }; + Ok( + TransactionsStateQueryResponse::TransactionDelegationCertificates( + TransactionDelegationCertificates { + certificates: Self::to_tx_delegations(&tx, network_id)?, + }, + ), + ) + } + TransactionsStateQuery::GetTransactionWithdrawals { tx_hash } => { + let Some(tx) = store.get_tx_by_hash(tx_hash.as_ref())? else { + return Ok(TransactionsStateQueryResponse::Error( + QueryError::not_found("Transaction not found"), + )); + }; + Ok(TransactionsStateQueryResponse::TransactionWithdrawals( + TransactionWithdrawals { + withdrawals: Self::to_tx_withdrawals(&tx)?, + }, + )) + } + TransactionsStateQuery::GetTransactionMIRs { tx_hash } => { + let Some(tx) = store.get_tx_by_hash(tx_hash.as_ref())? else { + return Ok(TransactionsStateQueryResponse::Error( + QueryError::not_found("Transaction not found"), + )); + }; + Ok(TransactionsStateQueryResponse::TransactionMIRs( + TransactionMIRs { + mirs: Self::to_tx_mirs(&tx, network_id)?, + }, + )) + } + TransactionsStateQuery::GetTransactionPoolUpdateCertificates { tx_hash } => { + let Some(tx) = store.get_tx_by_hash(tx_hash.as_ref())? else { + return Ok(TransactionsStateQueryResponse::Error( + QueryError::not_found("Transaction not found"), + )); + }; + Ok( + TransactionsStateQueryResponse::TransactionPoolUpdateCertificates( + TransactionPoolUpdateCertificates { + pool_updates: Self::to_tx_pool_updates(&tx, network_id)?, + }, + ), + ) + } + TransactionsStateQuery::GetTransactionPoolRetirementCertificates { tx_hash } => { + let Some(tx) = store.get_tx_by_hash(tx_hash.as_ref())? else { + return Ok(TransactionsStateQueryResponse::Error( + QueryError::not_found("Transaction not found"), + )); + }; + Ok( + TransactionsStateQueryResponse::TransactionPoolRetirementCertificates( + TransactionPoolRetirementCertificates { + pool_retirements: Self::to_tx_pool_retirements(&tx)?, + }, + ), + ) + } + TransactionsStateQuery::GetTransactionMetadata { tx_hash } => { + let Some(tx) = store.get_tx_by_hash(tx_hash.as_ref())? else { + return Ok(TransactionsStateQueryResponse::Error( + QueryError::not_found("Transaction not found"), + )); + }; + Ok(TransactionsStateQueryResponse::TransactionMetadata( + TransactionMetadata { + metadata: Self::to_tx_metadata(&tx)?, + }, + )) + } + _ => Ok(TransactionsStateQueryResponse::Error( + QueryError::not_implemented("Unimplemented".to_string()), + )), + } + } + fn handle_new_params(state: &mut State, message: Arc) -> Result<()> { if let Message::Cardano((_, CardanoMessage::ProtocolParams(params))) = message.as_ref() { if let Some(byron) = ¶ms.params.byron { diff --git a/modules/chain_store/src/stores/fjall.rs b/modules/chain_store/src/stores/fjall.rs index 3c6adecb..42762f84 100644 --- a/modules/chain_store/src/stores/fjall.rs +++ b/modules/chain_store/src/stores/fjall.rs @@ -1,11 +1,11 @@ use std::{fs, path::Path, sync::Arc}; use acropolis_common::{BlockInfo, TxHash}; -use anyhow::Result; +use anyhow::{anyhow, Result}; use config::Config; use fjall::{Batch, Keyspace, Partition}; -use crate::stores::{Block, ExtraBlockData}; +use crate::stores::{Block, ExtraBlockData, Tx}; pub struct FjallStore { keyspace: Keyspace, @@ -113,6 +113,19 @@ impl super::Store for FjallStore { fn get_latest_block(&self) -> Result> { self.blocks.get_latest() } + + fn get_tx_by_hash(&self, hash: &[u8]) -> Result> { + let Some(block_ref) = self.txs.get_by_hash(hash)? else { + return Ok(None); + }; + let Some(block) = self.blocks.get_by_hash(block_ref.block_hash.as_ref())? else { + return Err(anyhow!("Referenced block not found")); + }; + Ok(Some(Tx { + block, + index: block_ref.index as u64, + })) + } } struct FjallBlockStore { @@ -255,6 +268,13 @@ impl FjallTXStore { let bytes = minicbor::to_vec(block_ref).expect("infallible"); batch.insert(&self.txs, hash.as_ref(), bytes); } + + fn get_by_hash(&self, hash: &[u8]) -> Result> { + let Some(block_ref) = self.txs.get(hash)? else { + return Ok(None); + }; + Ok(minicbor::decode(&block_ref)?) + } } #[derive(minicbor::Decode, minicbor::Encode)] diff --git a/modules/chain_store/src/stores/mod.rs b/modules/chain_store/src/stores/mod.rs index df8a330b..8ecc9e7a 100644 --- a/modules/chain_store/src/stores/mod.rs +++ b/modules/chain_store/src/stores/mod.rs @@ -13,6 +13,7 @@ pub trait Store: Send + Sync { fn get_blocks_by_number_range(&self, min_number: u64, max_number: u64) -> Result>; fn get_block_by_epoch_slot(&self, epoch: u64, epoch_slot: u64) -> Result>; fn get_latest_block(&self) -> Result>; + fn get_tx_by_hash(&self, hash: &[u8]) -> Result>; } #[derive(Debug, PartialEq, Eq, minicbor::Decode, minicbor::Encode)] @@ -33,6 +34,11 @@ pub struct ExtraBlockData { pub timestamp: u64, } +pub struct Tx { + pub block: Block, + pub index: u64, +} + pub(crate) fn extract_tx_hashes(block: &[u8]) -> Result> { let block = pallas_traverse::MultiEraBlock::decode(block).context("could not decode block")?; Ok(block.txs().into_iter().map(|tx| TxHash::from(*tx.hash())).collect()) diff --git a/modules/historical_accounts_state/Cargo.toml b/modules/historical_accounts_state/Cargo.toml index 930c4713..8027ad47 100644 --- a/modules/historical_accounts_state/Cargo.toml +++ b/modules/historical_accounts_state/Cargo.toml @@ -15,7 +15,7 @@ caryatid_sdk = { workspace = true } anyhow = { workspace = true } config = { workspace = true } -minicbor = { version = "0.26.0", features = ["std", "derive"] } +minicbor = { workspace = true, features = ["std", "derive"] } hex = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/modules/historical_epochs_state/Cargo.toml b/modules/historical_epochs_state/Cargo.toml index d3d7dc57..60c4e853 100644 --- a/modules/historical_epochs_state/Cargo.toml +++ b/modules/historical_epochs_state/Cargo.toml @@ -15,7 +15,7 @@ caryatid_sdk = { workspace = true } anyhow = { workspace = true } config = { workspace = true } -minicbor = { version = "0.26.0", features = ["std", "derive"] } +minicbor = { workspace = true, features = ["std", "derive"] } tokio = { workspace = true } tracing = { workspace = true } fjall = "2.11.2" diff --git a/modules/rest_blockfrost/Cargo.toml b/modules/rest_blockfrost/Cargo.toml index 208aeb2f..208cc161 100644 --- a/modules/rest_blockfrost/Cargo.toml +++ b/modules/rest_blockfrost/Cargo.toml @@ -26,6 +26,7 @@ serde_json = { workspace = true } serde_with = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +acropolis_cardano = { version = "0.1.0", path = "../../cardano" } [lib] path = "src/rest_blockfrost.rs" diff --git a/modules/rest_blockfrost/src/handlers/mod.rs b/modules/rest_blockfrost/src/handlers/mod.rs index dd53137e..c8765128 100644 --- a/modules/rest_blockfrost/src/handlers/mod.rs +++ b/modules/rest_blockfrost/src/handlers/mod.rs @@ -5,3 +5,4 @@ pub mod blocks; pub mod epochs; pub mod governance; pub mod pools; +pub mod transactions; diff --git a/modules/rest_blockfrost/src/handlers/transactions.rs b/modules/rest_blockfrost/src/handlers/transactions.rs new file mode 100644 index 00000000..40640361 --- /dev/null +++ b/modules/rest_blockfrost/src/handlers/transactions.rs @@ -0,0 +1,629 @@ +//! REST handlers for Acropolis Blockfrost /txs endpoints +use acropolis_cardano::transaction::calculate_deposit; +use acropolis_common::rest_error::RESTError; +use acropolis_common::{ + messages::{Message, RESTResponse, StateQuery, StateQueryResponse}, + queries::{ + errors::QueryError, + parameters::{ParametersStateQuery, ParametersStateQueryResponse}, + transactions::{ + TransactionDelegationCertificate, TransactionInfo, TransactionMIR, + TransactionMetadataItem, TransactionOutputAmount, TransactionPoolRetirementCertificate, + TransactionPoolUpdateCertificate, TransactionStakeCertificate, TransactionWithdrawal, + TransactionsStateQuery, TransactionsStateQueryResponse, + }, + utils::{query_state, rest_query_state_async}, + }, + Lovelace, Metadata, Relay, TxHash, +}; +use caryatid_sdk::Context; +use hex::FromHex; +use serde::{ + ser::{Error, SerializeMap, SerializeSeq, SerializeStruct}, + Serialize, Serializer, +}; +use std::{ + net::{Ipv4Addr, Ipv6Addr}, + sync::Arc, +}; + +use crate::handlers_config::HandlersConfig; + +struct TxInfo(TransactionInfo, Lovelace, Lovelace); + +impl Serialize for TxInfo { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("TxInfo", 22)?; + state.serialize_field("hash", &self.0.hash)?; + state.serialize_field("block", &self.0.block_hash)?; + state.serialize_field("block_height", &self.0.block_number)?; + state.serialize_field("block_time", &self.0.block_time)?; + state.serialize_field("slot", &self.0.slot)?; + state.serialize_field("index", &self.0.index)?; + state.serialize_field( + "output_amount", + &self.0.output_amounts.clone().into_iter().map(TxOutputAmount).collect::>(), + )?; + state.serialize_field("fees", &self.1.to_string())?; + state.serialize_field("deposit", &self.2.to_string())?; + state.serialize_field("size", &self.0.size)?; + state.serialize_field("invalid_before", &self.0.invalid_before)?; + state.serialize_field("invalid_hereafter", &self.0.invalid_after)?; + state.serialize_field("utxo_count", &self.0.utxo_count)?; + state.serialize_field("withdrawal_count", &self.0.withdrawal_count)?; + state.serialize_field("mir_cert_count", &self.0.mir_cert_count)?; + state.serialize_field("delegation_count", &self.0.delegation_count)?; + state.serialize_field("stake_cert_count", &self.0.stake_cert_count)?; + state.serialize_field("pool_update_count", &self.0.pool_update_count)?; + state.serialize_field("pool_retire_count", &self.0.pool_retire_count)?; + state.serialize_field("asset_mint_or_burn_count", &self.0.asset_mint_or_burn_count)?; + state.serialize_field("redeemer_count", &self.0.redeemer_count)?; + state.serialize_field("valid_contract", &self.0.valid_contract)?; + state.end() + } +} + +struct TxOutputAmount(TransactionOutputAmount); + +impl Serialize for TxOutputAmount { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("TransactionOutputAmount", 2)?; + match &self.0 { + TransactionOutputAmount::Lovelace(lovelace) => { + state.serialize_field("unit", "lovelace")?; + state.serialize_field("quantity", &lovelace.to_string())?; + } + TransactionOutputAmount::Asset(asset) => { + state.serialize_field("unit", &asset.name)?; + state.serialize_field("quantity", &asset.amount.to_string())?; + } + } + state.end() + } +} + +/// Handle `/txs/{hash}` +pub async fn handle_transactions_blockfrost( + context: Arc>, + params: Vec, + handlers_config: Arc, +) -> Result { + let (tx_hash, param, param2) = match params.as_slice() { + [tx_hash] => (tx_hash, None, None), + [tx_hash, param] => (tx_hash, Some(param.as_str()), None), + [tx_hash, param, param2] => (tx_hash, Some(param.as_str()), Some(param2.as_str())), + _ => return Err(RESTError::BadRequest("Invalid parameters".to_string())), + }; + + let tx_hash = match TxHash::from_hex(tx_hash) { + Ok(hash) => hash, + Err(_) => { + return Err(RESTError::invalid_param( + "transaction", + "Invalid transaction hash", + )) + } + }; + + match param { + None => handle_transaction_query(context, tx_hash, handlers_config).await, + Some("utxo") => Ok(RESTResponse::with_text(501, "Not implemented")), + Some("stakes") => handle_transaction_stakes_query(context, tx_hash, handlers_config).await, + Some("delegations") => { + handle_transaction_delegations_query(context, tx_hash, handlers_config).await + } + Some("withdrawals") => { + handle_transaction_withdrawals_query(context, tx_hash, handlers_config).await + } + Some("mirs") => handle_transaction_mirs_query(context, tx_hash, handlers_config).await, + Some("pool_updates") => { + handle_transaction_pool_updates_query(context, tx_hash, handlers_config).await + } + Some("pool_retires") => { + handle_transaction_pool_retires_query(context, tx_hash, handlers_config).await + } + Some("metadata") => match param2 { + None => handle_transaction_metadata_query(context, tx_hash, handlers_config).await, + Some("cbor") => Ok(RESTResponse::with_text(501, "Not implemented")), + _ => Ok(RESTResponse::with_text(400, "Invalid parameters")), + }, + Some("redeemers") => Ok(RESTResponse::with_text(501, "Not implemented")), + Some("required_signers") => Ok(RESTResponse::with_text(501, "Not implemented")), + Some("cbor") => Ok(RESTResponse::with_text(501, "Not implemented")), + _ => Ok(RESTResponse::with_text(400, "Invalid parameters")), + } +} + +/// Handle `/txs/{hash}` +async fn handle_transaction_query( + context: Arc>, + tx_hash: TxHash, + handlers_config: Arc, +) -> Result { + let txs_info_msg = Arc::new(Message::StateQuery(StateQuery::Transactions( + TransactionsStateQuery::GetTransactionInfo { tx_hash }, + ))); + rest_query_state_async( + &context.clone(), + &handlers_config.transactions_query_topic.clone(), + txs_info_msg, + async move |message| match message { + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::TransactionInfo(txs_info), + )) => { + let params_msg = Arc::new(Message::StateQuery(StateQuery::Parameters( + ParametersStateQuery::GetEpochParameters { + epoch_number: txs_info.epoch, + }, + ))); + let params = match query_state( + &context, + &handlers_config.parameters_query_topic, + params_msg, + |message| match message { + Message::StateQueryResponse(StateQueryResponse::Parameters( + ParametersStateQueryResponse::EpochParameters(params), + )) => Ok(params), + Message::StateQueryResponse(StateQueryResponse::Parameters( + ParametersStateQueryResponse::Error(e), + )) => Err(e), + _ => Err(QueryError::internal_error("Unexpected response")), + }, + ) + .await + { + Ok(params) => params, + Err(e) => return Some(Err(e)), + }; + // TODO: calc from outputs and inputs if recorded_fee is None + let fee = txs_info.recorded_fee.unwrap_or_default(); + let deposit = match calculate_deposit( + txs_info.pool_update_count, + txs_info.stake_cert_count, + ¶ms, + ) { + Ok(deposit) => deposit, + Err(e) => return Some(Err(QueryError::internal_error(e.to_string()))), + }; + Some(Ok(TxInfo(txs_info, fee, deposit))) + } + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::Error(e), + )) => Some(Err(e)), + _ => None, + }, + ) + .await +} + +struct TxStake(TransactionStakeCertificate); + +impl Serialize for TxStake { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let Ok(address) = self.0.address.to_string() else { + return Err(S::Error::custom("Can't stringify address")); + }; + let mut state = serializer.serialize_struct("TxStake", 3)?; + state.serialize_field("cert_index", &self.0.index)?; + state.serialize_field("address", &address)?; + state.serialize_field("registration", &self.0.registration)?; + state.end() + } +} + +/// Handle `/txs/{hash}/stakes` +async fn handle_transaction_stakes_query( + context: Arc>, + tx_hash: TxHash, + handlers_config: Arc, +) -> Result { + let txs_info_msg = Arc::new(Message::StateQuery(StateQuery::Transactions( + TransactionsStateQuery::GetTransactionStakeCertificates { tx_hash }, + ))); + rest_query_state_async( + &context.clone(), + &handlers_config.transactions_query_topic.clone(), + txs_info_msg, + async move |message| match message { + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::TransactionStakeCertificates(stakes), + )) => Some(Ok(Some( + stakes.certificates.into_iter().map(TxStake).collect::>(), + ))), + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::Error(e), + )) => Some(Err(e)), + _ => None, + }, + ) + .await +} + +struct TxDelegation(TransactionDelegationCertificate); + +impl Serialize for TxDelegation { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let Ok(address) = self.0.address.to_string() else { + return Err(S::Error::custom("Can't stringify address")); + }; + let mut state = serializer.serialize_struct("TxDelegation", 4)?; + state.serialize_field("cert_index", &self.0.index)?; + state.serialize_field("address", &address)?; + state.serialize_field("pool_id", &self.0.pool_id.to_string())?; + state.serialize_field("active_epoch", &self.0.active_epoch)?; + state.end() + } +} + +/// Handle `/txs/{hash}/delegations` +async fn handle_transaction_delegations_query( + context: Arc>, + tx_hash: TxHash, + handlers_config: Arc, +) -> Result { + let txs_info_msg = Arc::new(Message::StateQuery(StateQuery::Transactions( + TransactionsStateQuery::GetTransactionDelegationCertificates { tx_hash }, + ))); + rest_query_state_async( + &context.clone(), + &handlers_config.transactions_query_topic.clone(), + txs_info_msg, + async move |message| match message { + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::TransactionDelegationCertificates(delegations), + )) => Some(Ok(Some( + delegations.certificates.into_iter().map(TxDelegation).collect::>(), + ))), + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::Error(e), + )) => Some(Err(e)), + _ => None, + }, + ) + .await +} + +struct TxWithdrawal(TransactionWithdrawal); + +impl Serialize for TxWithdrawal { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let Ok(address) = self.0.address.to_string() else { + return Err(S::Error::custom("Can't stringify address")); + }; + let mut state = serializer.serialize_struct("TxWithdrawal", 2)?; + state.serialize_field("address", &address)?; + state.serialize_field("amount", &self.0.amount.to_string())?; + state.end() + } +} + +/// Handle `/txs/{hash}/withdrawals` +async fn handle_transaction_withdrawals_query( + context: Arc>, + tx_hash: TxHash, + handlers_config: Arc, +) -> Result { + let txs_info_msg = Arc::new(Message::StateQuery(StateQuery::Transactions( + TransactionsStateQuery::GetTransactionWithdrawals { tx_hash }, + ))); + rest_query_state_async( + &context.clone(), + &handlers_config.transactions_query_topic.clone(), + txs_info_msg, + async move |message| match message { + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::TransactionWithdrawals(withdrawals), + )) => Some(Ok(Some( + withdrawals.withdrawals.into_iter().map(TxWithdrawal).collect::>(), + ))), + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::Error(e), + )) => Some(Err(e)), + _ => None, + }, + ) + .await +} + +struct TxMIR(TransactionMIR); + +impl Serialize for TxMIR { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let Ok(address) = self.0.address.to_string() else { + return Err(S::Error::custom("Can't stringify address")); + }; + let mut state = serializer.serialize_struct("TxMIR", 4)?; + state.serialize_field("pot", &self.0.pot.to_string().to_lowercase())?; + state.serialize_field("cert_index", &self.0.cert_index)?; + state.serialize_field("address", &address)?; + state.serialize_field("amount", &self.0.amount.to_string())?; + state.end() + } +} + +/// Handle `/txs/{hash}/mirs` +async fn handle_transaction_mirs_query( + context: Arc>, + tx_hash: TxHash, + handlers_config: Arc, +) -> Result { + let txs_info_msg = Arc::new(Message::StateQuery(StateQuery::Transactions( + TransactionsStateQuery::GetTransactionMIRs { tx_hash }, + ))); + rest_query_state_async( + &context.clone(), + &handlers_config.transactions_query_topic.clone(), + txs_info_msg, + async move |message| match message { + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::TransactionMIRs(mirs), + )) => Some(Ok(Some( + mirs.mirs.into_iter().map(TxMIR).collect::>(), + ))), + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::Error(e), + )) => Some(Err(e)), + _ => None, + }, + ) + .await +} + +struct TxRelay(Relay); + +impl Serialize for TxRelay { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match &self.0 { + Relay::SingleHostAddr(addr) => { + let mut state = serializer.serialize_struct("TxRelay", 5)?; + state.serialize_field("ipv4", &addr.ipv4)?; + state.serialize_field("ipv6", &addr.ipv6)?; + state.serialize_field("dns", &None::)?; + state.serialize_field("dns_srv", &None::)?; + state.serialize_field("port", &addr.port)?; + state.end() + } + Relay::SingleHostName(name) => { + let mut state = serializer.serialize_struct("TxRelay", 5)?; + state.serialize_field("ipv4", &None::)?; + state.serialize_field("ipv6", &None::)?; + state.serialize_field("dns", &name.dns_name)?; + state.serialize_field("dns_srv", &None::)?; + state.serialize_field("port", &name.port)?; + state.end() + } + Relay::MultiHostName(name) => { + let mut state = serializer.serialize_struct("TxRelay", 5)?; + state.serialize_field("ipv4", &None::)?; + state.serialize_field("ipv6", &None::)?; + state.serialize_field("dns", &name.dns_name)?; + state.serialize_field("dns_srv", &None::)?; + state.serialize_field("port", &None::)?; + state.end() + } + } + } +} + +struct TxPoolUpdateCertificate(TransactionPoolUpdateCertificate); + +impl Serialize for TxPoolUpdateCertificate { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let Ok(reward_account) = self.0.pool_reg.reward_account.to_string() else { + return Err(S::Error::custom("Can't stringify reward account")); + }; + let mut state = serializer.serialize_struct("TxPoolUpdateCertificate", 11)?; + state.serialize_field("cert_index", &self.0.cert_index)?; + state.serialize_field("pool_id", &self.0.pool_reg.operator.to_string())?; + state.serialize_field("vrf_key", &self.0.pool_reg.vrf_key_hash.to_string())?; + state.serialize_field("pledge", &self.0.pool_reg.pledge.to_string())?; + state.serialize_field("margin_cost", &self.0.pool_reg.margin.to_f64())?; + state.serialize_field("fixed_cost", &self.0.pool_reg.cost.to_string())?; + state.serialize_field("reward_account", &reward_account)?; + state.serialize_field( + "owners", + &self + .0 + .pool_reg + .pool_owners + .iter() + .map(|o| o.to_string().unwrap_or("bad address".to_string())) + .collect::>(), + )?; + state.serialize_field("metadata", &self.0.pool_reg.pool_metadata)?; + state.serialize_field( + "relays", + &self.0.pool_reg.relays.clone().into_iter().map(TxRelay).collect::>(), + )?; + state.serialize_field("active_epoch", &self.0.active_epoch)?; + state.end() + } +} + +/// Handle `/txs/{hash}/pool_updates` +async fn handle_transaction_pool_updates_query( + context: Arc>, + tx_hash: TxHash, + handlers_config: Arc, +) -> Result { + let txs_info_msg = Arc::new(Message::StateQuery(StateQuery::Transactions( + TransactionsStateQuery::GetTransactionPoolUpdateCertificates { tx_hash }, + ))); + rest_query_state_async( + &context.clone(), + &handlers_config.transactions_query_topic.clone(), + txs_info_msg, + async move |message| match message { + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::TransactionPoolUpdateCertificates(pool_updates), + )) => Some(Ok(Some( + pool_updates + .pool_updates + .into_iter() + .map(TxPoolUpdateCertificate) + .collect::>(), + ))), + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::Error(e), + )) => Some(Err(e)), + _ => None, + }, + ) + .await +} + +struct TxPoolRetirementCertificate(TransactionPoolRetirementCertificate); + +impl Serialize for TxPoolRetirementCertificate { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("TxPoolUpdateCertificate", 3)?; + state.serialize_field("cert_index", &self.0.cert_index)?; + state.serialize_field("pool_id", &self.0.pool_id.to_string())?; + state.serialize_field("retiring_epoch", &self.0.retirement_epoch)?; + state.end() + } +} + +/// Handle `/txs/{hash}/pool_retires` +async fn handle_transaction_pool_retires_query( + context: Arc>, + tx_hash: TxHash, + handlers_config: Arc, +) -> Result { + let txs_info_msg = Arc::new(Message::StateQuery(StateQuery::Transactions( + TransactionsStateQuery::GetTransactionPoolRetirementCertificates { tx_hash }, + ))); + rest_query_state_async( + &context.clone(), + &handlers_config.transactions_query_topic.clone(), + txs_info_msg, + async move |message| match message { + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::TransactionPoolRetirementCertificates( + pool_retirements, + ), + )) => Some(Ok(Some( + pool_retirements + .pool_retirements + .into_iter() + .map(TxPoolRetirementCertificate) + .collect::>(), + ))), + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::Error(e), + )) => Some(Err(e)), + _ => None, + }, + ) + .await +} + +struct TxMetadata(Metadata); + +impl Serialize for TxMetadata { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match &self.0 { + Metadata::Int(i) => i.serialize(serializer), + Metadata::Bytes(b) => { + let h = hex::encode(b); + serializer.serialize_str(&h) + } + Metadata::Text(s) => s.serialize(serializer), + Metadata::Array(a) => { + let mut state = serializer.serialize_seq(Some(a.len()))?; + for i in a { + state.serialize_element(&TxMetadata(i.clone()))?; + } + state.end() + } + Metadata::Map(m) => { + let mut state = serializer.serialize_map(Some(m.len()))?; + for (k, v) in m { + match k { + Metadata::Int(i) => { + state.serialize_entry(&i.to_string(), &TxMetadata(v.clone()))? + } + Metadata::Bytes(b) => { + state.serialize_entry(&hex::encode(b), &TxMetadata(v.clone()))? + } + Metadata::Text(s) => state.serialize_entry(&s, &TxMetadata(v.clone()))?, + _ => return Err(S::Error::custom("Invalid key type in map")), + } + } + state.end() + } + } + } +} + +struct TxMetadataItem(TransactionMetadataItem); + +impl Serialize for TxMetadataItem { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("TxPoolUpdateCertificate", 2)?; + state.serialize_field("label", &self.0.label)?; + state.serialize_field("json_metadata", &TxMetadata(self.0.json_metadata.clone()))?; + state.end() + } +} + +/// Handle `/txs/{hash}/metadata` +async fn handle_transaction_metadata_query( + context: Arc>, + tx_hash: TxHash, + handlers_config: Arc, +) -> Result { + let txs_info_msg = Arc::new(Message::StateQuery(StateQuery::Transactions( + TransactionsStateQuery::GetTransactionMetadata { tx_hash }, + ))); + rest_query_state_async( + &context.clone(), + &handlers_config.transactions_query_topic.clone(), + txs_info_msg, + async move |message| match message { + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::TransactionMetadata(metadata), + )) => Some(Ok(Some( + metadata.metadata.into_iter().map(TxMetadataItem).collect::>(), + ))), + Message::StateQueryResponse(StateQueryResponse::Transactions( + TransactionsStateQueryResponse::Error(e), + )) => Some(Err(e)), + _ => None, + }, + ) + .await +} diff --git a/modules/rest_blockfrost/src/handlers_config.rs b/modules/rest_blockfrost/src/handlers_config.rs index 4a49c715..d4a8d8b9 100644 --- a/modules/rest_blockfrost/src/handlers_config.rs +++ b/modules/rest_blockfrost/src/handlers_config.rs @@ -10,6 +10,7 @@ use acropolis_common::queries::{ parameters::DEFAULT_PARAMETERS_QUERY_TOPIC, pools::DEFAULT_POOLS_QUERY_TOPIC, spdd::DEFAULT_SPDD_QUERY_TOPIC, + transactions::DEFAULT_TRANSACTIONS_QUERY_TOPIC, utxos::DEFAULT_UTXOS_QUERY_TOPIC, }; use config::Config; @@ -29,6 +30,7 @@ pub struct HandlersConfig { pub epochs_query_topic: String, pub historical_epochs_query_topic: String, pub spdd_query_topic: String, + pub transactions_query_topic: String, pub parameters_query_topic: String, pub utxos_query_topic: String, pub external_api_timeout: u64, @@ -81,6 +83,10 @@ impl From> for HandlersConfig { .get_string(DEFAULT_PARAMETERS_QUERY_TOPIC.0) .unwrap_or(DEFAULT_PARAMETERS_QUERY_TOPIC.1.to_string()); + let transactions_query_topic = config + .get_string(DEFAULT_TRANSACTIONS_QUERY_TOPIC.0) + .unwrap_or(DEFAULT_TRANSACTIONS_QUERY_TOPIC.1.to_string()); + let utxos_query_topic = config .get_string(DEFAULT_UTXOS_QUERY_TOPIC.0) .unwrap_or(DEFAULT_UTXOS_QUERY_TOPIC.1.to_string()); @@ -109,6 +115,7 @@ impl From> for HandlersConfig { epochs_query_topic, historical_epochs_query_topic, spdd_query_topic, + transactions_query_topic, parameters_query_topic, utxos_query_topic, external_api_timeout, diff --git a/modules/rest_blockfrost/src/rest_blockfrost.rs b/modules/rest_blockfrost/src/rest_blockfrost.rs index ffa718eb..7a3cadf5 100644 --- a/modules/rest_blockfrost/src/rest_blockfrost.rs +++ b/modules/rest_blockfrost/src/rest_blockfrost.rs @@ -64,6 +64,7 @@ use handlers::{ handle_pool_votes_blockfrost, handle_pools_extended_retired_retiring_single_blockfrost, handle_pools_list_blockfrost, }, + transactions::handle_transactions_blockfrost, }; use crate::handlers_config::HandlersConfig; @@ -227,6 +228,15 @@ const DEFAULT_HANDLE_EPOCH_POOL_BLOCKS_TOPIC: (&str, &str) = ( "rest.get.epochs.*.blocks.*", ); +// Transactions topics +const DEFAULT_HANDLE_TRANSACTIONS_TOPIC: (&str, &str) = ("handle-transactions", "rest.get.txs.*"); +const DEFAULT_HANDLE_TRANSACTIONS_SUB_TOPIC: (&str, &str) = + ("handle-transactions-sub", "rest.get.txs.*.*"); +const DEFAULT_HANDLE_TRANSACTIONS_METADATA_SUB_TOPIC: (&str, &str) = ( + "handle-transactions-metadata-sub", + "rest.get.txs.metadata.*", +); + // Assets topics const DEFAULT_HANDLE_ASSETS_LIST_TOPIC: (&str, &str) = ("handle-topic-assets-list", "rest.get.assets"); @@ -753,6 +763,30 @@ impl BlockfrostREST { handle_address_transactions_blockfrost, ); + // Handler for /txs/{hash} + register_handler( + context.clone(), + DEFAULT_HANDLE_TRANSACTIONS_TOPIC, + handlers_config.clone(), + handle_transactions_blockfrost, + ); + + // Handler for /txs/{hash}/* + register_handler( + context.clone(), + DEFAULT_HANDLE_TRANSACTIONS_SUB_TOPIC, + handlers_config.clone(), + handle_transactions_blockfrost, + ); + + // Handler for /txs/{hash}/*/* + register_handler( + context.clone(), + DEFAULT_HANDLE_TRANSACTIONS_METADATA_SUB_TOPIC, + handlers_config.clone(), + handle_transactions_blockfrost, + ); + Ok(()) } } diff --git a/modules/rest_blockfrost/src/types.rs b/modules/rest_blockfrost/src/types.rs index 405a798c..3ddf1d04 100644 --- a/modules/rest_blockfrost/src/types.rs +++ b/modules/rest_blockfrost/src/types.rs @@ -375,14 +375,8 @@ impl From for PoolRelayRest { match value { Relay::SingleHostAddr(s) => PoolRelayRest { - ipv4: s.ipv4.map(|bytes| { - let ipv4_addr = std::net::Ipv4Addr::from(bytes); - format!("{:?}", ipv4_addr) - }), - ipv6: s.ipv6.map(|bytes| { - let ipv6_addr = std::net::Ipv6Addr::from(bytes); - format!("{:?}", ipv6_addr) - }), + ipv4: s.ipv4.map(|addr| format!("{:?}", addr)), + ipv6: s.ipv6.map(|addr| format!("{:?}", addr)), dns: None, dns_srv: None, port: s.port.unwrap_or(default_port),