diff --git a/catalyst-gateway/bin/Cargo.toml b/catalyst-gateway/bin/Cargo.toml index af323c89548b..305e7fc5aada 100644 --- a/catalyst-gateway/bin/Cargo.toml +++ b/catalyst-gateway/bin/Cargo.toml @@ -64,7 +64,6 @@ num-bigint = "0.4.6" futures = "0.3.31" rand = "0.8.5" moka = { version = "0.12.8", features = ["future"] } -crossbeam-skiplist = "0.1.3" poem = { version = "=3.1.6", features = ["embed", "prometheus", "compression"] } poem-openapi = { version = "=5.1.5", features = [ "openapi-explorer", diff --git a/catalyst-gateway/bin/src/db/index/block/certs.rs b/catalyst-gateway/bin/src/db/index/block/certs.rs index d29614e8eeef..57a5852ee42a 100644 --- a/catalyst-gateway/bin/src/db/index/block/certs.rs +++ b/catalyst-gateway/bin/src/db/index/block/certs.rs @@ -1,6 +1,6 @@ //! Index certs found in a transaction. -use std::{fmt::Debug, sync::Arc}; +use std::{fmt, sync::Arc}; use cardano_blockchain_types::{MultiEraBlock, Slot, StakeAddress, TxnIndex, VKeyHash}; use ed25519_dalek::VerifyingKey; @@ -16,6 +16,7 @@ use crate::{ }, types::{DbPublicKey, DbSlot, DbStakeAddress, DbTxnIndex}, }, + impl_query_batch, settings::cassandra_db, }; @@ -42,8 +43,8 @@ pub(crate) struct StakeRegistrationInsertQuery { pool_delegation: MaybeUnset>, } -impl Debug for StakeRegistrationInsertQuery { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { +impl fmt::Debug for StakeRegistrationInsertQuery { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let stake_public_key = hex::encode(self.stake_public_key.as_ref()); let register = match self.register { MaybeUnset::Unset => "UNSET", @@ -79,6 +80,11 @@ impl Debug for StakeRegistrationInsertQuery { /// Insert stake registration const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql"); +impl_query_batch!( + StakeRegistrationInsertQuery, + INSERT_STAKE_REGISTRATION_QUERY +); + impl StakeRegistrationInsertQuery { /// Create a new Insert Query. #[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)] diff --git a/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36.rs b/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36.rs index 86ab060955b7..7d94df57c162 100644 --- a/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36.rs +++ b/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36.rs @@ -1,6 +1,6 @@ //! Insert CIP36 Registration Query -use std::{fmt::Debug, sync::Arc}; +use std::{fmt, sync::Arc}; use cardano_blockchain_types::{Cip36, Slot, TxnIndex, VotingPubKey}; use scylla::{frame::value::MaybeUnset, SerializeRow, Session}; @@ -11,6 +11,7 @@ use crate::{ index::queries::{PreparedQueries, SizedBatch}, types::{DbSlot, DbTxnIndex}, }, + impl_query_batch, settings::cassandra_db, }; @@ -19,7 +20,7 @@ const INSERT_CIP36_REGISTRATION_QUERY: &str = include_str!("./cql/insert_cip36.c /// Insert CIP-36 Registration Query Parameters #[derive(SerializeRow, Clone)] -pub(crate) struct Params { +pub(crate) struct Cip36Insert { /// Full Stake Address (not hashed, 32 byte ED25519 Public key). stake_public_key: Vec, /// Nonce value after normalization. 
@@ -40,13 +41,15 @@ pub(crate) struct Params { cip36: bool, } -impl Debug for Params { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl_query_batch!(Cip36Insert, INSERT_CIP36_REGISTRATION_QUERY); + +impl fmt::Debug for Cip36Insert { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let payment_address = match self.payment_address { MaybeUnset::Unset => "UNSET", MaybeUnset::Set(ref v) => &hex::encode(v), }; - f.debug_struct("Params") + f.debug_struct("Cip36Insert") .field("stake_public_key", &self.stake_public_key) .field("nonce", &self.nonce) .field("slot_no", &self.slot_no) @@ -60,7 +63,7 @@ impl Debug for Params { } } -impl Params { +impl Cip36Insert { /// Create a new Insert Query. pub fn new(vote_key: &VotingPubKey, slot_no: Slot, txn_index: TxnIndex, cip36: &Cip36) -> Self { let stake_public_key = cip36 @@ -73,7 +76,7 @@ impl Params { .payment_address() .map_or(MaybeUnset::Unset, |a| MaybeUnset::Set(a.to_vec())); let is_cip36 = cip36.is_cip36().unwrap_or_default(); - Params { + Cip36Insert { stake_public_key, nonce: cip36.nonce().unwrap_or_default().into(), slot_no: slot_no.into(), diff --git a/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_for_vote_key.rs b/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_for_vote_key.rs index 2c80e8e3c2ba..a706f5270a7a 100644 --- a/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_for_vote_key.rs +++ b/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_for_vote_key.rs @@ -11,6 +11,7 @@ use crate::{ index::queries::{PreparedQueries, SizedBatch}, types::{DbSlot, DbTxnIndex}, }, + impl_query_batch, settings::cassandra_db, }; @@ -20,7 +21,7 @@ const INSERT_CIP36_REGISTRATION_FOR_VOTE_KEY_QUERY: &str = /// Insert CIP-36 Registration Invalid Query Parameters #[derive(SerializeRow, Debug)] -pub(crate) struct Params { +pub(crate) struct Cip36ForVoteKeyInsert { /// Voting Public Key vote_key: Vec, /// Full Stake Address (not hashed, 32 byte ED25519 Public key). @@ -33,12 +34,17 @@ pub(crate) struct Params { valid: bool, } -impl Params { +impl_query_batch!( + Cip36ForVoteKeyInsert, + INSERT_CIP36_REGISTRATION_FOR_VOTE_KEY_QUERY +); + +impl Cip36ForVoteKeyInsert { /// Create a new Insert Query. pub fn new( vote_key: &VotingPubKey, slot_no: Slot, txn_index: TxnIndex, cip36: &Cip36, valid: bool, ) -> Self { - Params { + Cip36ForVoteKeyInsert { vote_key: vote_key .voting_pk() .map(|k| k.to_bytes().to_vec()) diff --git a/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_invalid.rs b/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_invalid.rs index c14dc31d100b..ed949029d4b1 100644 --- a/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_invalid.rs +++ b/catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36_invalid.rs @@ -1,6 +1,6 @@ //! Insert CIP36 Registration Query (Invalid Records) -use std::{fmt::Debug, sync::Arc}; +use std::{fmt, sync::Arc}; use cardano_blockchain_types::{Cip36, Slot, TxnIndex, VotingPubKey}; use pallas::ledger::addresses::Address; @@ -12,6 +12,7 @@ use crate::{ index::queries::{PreparedQueries, SizedBatch}, types::{DbSlot, DbTxnIndex}, }, + impl_query_batch, settings::cassandra_db, }; @@ -21,7 +22,7 @@ const INSERT_CIP36_REGISTRATION_INVALID_QUERY: &str = /// Insert CIP-36 Registration Invalid Query Parameters #[derive(SerializeRow, Clone)] -pub(crate) struct Params { +pub(crate) struct Cip36InvalidInsert { /// Full Stake Address (not hashed, 32 byte ED25519 Public key). stake_public_key: Vec, /// Slot Number the cert is in. 
@@ -46,13 +47,15 @@ pub(crate) struct Params { problem_report: String, } -impl Debug for Params { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl_query_batch!(Cip36InvalidInsert, INSERT_CIP36_REGISTRATION_INVALID_QUERY); + +impl fmt::Debug for Cip36InvalidInsert { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let cip36 = match self.cip36 { MaybeUnset::Unset => "UNSET", MaybeUnset::Set(v) => &format!("{v:?}"), }; - f.debug_struct("Params") + f.debug_struct("Cip36InvalidInsert") .field("stake_public_key", &self.stake_public_key) .field("slot_no", &self.slot_no) .field("txn_index", &self.txn_index) @@ -68,7 +71,7 @@ impl Debug for Params { } } -impl Params { +impl Cip36InvalidInsert { /// Create a new Insert Query. pub fn new( vote_key: Option<&VotingPubKey>, slot_no: Slot, txn_index: TxnIndex, cip36: &Cip36, @@ -89,7 +92,7 @@ impl Params { String::new() }); - Params { + Cip36InvalidInsert { stake_public_key, slot_no: slot_no.into(), txn_index: txn_index.into(), diff --git a/catalyst-gateway/bin/src/db/index/block/cip36/mod.rs b/catalyst-gateway/bin/src/db/index/block/cip36/mod.rs index eaed97ab8fb0..dadb14a0e1b2 100644 --- a/catalyst-gateway/bin/src/db/index/block/cip36/mod.rs +++ b/catalyst-gateway/bin/src/db/index/block/cip36/mod.rs @@ -22,11 +22,11 @@ use crate::{ /// Insert CIP-36 Registration Queries pub(crate) struct Cip36InsertQuery { /// Stake Registration Data captured during indexing. - registrations: Vec, + registrations: Vec, /// Stake Registration Data captured during indexing. - invalid: Vec, + invalid: Vec, /// Stake Registration Data captured during indexing. - for_vote_key: Vec, + for_vote_key: Vec, /// Stake Registration Data captured during indexing. stake_regs: Vec, } @@ -46,11 +46,11 @@ impl Cip36InsertQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch)> { - let insert_cip36_batch = insert_cip36::Params::prepare_batch(session, cfg).await; + let insert_cip36_batch = insert_cip36::Cip36Insert::prepare_batch(session, cfg).await; let insert_cip36_invalid_batch = - insert_cip36_invalid::Params::prepare_batch(session, cfg).await; + insert_cip36_invalid::Cip36InvalidInsert::prepare_batch(session, cfg).await; let insert_cip36_for_vote_key_addr_batch = - insert_cip36_for_vote_key::Params::prepare_batch(session, cfg).await; + insert_cip36_for_vote_key::Cip36ForVoteKeyInsert::prepare_batch(session, cfg).await; // Its a hack of inserting `stake_regs` during the indexing CIP 36 registrations. // Its done because some of the CIP 36 registrations contains some stake addresses which // are not actually some how registered using cardano certs. @@ -82,11 +82,11 @@ impl Cip36InsertQuery { let stake_pk_hash = Blake2b224Hash::new(&stake_pk.to_bytes()); let stake_address = StakeAddress::new(block.network(), false, stake_pk_hash); - self.registrations.push(insert_cip36::Params::new( + self.registrations.push(insert_cip36::Cip36Insert::new( voting_key, slot_no, index, &cip36, )); self.for_vote_key - .push(insert_cip36_for_vote_key::Params::new( + .push(insert_cip36_for_vote_key::Cip36ForVoteKeyInsert::new( voting_key, slot_no, index, &cip36, true, )); self.stake_regs @@ -107,21 +107,24 @@ impl Cip36InsertQuery { // Cannot index an invalid CIP36, if there is no stake public key. 
if let Some(stake_pk) = cip36.stake_pk() { if cip36.voting_pks().is_empty() { - self.invalid.push(insert_cip36_invalid::Params::new( - None, slot_no, index, &cip36, - )); + self.invalid + .push(insert_cip36_invalid::Cip36InvalidInsert::new( + None, slot_no, index, &cip36, + )); } else { for voting_key in cip36.voting_pks() { - self.invalid.push(insert_cip36_invalid::Params::new( - Some(voting_key), - slot_no, - index, - &cip36, - )); - self.for_vote_key - .push(insert_cip36_for_vote_key::Params::new( - voting_key, slot_no, index, &cip36, false, + self.invalid + .push(insert_cip36_invalid::Cip36InvalidInsert::new( + Some(voting_key), + slot_no, + index, + &cip36, )); + self.for_vote_key.push( + insert_cip36_for_vote_key::Cip36ForVoteKeyInsert::new( + voting_key, slot_no, index, &cip36, false, + ), + ); } } diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_stake_address.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_stake_address.rs index c98e6bd091d6..f48d730625f4 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_stake_address.rs @@ -12,6 +12,7 @@ use crate::{ index::queries::{PreparedQueries, SizedBatch}, types::{DbCatalystId, DbSlot, DbStakeAddress}, }, + impl_query_batch, settings::cassandra_db::EnvVars, }; @@ -20,7 +21,7 @@ const QUERY: &str = include_str!("cql/insert_catalyst_id_for_stake_address.cql") /// Insert Catalyst ID For Stake Address Query Parameters #[derive(SerializeRow)] -pub(crate) struct Params { +pub(crate) struct CatalystIdForStakeAddressInsert { /// A stake address. stake_address: DbStakeAddress, /// A Catalyst short identifier. @@ -29,9 +30,11 @@ pub(crate) struct Params { slot_no: DbSlot, } -impl Debug for Params { +impl_query_batch!(CatalystIdForStakeAddressInsert, QUERY); + +impl Debug for CatalystIdForStakeAddressInsert { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Params") + f.debug_struct("CatalystIdForStakeAddressInsert") .field("stake_address", &self.stake_address) .field("catalyst_id", &self.catalyst_id) .field("slot_no", &self.slot_no) @@ -39,10 +42,10 @@ impl Debug for Params { } } -impl Params { +impl CatalystIdForStakeAddressInsert { /// Create a new record for this transaction. pub(crate) fn new(stake_address: StakeAddress, slot_no: Slot, catalyst_id: CatalystId) -> Self { - Params { + CatalystIdForStakeAddressInsert { stake_address: stake_address.into(), catalyst_id: catalyst_id.into(), slot_no: slot_no.into(), diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_txn_id.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_txn_id.rs index 2e72fa1c069d..fd2f024d3563 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_txn_id.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_txn_id.rs @@ -12,6 +12,7 @@ use crate::{ index::queries::{PreparedQueries, SizedBatch}, types::{DbCatalystId, DbTransactionId}, }, + impl_query_batch, settings::cassandra_db::EnvVars, }; @@ -20,26 +21,28 @@ const QUERY: &str = include_str!("cql/insert_catalyst_id_for_txn_id.cql"); /// Insert Catalyst ID For Transaction ID Query Parameters #[derive(SerializeRow)] -pub(crate) struct Params { +pub(crate) struct CatalystIdForTxnIdInsert { /// A transaction hash. txn_id: DbTransactionId, /// A Catalyst short identifier. 
catalyst_id: DbCatalystId, } -impl Debug for Params { +impl_query_batch!(CatalystIdForTxnIdInsert, QUERY); + +impl Debug for CatalystIdForTxnIdInsert { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Params") + f.debug_struct("CatalystIdForTxnIdInsert") .field("txn_id", &self.txn_id) .field("catalyst_id", &self.catalyst_id) .finish() } } -impl Params { +impl CatalystIdForTxnIdInsert { /// Creates a new record for this transaction. pub(crate) fn new(catalyst_id: CatalystId, txn_id: TransactionId) -> Self { - Params { + CatalystIdForTxnIdInsert { txn_id: txn_id.into(), catalyst_id: catalyst_id.into(), } diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs index cbeb184d5a0e..dfe88d90effa 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs @@ -1,6 +1,6 @@ //! Insert RBAC 509 Registration Query. -use std::{fmt::Debug, sync::Arc}; +use std::{fmt, sync::Arc}; use cardano_blockchain_types::{Slot, TransactionId, TxnIndex}; use catalyst_types::{catalyst_id::CatalystId, uuid::UuidV4}; @@ -12,6 +12,7 @@ use crate::{ index::queries::{PreparedQueries, SizedBatch}, types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4}, }, + impl_query_batch, settings::cassandra_db::EnvVars, }; @@ -20,7 +21,7 @@ const QUERY: &str = include_str!("cql/insert_rbac509.cql"); /// Insert RBAC Registration Query Parameters #[derive(SerializeRow)] -pub(crate) struct Params { +pub(crate) struct Rbac509Insert { /// A Catalyst short identifier. catalyst_id: DbCatalystId, /// A transaction hash @@ -35,13 +36,15 @@ pub(crate) struct Params { purpose: DbUuidV4, } -impl Debug for Params { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl_query_batch!(Rbac509Insert, QUERY); + +impl fmt::Debug for Rbac509Insert { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let prv_txn_id = match self.prv_txn_id { MaybeUnset::Unset => "UNSET".to_owned(), MaybeUnset::Set(ref v) => format!("{v:?}"), }; - f.debug_struct("Params") + f.debug_struct("Rbac509Insert") .field("catalyst_id", &self.catalyst_id) .field("txn_id", &self.txn_id) .field("slot_no", &self.slot_no) @@ -52,7 +55,7 @@ impl Debug for Params { } } -impl Params { +impl Rbac509Insert { /// Create a new record for this transaction. pub(crate) fn new( catalyst_id: CatalystId, txn_id: TransactionId, slot_no: Slot, txn_index: TxnIndex, diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509_invalid.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509_invalid.rs index 98d354968f49..fcac1d034be6 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509_invalid.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509_invalid.rs @@ -1,6 +1,6 @@ //! Insert invalid RBAC 509 Registration Query. -use std::{fmt::Debug, sync::Arc}; +use std::{fmt, sync::Arc}; use cardano_blockchain_types::{Slot, TransactionId, TxnIndex}; use catalyst_types::{catalyst_id::CatalystId, problem_report::ProblemReport, uuid::UuidV4}; @@ -12,6 +12,7 @@ use crate::{ index::queries::{PreparedQueries, SizedBatch}, types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4}, }, + impl_query_batch, settings::cassandra_db::EnvVars, }; @@ -20,7 +21,7 @@ const QUERY: &str = include_str!("cql/insert_rbac509_invalid.cql"); /// Insert an invalid RBAC registration query parameters. 
#[derive(SerializeRow)] -pub(crate) struct Params { +pub(crate) struct Rbac509InvalidInsert { /// A Catalyst short identifier. catalyst_id: DbCatalystId, /// A transaction hash of this registration. @@ -37,7 +38,9 @@ pub(crate) struct Params { problem_report: String, } -impl Params { +impl_query_batch!(Rbac509InvalidInsert, QUERY); + +impl Rbac509InvalidInsert { /// Create a new record for this transaction. pub(crate) fn new( catalyst_id: CatalystId, txn_id: TransactionId, slot_no: Slot, txn_index: TxnIndex, @@ -81,8 +84,8 @@ impl Params { } } -impl Debug for Params { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Debug for Rbac509InvalidInsert { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let prv_txn_id = match self.prv_txn_id { MaybeUnset::Unset => "UNSET".to_owned(), MaybeUnset::Set(v) => format!("{v}"), @@ -91,7 +94,7 @@ impl Debug for Params { MaybeUnset::Unset => "UNSET".to_owned(), MaybeUnset::Set(v) => format!("{}", UuidV4::from(v)), }; - f.debug_struct("Params") + f.debug_struct("Rbac509InvalidInsert") .field("catalyst_id", &self.catalyst_id) .field("txn_id", &self.txn_id) .field("slot_no", &self.slot_no) diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs index e75fd16309d3..3cc11384be7c 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs @@ -29,13 +29,14 @@ use crate::{ #[derive(Debug)] pub(crate) struct Rbac509InsertQuery { /// RBAC Registration Data captured during indexing. - pub(crate) registrations: Vec, + pub(crate) registrations: Vec, /// An invalid RBAC registration data. - pub(crate) invalid: Vec, + pub(crate) invalid: Vec, /// A Catalyst ID for transaction ID Data captured during indexing. - pub(crate) catalyst_id_for_txn_id: Vec, + pub(crate) catalyst_id_for_txn_id: Vec, /// A Catalyst ID for stake address data captured during indexing. 
- pub(crate) catalyst_id_for_stake_address: Vec, + pub(crate) catalyst_id_for_stake_address: + Vec, } impl Rbac509InsertQuery { @@ -54,10 +55,14 @@ impl Rbac509InsertQuery { session: &Arc, cfg: &EnvVars, ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch)> { Ok(( - insert_rbac509::Params::prepare_batch(session, cfg).await?, - insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?, - insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?, - insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?, + insert_rbac509::Rbac509Insert::prepare_batch(session, cfg).await?, + insert_rbac509_invalid::Rbac509InvalidInsert::prepare_batch(session, cfg).await?, + insert_catalyst_id_for_txn_id::CatalystIdForTxnIdInsert::prepare_batch(session, cfg) + .await?, + insert_catalyst_id_for_stake_address::CatalystIdForStakeAddressInsert::prepare_batch( + session, cfg, + ) + .await?, )) } @@ -121,7 +126,7 @@ impl Rbac509InsertQuery { let purpose = cip509.purpose(); match cip509.consume() { Ok((purpose, metadata, _)) => { - self.registrations.push(insert_rbac509::Params::new( + self.registrations.push(insert_rbac509::Rbac509Insert::new( catalyst_id.clone(), txn_hash, slot, @@ -129,14 +134,15 @@ impl Rbac509InsertQuery { purpose, previous_transaction, )); - self.catalyst_id_for_txn_id - .push(insert_catalyst_id_for_txn_id::Params::new( + self.catalyst_id_for_txn_id.push( + insert_catalyst_id_for_txn_id::CatalystIdForTxnIdInsert::new( catalyst_id.clone(), txn_hash, - )); + ), + ); for address in stake_addresses(&metadata) { self.catalyst_id_for_stake_address.push( - insert_catalyst_id_for_stake_address::Params::new( + insert_catalyst_id_for_stake_address::CatalystIdForStakeAddressInsert::new( address, slot, catalyst_id.clone(), @@ -145,15 +151,16 @@ impl Rbac509InsertQuery { } }, Err(report) => { - self.invalid.push(insert_rbac509_invalid::Params::new( - catalyst_id, - txn_hash, - slot, - index, - purpose, - previous_transaction, - &report, - )); + self.invalid + .push(insert_rbac509_invalid::Rbac509InvalidInsert::new( + catalyst_id, + txn_hash, + slot, + index, + purpose, + previous_transaction, + &report, + )); }, } } @@ -236,9 +243,9 @@ async fn catalyst_id( async fn query_catalyst_id( session: &Arc, txn_id: TransactionId, is_immutable: bool, ) -> anyhow::Result { - use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query; + use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::GetCatalystIdForTxnId; - if let Some(q) = Query::get_latest(session, txn_id.into()) + if let Some(q) = GetCatalystIdForTxnId::get_latest(session, txn_id.into()) .await .context("Failed to query Catalyst ID from transaction ID")? { @@ -252,7 +259,7 @@ async fn query_catalyst_id( // persistent database. let persistent_session = CassandraSession::get(true).context("Failed to get persistent DB session")?; - Query::get_latest(&persistent_session, txn_id.into()) + GetCatalystIdForTxnId::get_latest(&persistent_session, txn_id.into()) .await .transpose() .context("Unable to find Catalyst ID in the persistent DB")? 
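The renames above all follow one pattern: each former `Params` struct gets a descriptive name and receives its `Query` implementation from the new `impl_query_batch!` macro instead of a hand-written `prepare_batch`. The sketch below is illustrative rather than code from this change set: the trait and macro are simplified stand-ins (no Scylla session, no async `prepare_query`), the CQL literal and the unit struct body are placeholders, and only the `Query`/`QUERY_STR`/`query_str` item names and the `StakeRegistrationInsertQuery` / `INSERT_STAKE_REGISTRATION_QUERY` names echo the diff.

// Simplified stand-in for the `Query` trait added in db/index/queries/mod.rs.
trait Query: std::fmt::Display + std::any::Any {
    /// CQL for query preparation.
    const QUERY_STR: &'static str;

    /// Type id used as the key when the prepared query is stored in a map.
    fn type_id() -> std::any::TypeId {
        std::any::TypeId::of::<Self>()
    }

    /// The CQL statement in plain text.
    fn query_str() -> &'static str {
        Self::QUERY_STR
    }
}

// Simplified stand-in for `impl_query_batch!`: it ties a parameter type to its CQL
// constant and derives `Display` from it (the real macro additionally generates an
// async `prepare_query` that builds a `QueryKind::Batch` against the session).
macro_rules! impl_query {
    ($t:ty, $cql:ident) => {
        impl Query for $t {
            const QUERY_STR: &'static str = $cql;
        }
        impl std::fmt::Display for $t {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                write!(f, "{}", <$t as Query>::query_str())
            }
        }
    };
}

// Placeholder CQL; the real constant is loaded with include_str!.
const INSERT_STAKE_REGISTRATION_QUERY: &str = "INSERT INTO stake_registration (...) VALUES (...)";

// Unit struct stand-in; the real struct derives SerializeRow and carries the row fields.
struct StakeRegistrationInsertQuery;
impl_query!(StakeRegistrationInsertQuery, INSERT_STAKE_REGISTRATION_QUERY);

fn main() {
    // The CQL now travels with the type, so call sites no longer route
    // through a per-query enum variant.
    println!("{}", StakeRegistrationInsertQuery::query_str());
}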
diff --git a/catalyst-gateway/bin/src/db/index/block/txi.rs b/catalyst-gateway/bin/src/db/index/block/txi.rs index 397a42bb26c0..47683aac7547 100644 --- a/catalyst-gateway/bin/src/db/index/block/txi.rs +++ b/catalyst-gateway/bin/src/db/index/block/txi.rs @@ -15,6 +15,7 @@ use crate::{ }, types::{DbSlot, DbTransactionId, DbTxnOutputOffset}, }, + impl_query_batch, settings::cassandra_db, }; @@ -49,6 +50,8 @@ pub(crate) struct TxiInsertQuery { /// TXI by Txn hash Index const INSERT_TXI_QUERY: &str = include_str!("./cql/insert_txi.cql"); +impl_query_batch!(TxiInsertQuery, INSERT_TXI_QUERY); + impl TxiInsertQuery { /// Create a new record for this transaction. pub(crate) fn new() -> Self { diff --git a/catalyst-gateway/bin/src/db/index/block/txo/insert_txo.rs b/catalyst-gateway/bin/src/db/index/block/txo/insert_txo.rs index 46b1bdceb965..12aca8a3bc76 100644 --- a/catalyst-gateway/bin/src/db/index/block/txo/insert_txo.rs +++ b/catalyst-gateway/bin/src/db/index/block/txo/insert_txo.rs @@ -13,6 +13,7 @@ use crate::{ index::queries::{PreparedQueries, SizedBatch}, types::{DbSlot, DbStakeAddress, DbTransactionId, DbTxnIndex, DbTxnOutputOffset}, }, + impl_query_batch, settings::cassandra_db, }; @@ -22,7 +23,7 @@ const INSERT_TXO_QUERY: &str = include_str!("./cql/insert_txo.cql"); /// Insert TXO Query Parameters /// (Superset of data to support both Staked and Unstaked TXO records.) #[derive(SerializeRow, Debug)] -pub(crate) struct Params { +pub(crate) struct TxoInsertQuery { /// Stake Address - Binary 29 bytes. stake_address: DbStakeAddress, /// Block Slot Number @@ -39,7 +40,9 @@ pub(crate) struct Params { txn_hash: DbTransactionId, } -impl Params { +impl_query_batch!(TxoInsertQuery, INSERT_TXO_QUERY); + +impl TxoInsertQuery { /// Create a new record for this transaction. pub(crate) fn new( stake_address: StakeAddress, slot_no: Slot, txn_index: TxnIndex, txo: TxnOutputOffset, diff --git a/catalyst-gateway/bin/src/db/index/block/txo/insert_txo_asset.rs b/catalyst-gateway/bin/src/db/index/block/txo/insert_txo_asset.rs index 2eb83cc934f1..05cb8565eb8a 100644 --- a/catalyst-gateway/bin/src/db/index/block/txo/insert_txo_asset.rs +++ b/catalyst-gateway/bin/src/db/index/block/txo/insert_txo_asset.rs @@ -11,6 +11,7 @@ use crate::{ index::queries::{PreparedQueries, SizedBatch}, types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset}, }, + impl_query_batch, settings::cassandra_db, }; @@ -37,6 +38,8 @@ pub(crate) struct Params { value: num_bigint::BigInt, } +impl_query_batch!(Params, INSERT_TXO_ASSET_QUERY); + impl Params { /// Create a new record for this transaction. /// diff --git a/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo.rs b/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo.rs index 346460579a15..c68c89e46fb8 100644 --- a/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo.rs +++ b/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo.rs @@ -10,6 +10,7 @@ use crate::{ index::queries::{PreparedQueries, SizedBatch}, types::{DbSlot, DbTransactionId, DbTxnIndex, DbTxnOutputOffset}, }, + impl_query_batch, settings::cassandra_db, }; @@ -34,6 +35,8 @@ pub(crate) struct Params { value: num_bigint::BigInt, } +impl_query_batch!(Params, INSERT_UNSTAKED_TXO_QUERY); + impl Params { /// Create a new record for this transaction. 
pub(crate) fn new( diff --git a/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo_asset.rs b/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo_asset.rs index f9b5f26a5dc3..8c41e671c8f0 100644 --- a/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo_asset.rs +++ b/catalyst-gateway/bin/src/db/index/block/txo/insert_unstaked_txo_asset.rs @@ -11,6 +11,7 @@ use crate::{ index::queries::{PreparedQueries, SizedBatch}, types::{DbSlot, DbTransactionId, DbTxnIndex, DbTxnOutputOffset}, }, + impl_query_batch, settings::cassandra_db, }; @@ -37,6 +38,8 @@ pub(crate) struct Params { value: num_bigint::BigInt, } +impl_query_batch!(Params, INSERT_UNSTAKED_TXO_ASSET_QUERY); + impl Params { /// Create a new record for this transaction. /// diff --git a/catalyst-gateway/bin/src/db/index/block/txo/mod.rs b/catalyst-gateway/bin/src/db/index/block/txo/mod.rs index dd7a520447ac..5d5c682526d6 100644 --- a/catalyst-gateway/bin/src/db/index/block/txo/mod.rs +++ b/catalyst-gateway/bin/src/db/index/block/txo/mod.rs @@ -28,7 +28,7 @@ use crate::{ /// There are multiple possible parameters to a query, which are represented separately. pub(crate) struct TxoInsertQuery { /// Staked TXO Data Parameters - staked_txo: Vec, + staked_txo: Vec, /// Unstaked TXO Data Parameters unstaked_txo: Vec, /// Staked TXO Asset Data Parameters @@ -52,7 +52,7 @@ impl TxoInsertQuery { pub(crate) async fn prepare_batch( session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch)> { - let txo_staked_insert_batch = insert_txo::Params::prepare_batch(session, cfg).await; + let txo_staked_insert_batch = insert_txo::TxoInsertQuery::prepare_batch(session, cfg).await; let txo_unstaked_insert_batch = insert_unstaked_txo::Params::prepare_batch(session, cfg).await; let txo_staked_asset_insert_batch = @@ -152,7 +152,7 @@ impl TxoInsertQuery { let txo_index = TxnOutputOffset::from(txo_index); if let Some(stake_address) = stake_address.clone() { - let params = insert_txo::Params::new( + let params = insert_txo::TxoInsertQuery::new( stake_address, slot_no, index, diff --git a/catalyst-gateway/bin/src/db/index/queries/mod.rs b/catalyst-gateway/bin/src/db/index/queries/mod.rs index a3f6f6791081..b408ed128eb7 100644 --- a/catalyst-gateway/bin/src/db/index/queries/mod.rs +++ b/catalyst-gateway/bin/src/db/index/queries/mod.rs @@ -8,10 +8,58 @@ pub(crate) mod registrations; pub(crate) mod staked_ada; pub(crate) mod sync_status; -use std::{fmt::Debug, sync::Arc}; +use std::{any::TypeId, fmt::Debug, sync::Arc}; use anyhow::bail; -use crossbeam_skiplist::SkipMap; +use dashmap::DashMap; +use purge::{ + catalyst_id_for_stake_address::{ + DeleteQuery as PurgeCatalystIdForStakeAddressDelete, + PrimaryKeyQuery as PurgeCatalystIdForStakeAddressSelect, + }, + catalyst_id_for_txn_id::{ + DeleteQuery as PurgeCatalystIdForTxnIdDelete, + PrimaryKeyQuery as PurgeCatalystIdForTxnIdSelect, + }, + cip36_registration::{ + DeleteQuery as PurgeCip36RegistrationDelete, + PrimaryKeyQuery as PurgeCip36RegistrationSelect, + }, + cip36_registration_for_vote_key::{ + DeleteQuery as PurgeCip36RegistrationForVoteKeyDelete, + PrimaryKeyQuery as PurgeCip36RegistrationForVoteKeySelect, + }, + cip36_registration_invalid::{ + DeleteQuery as PurgeCip36RegistrationInvalidDelete, + PrimaryKeyQuery as PurgeCip36RegistrationInvalidSelect, + }, + rbac509_invalid_registration::{ + DeleteQuery as PurgeRbacRegistrationInvalidDelete, + PrimaryKeyQuery as PurgeRbacRegistrationInvalidSelect, + }, + 
rbac509_registration::{ + DeleteQuery as PurgeRbacRegistrationDelete, PrimaryKeyQuery as PurgeRbacRegistrationSelect, + }, + stake_registration::{ + DeleteQuery as PurgeStakeRegistrationDelete, + PrimaryKeyQuery as PurgeStakeRegistrationSelect, + }, + txi_by_hash::{DeleteQuery as PurgeTxiByHashDelete, PrimaryKeyQuery as PurgeTxiByHashSelect}, + txo_ada::{DeleteQuery as PurgeTxoAdaDelete, PrimaryKeyQuery as PurgeTxoAdaSelect}, + txo_assets::{DeleteQuery as PurgeTxoAssetsDelete, PrimaryKeyQuery as PurgeTxoAssetsSelect}, + unstaked_txo_ada::{ + DeleteQuery as PurgeTxoUnstakedAdaDelete, PrimaryKeyQuery as PurgeTxoUnstakedAdaSelect, + }, + unstaked_txo_assets::{ + DeleteQuery as PurgeTxoUnstakedAssetDelete, PrimaryKeyQuery as PurgeTxoUnstakedAssetSelect, + }, +}; +use rbac::{ + get_catalyst_id_from_stake_address::GetCatalystIdForStakeAddress, + get_catalyst_id_from_transaction_id::GetCatalystIdForTxnId, + get_rbac_invalid_registrations::GetRbac509InvalidRegistrations, + get_rbac_registrations::GetRbac509Registrations, +}; use registrations::{ get_all_invalids::GetAllInvalidRegistrationsQuery, get_all_registrations::GetAllRegistrationsQuery, get_from_stake_addr::GetRegistrationQuery, @@ -33,404 +81,267 @@ use staked_ada::{ use sync_status::update::SyncStatusInsertQuery; use tracing::error; -use super::block::{ - certs::CertInsertQuery, cip36::Cip36InsertQuery, rbac509::Rbac509InsertQuery, - txi::TxiInsertQuery, txo::TxoInsertQuery, -}; -use crate::{ - db::index::{ - queries::rbac::{ - get_catalyst_id_from_stake_address, get_catalyst_id_from_transaction_id, - get_rbac_invalid_registrations, get_rbac_registrations, +use super::{ + block::{ + certs::StakeRegistrationInsertQuery, + cip36::{ + insert_cip36::Cip36Insert, insert_cip36_for_vote_key::Cip36ForVoteKeyInsert, + insert_cip36_invalid::Cip36InvalidInsert, + }, + rbac509::{ + insert_catalyst_id_for_stake_address::CatalystIdForStakeAddressInsert, + insert_catalyst_id_for_txn_id::CatalystIdForTxnIdInsert, insert_rbac509::Rbac509Insert, + insert_rbac509_invalid::Rbac509InvalidInsert, + }, + txi::TxiInsertQuery, + txo::{ + insert_txo::TxoInsertQuery, insert_txo_asset::Params as TxoAssetInsert, + insert_unstaked_txo::Params as TxoUnstakedInsert, + insert_unstaked_txo_asset::Params as TxoUnstakedAssetInsert, }, - session::CassandraSessionError, }, - service::utilities::health::set_index_db_liveness, - settings::cassandra_db, + session::{CassandraSession, CassandraSessionError}, }; +use crate::{service::utilities::health::set_index_db_liveness, settings::cassandra_db}; /// Batches of different sizes, prepared and ready for use. -pub(crate) type SizedBatch = SkipMap>; - -/// All Prepared insert Queries that we know about. -#[derive(strum_macros::Display)] -#[allow(clippy::enum_variant_names)] -pub(crate) enum PreparedQuery { - /// TXO Insert query. - TxoAdaInsertQuery, - /// TXO Asset Insert query. - TxoAssetInsertQuery, - /// Unstaked TXO Insert query. - UnstakedTxoAdaInsertQuery, - /// Unstaked TXO Asset Insert query. - UnstakedTxoAssetInsertQuery, - /// TXI Insert query. - TxiInsertQuery, - /// Stake Registration Insert query. - StakeRegistrationInsertQuery, - /// CIP 36 Registration Insert Query. - Cip36RegistrationInsertQuery, - /// CIP 36 Registration Error Insert query. - Cip36RegistrationInsertErrorQuery, - /// CIP 36 Registration for voting key Insert query. - Cip36RegistrationForVoteKeyInsertQuery, - /// TXO spent Update query. - TxoSpentUpdateQuery, - /// RBAC 509 Registration Insert query. 
- Rbac509InsertQuery, - /// An invalid RBAC 509 registration Insert query. - Rbac509InvalidInsertQuery, - /// A Catalyst ID for transaction ID insert query. - CatalystIdForTxnIdInsertQuery, - /// A Catalyst ID for stake address insert query. - CatalystIdForStakeAddressInsertQuery, +pub(crate) type SizedBatch = DashMap>; + +/// Kind of result +#[derive(Clone)] +#[allow(dead_code)] +pub(crate) enum QueryKind { + /// Sized-batch + Batch(SizedBatch), + /// Prepared statement + Statement(PreparedStatement), } -/// All prepared SELECT query statements (return data). -pub(crate) enum PreparedSelectQuery { - /// Get TXO by stake address query. - TxoByStakeAddress, - /// Get TXI by transaction hash query. - TxiByTransactionHash, - /// Get native assets by stake address query. - AssetsByStakeAddress, - /// Get Registrations - RegistrationFromStakeAddr, - /// Get invalid Registration - InvalidRegistrationsFromStakeAddr, - /// Get stake addr from stake hash - StakeAddrFromStakeHash, - /// Get stake addr from vote key - StakeAddrFromVoteKey, - /// Get Catalyst ID by transaction ID. - CatalystIdByTransactionId, - /// Get Catalyst ID by stake address. - CatalystIdByStakeAddress, - /// Get RBAC registrations by Catalyst ID. - RbacRegistrationsByCatalystId, - /// Get invalid RBAC registrations by Catalyst ID. - RbacInvalidRegistrationsByCatalystId, - /// Get all registrations for snapshot - GetAllRegistrations, - /// Get all invalid registrations for snapshot - GetAllInvalidRegistrations, -} +/// A trait to prepare Index DB queries. +#[allow(dead_code)] +pub(crate) trait Query: std::fmt::Display + std::any::Any { + /// CQL for query preparation. + const QUERY_STR: &'static str; -/// All prepared UPSERT query statements (inserts/updates a single value of data). -pub(crate) enum PreparedUpsertQuery { - /// Sync Status Insert - SyncStatusInsert, -} + /// Returns the type id for the query. + fn type_id() -> std::any::TypeId { + std::any::TypeId::of::() + } + + /// Returns the CQL statement for the query in plain text. + fn query_str() -> &'static str { + Self::QUERY_STR + } -/// All prepared queries for a session. -#[allow(clippy::struct_field_names)] -pub(crate) struct PreparedQueries { - /// TXO Insert query. - txo_insert_queries: SizedBatch, - /// TXO Asset Insert query. - txo_asset_insert_queries: SizedBatch, - /// Unstaked TXO Insert query. - unstaked_txo_insert_queries: SizedBatch, - /// Unstaked TXO Asset Insert query. - unstaked_txo_asset_insert_queries: SizedBatch, - /// TXI Insert query. - txi_insert_queries: SizedBatch, - /// TXI Insert query. - stake_registration_insert_queries: SizedBatch, - /// CIP36 Registrations. - cip36_registration_insert_queries: SizedBatch, - /// CIP36 Registration errors. - cip36_registration_error_insert_queries: SizedBatch, - /// CIP36 Registration for Stake Address Insert query. - cip36_registration_for_vote_key_insert_queries: SizedBatch, - /// Update TXO spent query. - txo_spent_update_queries: SizedBatch, - /// Get TXO by stake address query. - txo_by_stake_address_query: PreparedStatement, - /// Get TXI by transaction hash. - txi_by_txn_hash_query: PreparedStatement, - /// RBAC 509 Registrations. - rbac509_registration_insert_queries: SizedBatch, - /// Invalid RBAC 509 registrations. - rbac509_invalid_registration_insert_queries: SizedBatch, - /// Catalyst ID for transaction ID insert query. - catalyst_id_for_txn_id_insert_queries: SizedBatch, - /// Catalyst ID for stake address insert query. 
- catalyst_id_for_stake_address_insert_queries: SizedBatch, - /// Get native assets by stake address query. - native_assets_by_stake_address_query: PreparedStatement, - /// Get registrations - registration_from_stake_addr_query: PreparedStatement, - /// stake addr from stake hash - stake_addr_from_stake_address_query: PreparedStatement, - /// stake addr from vote key - stake_addr_from_vote_key_query: PreparedStatement, - /// Get invalid registrations - invalid_registrations_from_stake_addr_query: PreparedStatement, - /// Insert Sync Status update. - sync_status_insert: PreparedStatement, - /// Get Catalyst ID by stake address. - catalyst_id_by_stake_address_query: PreparedStatement, - /// Get Catalyst ID by transaction ID. - catalyst_id_by_transaction_id_query: PreparedStatement, - /// Get RBAC registrations by Catalyst ID. - rbac_registrations_by_catalyst_id_query: PreparedStatement, - /// Get invalid RBAC registrations by Catalyst ID. - rbac_invalid_registrations_by_catalyst_id_query: PreparedStatement, - /// Get all registrations for snapshot - get_all_registrations_query: PreparedStatement, - /// Get all invalid registrations for snapshot - get_all_invalid_registrations_query: PreparedStatement, + /// Prepare the query + async fn prepare_query( + session: &Arc, cfg: &cassandra_db::EnvVars, + ) -> anyhow::Result; } -/// A set of query responses that can fail. -pub(crate) type FallibleQueryResults = anyhow::Result>; -/// A set of query responses from tasks that can fail. -pub(crate) type FallibleQueryTasks = Vec>; +/// Implement Display trait for types that implement Query +#[macro_export] +macro_rules! impl_display_for_query_type { + ($i:ty) => { + impl std::fmt::Display for $i { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}", + ::query_str() + ) + } + } + }; +} -impl PreparedQueries { - /// Create new prepared queries for a given session. - #[allow(clippy::too_many_lines)] - pub(crate) async fn new( - session: Arc, cfg: &cassandra_db::EnvVars, - ) -> anyhow::Result { - // We initialize like this, so that all errors preparing querys get shown before aborting. 
- let txi_insert_queries = TxiInsertQuery::prepare_batch(&session, cfg).await?; - let all_txo_queries = TxoInsertQuery::prepare_batch(&session, cfg).await; - let stake_registration_insert_queries = - CertInsertQuery::prepare_batch(&session, cfg).await?; - let all_cip36_queries = Cip36InsertQuery::prepare_batch(&session, cfg).await; - let txo_spent_update_queries = - UpdateTxoSpentQuery::prepare_batch(session.clone(), cfg).await?; - let txo_by_stake_address_query = GetTxoByStakeAddressQuery::prepare(session.clone()).await; - let txi_by_txn_hash_query = GetTxiByTxnHashesQuery::prepare(session.clone()).await; - let all_rbac_queries = Rbac509InsertQuery::prepare_batch(&session, cfg).await; - let native_assets_by_stake_address_query = - GetAssetsByStakeAddressQuery::prepare(session.clone()).await; - let registration_from_stake_addr_query = - GetRegistrationQuery::prepare(session.clone()).await; - let stake_addr_from_stake_address = GetStakeAddrQuery::prepare(session.clone()).await; - let stake_addr_from_vote_key = GetStakeAddrFromVoteKeyQuery::prepare(session.clone()).await; - let invalid_registrations = GetInvalidRegistrationQuery::prepare(session.clone()).await; - let get_all_registrations_query = GetAllRegistrationsQuery::prepare(session.clone()).await; - let get_all_invalid_registrations_query = - GetAllInvalidRegistrationsQuery::prepare(session.clone()).await; - let sync_status_insert = SyncStatusInsertQuery::prepare(session.clone()).await?; - let catalyst_id_by_stake_address_query = - get_catalyst_id_from_stake_address::Query::prepare(session.clone()).await?; - let catalyst_id_by_transaction_id_query = - get_catalyst_id_from_transaction_id::Query::prepare(session.clone()).await?; - let rbac_registrations_by_catalyst_id_query = - get_rbac_registrations::Query::prepare(session.clone()).await?; - let rbac_invalid_registrations_by_catalyst_id_query = - get_rbac_invalid_registrations::Query::prepare(session.clone()).await?; - - let ( - txo_insert_queries, - unstaked_txo_insert_queries, - txo_asset_insert_queries, - unstaked_txo_asset_insert_queries, - ) = all_txo_queries?; - - let ( - cip36_registration_insert_queries, - cip36_registration_error_insert_queries, - cip36_registration_for_vote_key_insert_queries, - ) = all_cip36_queries?; - - let ( - rbac509_registration_insert_queries, - rbac509_invalid_registration_insert_queries, - catalyst_id_for_txn_id_insert_queries, - catalyst_id_for_stake_address_insert_queries, - ) = all_rbac_queries?; - - Ok(Self { - txo_insert_queries, - txo_asset_insert_queries, - unstaked_txo_insert_queries, - unstaked_txo_asset_insert_queries, - txi_insert_queries, - stake_registration_insert_queries, - cip36_registration_insert_queries, - cip36_registration_error_insert_queries, - cip36_registration_for_vote_key_insert_queries, - txo_spent_update_queries, - txo_by_stake_address_query: txo_by_stake_address_query?, - txi_by_txn_hash_query: txi_by_txn_hash_query?, - rbac509_registration_insert_queries, - rbac509_invalid_registration_insert_queries, - catalyst_id_for_txn_id_insert_queries, - catalyst_id_for_stake_address_insert_queries, - native_assets_by_stake_address_query: native_assets_by_stake_address_query?, - registration_from_stake_addr_query: registration_from_stake_addr_query?, - stake_addr_from_stake_address_query: stake_addr_from_stake_address?, - stake_addr_from_vote_key_query: stake_addr_from_vote_key?, - invalid_registrations_from_stake_addr_query: invalid_registrations?, - sync_status_insert, - rbac_registrations_by_catalyst_id_query, - 
rbac_invalid_registrations_by_catalyst_id_query, - catalyst_id_by_stake_address_query, - catalyst_id_by_transaction_id_query, - get_all_registrations_query: get_all_registrations_query?, - get_all_invalid_registrations_query: get_all_invalid_registrations_query?, - }) - } +/// Implement Query trait for batched types +#[macro_export] +macro_rules! impl_query_batch { + ($i:ty, $c:ident) => { + impl $crate::db::index::queries::Query for $i { + + const QUERY_STR: &'static str = $c; + + async fn prepare_query( + session: &std::sync::Arc, + cfg: &$crate::settings::cassandra_db::EnvVars, + ) -> anyhow::Result<$crate::db::index::queries::QueryKind> { + $crate::db::index::queries::prepare_batch( + session.clone(), + $c, + cfg, + scylla::statement::Consistency::Any, + true, + false, + ) + .await + .map($crate::db::index::queries::QueryKind::Batch) + .inspect_err(|error| error!(error=%error,"Failed to prepare $c Query.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{{$c}}")) + } + } - /// Prepares a statement. - pub(crate) async fn prepare( - session: Arc, query: &str, consistency: scylla::statement::Consistency, - idempotent: bool, - ) -> anyhow::Result { - let mut prepared = session.prepare(query).await?; - prepared.set_consistency(consistency); - prepared.set_is_idempotent(idempotent); + $crate::impl_display_for_query_type!($i); + }; +} - Ok(prepared) - } +/// Implement Query trait for statement types +#[macro_export] +macro_rules! impl_query_statement { + ($i:ty, $c:ident) => { + impl $crate::db::index::queries::Query for $i { + + const QUERY_STR: &'static str = $c; + + async fn prepare_query( + session: &std::sync::Arc, + _: &$crate::settings::cassandra_db::EnvVars, + ) -> anyhow::Result<$crate::db::index::queries::QueryKind> { + + $crate::db::index::queries::prepare_statement( + session, + $c, + scylla::statement::Consistency::All, + true, + ) + .await + .map($crate::db::index::queries::QueryKind::Statement) + .inspect_err(|error| error!(error=%error, "Failed to prepare $c query.")) + .map_err(|error| anyhow::anyhow!("{error}\n--\n{{$c}}")) - /// Prepares all permutations of the batch from 1 to max. - /// It is necessary to do this because batches are pre-sized, they can not be dynamic. - /// Preparing the batches in advance is a very larger performance increase. - pub(crate) async fn prepare_batch( - session: Arc, query: &str, cfg: &cassandra_db::EnvVars, - consistency: scylla::statement::Consistency, idempotent: bool, logged: bool, - ) -> anyhow::Result { - let sized_batches: SizedBatch = SkipMap::new(); - - // First prepare the query. Only needs to be done once, all queries on a batch are the - // same. - let prepared = Self::prepare(session, query, consistency, idempotent).await?; - - for batch_size in cassandra_db::MIN_BATCH_SIZE..=cfg.max_batch_size { - let mut batch: Batch = Batch::new(if logged { - scylla::batch::BatchType::Logged - } else { - scylla::batch::BatchType::Unlogged - }); - batch.set_consistency(consistency); - batch.set_is_idempotent(idempotent); - for _ in cassandra_db::MIN_BATCH_SIZE..=batch_size { - batch.append_statement(prepared.clone()); } - - sized_batches.insert(batch_size.try_into()?, Arc::new(batch)); } - Ok(sized_batches) + $crate::impl_display_for_query_type!($i); + }; +} + +/// Prepare Queries +pub(crate) async fn prepare_queries( + session: &Arc, cfg: &cassandra_db::EnvVars, +) -> anyhow::Result> { + // Prepare a query dashmap + macro_rules! 
prepare_q { + ( $( $i:ty),* ) => { + { + let queries = vec![ + $( (<$i as Query>::type_id(), <$i as Query>::prepare_query(session, cfg).await?), )* + ]; + DashMap::from_iter(queries) + } + } } + // WIP: Adding queries as they implement trait + let queries = prepare_q!( + // prepared batch queries + TxiInsertQuery, + TxoInsertQuery, + TxoAssetInsert, + TxoUnstakedInsert, + TxoUnstakedAssetInsert, + StakeRegistrationInsertQuery, + Cip36Insert, + Cip36InvalidInsert, + Cip36ForVoteKeyInsert, + UpdateTxoSpentQuery, + Rbac509Insert, + Rbac509InvalidInsert, + CatalystIdForTxnIdInsert, + CatalystIdForStakeAddressInsert, + // prepared statement queries + GetTxoByStakeAddressQuery, + GetTxiByTxnHashesQuery, + GetAssetsByStakeAddressQuery, + GetRegistrationQuery, + GetStakeAddrQuery, + GetStakeAddrFromVoteKeyQuery, + GetInvalidRegistrationQuery, + GetAllRegistrationsQuery, + GetAllInvalidRegistrationsQuery, + SyncStatusInsertQuery, + GetCatalystIdForStakeAddress, + GetCatalystIdForTxnId, + GetRbac509Registrations, + GetRbac509InvalidRegistrations, + // purge queries + PurgeTxoAdaSelect, + PurgeTxoAdaDelete, + PurgeTxoAssetsSelect, + PurgeTxoAssetsDelete, + PurgeTxoUnstakedAdaSelect, + PurgeTxoUnstakedAdaDelete, + PurgeTxoUnstakedAssetSelect, + PurgeTxoUnstakedAssetDelete, + PurgeTxiByHashSelect, + PurgeTxiByHashDelete, + PurgeStakeRegistrationSelect, + PurgeStakeRegistrationDelete, + PurgeCip36RegistrationSelect, + PurgeCip36RegistrationDelete, + PurgeCip36RegistrationInvalidSelect, + PurgeCip36RegistrationInvalidDelete, + PurgeCip36RegistrationForVoteKeySelect, + PurgeCip36RegistrationForVoteKeyDelete, + PurgeRbacRegistrationSelect, + PurgeRbacRegistrationDelete, + PurgeRbacRegistrationInvalidSelect, + PurgeRbacRegistrationInvalidDelete, + PurgeCatalystIdForTxnIdSelect, + PurgeCatalystIdForTxnIdDelete, + PurgeCatalystIdForStakeAddressSelect, + PurgeCatalystIdForStakeAddressDelete + ); + Ok(queries) +} - /// Executes a single query with the given parameters. - /// - /// Returns no data, and an error if the query fails. - pub(crate) async fn execute_upsert
<P>
( - &self, session: Arc, upsert_query: PreparedUpsertQuery, params: P, - ) -> anyhow::Result<()> - where P: SerializeRow { - let prepared_stmt = match upsert_query { - PreparedUpsertQuery::SyncStatusInsert => &self.sync_status_insert, - }; +/// Prepares a statement. +pub(crate) async fn prepare_statement( + session: &Arc, query: Q, consistency: scylla::statement::Consistency, idempotent: bool, +) -> anyhow::Result { + let mut prepared = session.prepare(format!("{query}")).await?; + prepared.set_consistency(consistency); + prepared.set_is_idempotent(idempotent); - session - .execute_unpaged(prepared_stmt, params) - .await - .map_err(|e| { - match e { - QueryError::ConnectionPoolError(err) => { - set_index_db_liveness(false); - error!(error = %err, "Index DB connection failed. Liveness set to false."); - CassandraSessionError::ConnectionUnavailable { source: err.into() }.into() - }, - _ => anyhow::anyhow!(e), - } - })?; - - Ok(()) - } + Ok(prepared) +} - /// Executes a select query with the given parameters. - /// - /// Returns an iterator that iterates over all the result pages that the query - /// returns. - pub(crate) async fn execute_iter
<P>
( - &self, session: Arc, select_query: PreparedSelectQuery, params: P, - ) -> anyhow::Result - where P: SerializeRow { - let prepared_stmt = match select_query { - PreparedSelectQuery::TxoByStakeAddress => &self.txo_by_stake_address_query, - PreparedSelectQuery::TxiByTransactionHash => &self.txi_by_txn_hash_query, - PreparedSelectQuery::AssetsByStakeAddress => &self.native_assets_by_stake_address_query, - PreparedSelectQuery::RegistrationFromStakeAddr => { - &self.registration_from_stake_addr_query - }, - PreparedSelectQuery::StakeAddrFromStakeHash => { - &self.stake_addr_from_stake_address_query - }, - PreparedSelectQuery::StakeAddrFromVoteKey => &self.stake_addr_from_vote_key_query, - PreparedSelectQuery::InvalidRegistrationsFromStakeAddr => { - &self.invalid_registrations_from_stake_addr_query - }, - PreparedSelectQuery::RbacRegistrationsByCatalystId => { - &self.rbac_registrations_by_catalyst_id_query - }, - PreparedSelectQuery::RbacInvalidRegistrationsByCatalystId => { - &self.rbac_invalid_registrations_by_catalyst_id_query - }, - PreparedSelectQuery::CatalystIdByTransactionId => { - &self.catalyst_id_by_transaction_id_query - }, - PreparedSelectQuery::CatalystIdByStakeAddress => { - &self.catalyst_id_by_stake_address_query - }, - PreparedSelectQuery::GetAllRegistrations => &self.get_all_registrations_query, - PreparedSelectQuery::GetAllInvalidRegistrations => { - &self.get_all_invalid_registrations_query - }, - }; - session_execute_iter(session, prepared_stmt, params).await - } +/// Prepares all permutations of the batch from 1 to max. +/// It is necessary to do this because batches are pre-sized, they can not be dynamic. +/// Preparing the batches in advance is a very larger performance increase. +pub(crate) async fn prepare_batch( + session: Arc, query: Q, cfg: &cassandra_db::EnvVars, + consistency: scylla::statement::Consistency, idempotent: bool, logged: bool, +) -> anyhow::Result { + let sized_batches: SizedBatch = DashMap::new(); + + // First prepare the query. Only needs to be done once, all queries on a batch are the + // same. + let prepared = prepare_statement(&session, query, consistency, idempotent).await?; + + for batch_size in cassandra_db::MIN_BATCH_SIZE..=cfg.max_batch_size { + let mut batch: Batch = Batch::new(if logged { + scylla::batch::BatchType::Logged + } else { + scylla::batch::BatchType::Unlogged + }); + batch.set_consistency(consistency); + batch.set_is_idempotent(idempotent); + for _ in cassandra_db::MIN_BATCH_SIZE..=batch_size { + batch.append_statement(prepared.clone()); + } - /// Execute a Batch query with the given parameters. - /// - /// Values should be a Vec of values which implement `SerializeRow` and they MUST be - /// the same, and must match the query being executed. - /// - /// This will divide the batch into optimal sized chunks and execute them until all - /// values have been executed or the first error is encountered. 
- pub(crate) async fn execute_batch( - &self, session: Arc, cfg: Arc, query: PreparedQuery, - values: Vec, - ) -> FallibleQueryResults { - let query_map = match query { - PreparedQuery::TxoAdaInsertQuery => &self.txo_insert_queries, - PreparedQuery::TxoAssetInsertQuery => &self.txo_asset_insert_queries, - PreparedQuery::UnstakedTxoAdaInsertQuery => &self.unstaked_txo_insert_queries, - PreparedQuery::UnstakedTxoAssetInsertQuery => &self.unstaked_txo_asset_insert_queries, - PreparedQuery::TxiInsertQuery => &self.txi_insert_queries, - PreparedQuery::StakeRegistrationInsertQuery => &self.stake_registration_insert_queries, - PreparedQuery::Cip36RegistrationInsertQuery => &self.cip36_registration_insert_queries, - PreparedQuery::Cip36RegistrationInsertErrorQuery => { - &self.cip36_registration_error_insert_queries - }, - PreparedQuery::Cip36RegistrationForVoteKeyInsertQuery => { - &self.cip36_registration_for_vote_key_insert_queries - }, - PreparedQuery::TxoSpentUpdateQuery => &self.txo_spent_update_queries, - PreparedQuery::Rbac509InsertQuery => &self.rbac509_registration_insert_queries, - PreparedQuery::Rbac509InvalidInsertQuery => { - &self.rbac509_invalid_registration_insert_queries - }, - PreparedQuery::CatalystIdForTxnIdInsertQuery => { - &self.catalyst_id_for_txn_id_insert_queries - }, - PreparedQuery::CatalystIdForStakeAddressInsertQuery => { - &self.catalyst_id_for_stake_address_insert_queries - }, - }; - session_execute_batch(session, query_map, cfg, query, values).await + sized_batches.insert(batch_size.try_into()?, Arc::new(batch)); } + + Ok(sized_batches) } +/// A set of query responses that can fail. +pub(crate) type FallibleQueryResults = anyhow::Result>; +/// A set of query responses from tasks that can fail. +pub(crate) type FallibleQueryTasks = Vec>; + /// Execute a Batch query with the given parameters. /// /// Values should be a Vec of values which implement `SerializeRow` and they MUST be @@ -438,7 +349,7 @@ impl PreparedQueries { /// /// This will divide the batch into optimal sized chunks and execute them until all /// values have been executed or the first error is encountered. -async fn session_execute_batch( +pub(crate) async fn session_execute_batch( session: Arc, query_map: &SizedBatch, cfg: Arc, query: Q, values: Vec, ) -> FallibleQueryResults { @@ -499,3 +410,27 @@ where P: SerializeRow { } }) } + +/// Executes a single query with the given parameters. +/// +/// Returns no data, and an error if the query fails. +pub(crate) async fn session_execute_upsert
<P>
( + session: Arc, prepared_stmt: &PreparedStatement, params: P, +) -> anyhow::Result<()> +where P: SerializeRow { + session + .execute_unpaged(prepared_stmt, params) + .await + .map_err(|e| { + match e { + QueryError::ConnectionPoolError(err) => { + set_index_db_liveness(false); + error!(error = %err, "Index DB connection failed. Liveness set to false."); + CassandraSessionError::ConnectionUnavailable { source: err.into() }.into() + }, + _ => anyhow::anyhow!(e), + } + })?; + + Ok(()) +} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/catalyst_id_for_stake_address.rs b/catalyst-gateway/bin/src/db/index/queries/purge/catalyst_id_for_stake_address.rs index e3a2a5211613..81a002f36842 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/catalyst_id_for_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/catalyst_id_for_stake_address.rs @@ -1,25 +1,19 @@ //! Catalyst ID For Stake Address (RBAC 509 registrations) Queries used in purging data. -use std::{fmt::Debug, sync::Arc}; +use std::fmt::Debug; -use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, - Session, -}; +use scylla::{transport::iterator::TypedRowStream, SerializeRow}; use tracing::error; use crate::{ db::{ index::{ - queries::{ - purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, - FallibleQueryResults, SizedBatch, - }, + queries::{FallibleQueryResults, Query}, session::CassandraSession, }, types::DbStakeAddress, }, - settings::cassandra_db, + impl_query_batch, impl_query_statement, }; pub(crate) mod result { @@ -59,30 +53,16 @@ impl From for Params { /// Get primary key for Catalyst ID For Stake Address registration query. pub(crate) struct PrimaryKeyQuery; -impl PrimaryKeyQuery { - /// Prepares a query to get all Catalyst ID For Stake Address registration primary - /// keys. - pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - PreparedQueries::prepare( - session.clone(), - SELECT_QUERY, - scylla::statement::Consistency::All, - true, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare get Catalyst ID For Stake Address registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) - } +impl_query_statement!(PrimaryKeyQuery, SELECT_QUERY); +impl PrimaryKeyQuery { /// Executes a query to get all Catalyst ID For Stake Address registration primary /// keys. pub(crate) async fn execute( session: &CassandraSession, ) -> anyhow::Result> { let iter = session - .purge_execute_iter(PreparedSelectQuery::CatalystIdForStakeAddress) + .purge_execute_iter(::type_id()) .await? 
.rows_stream::()?; @@ -96,32 +76,20 @@ const DELETE_QUERY: &str = include_str!("cql/delete_catalyst_id_for_stake_addres /// Delete Catalyst ID For Stake Address registration Query pub(crate) struct DeleteQuery; +impl_query_batch!(DeleteQuery, DELETE_QUERY); + impl DeleteQuery { /// Prepare Batch of Delete Queries - pub(crate) async fn prepare_batch( - session: &Arc, cfg: &cassandra_db::EnvVars, - ) -> anyhow::Result { - PreparedQueries::prepare_batch( - session.clone(), - DELETE_QUERY, - cfg, - scylla::statement::Consistency::Any, - true, - false, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare delete Catalyst ID For Stake Address registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) - } - /// Executes a DELETE Query pub(crate) async fn execute( session: &CassandraSession, params: Vec, ) -> FallibleQueryResults { let results = session - .purge_execute_batch(PreparedDeleteQuery::CatalystIdForStakeAddress, params) + .purge_execute_batch( + ::type_id(), + ::query_str(), + params, + ) .await?; Ok(results) diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/catalyst_id_for_txn_id.rs b/catalyst-gateway/bin/src/db/index/queries/purge/catalyst_id_for_txn_id.rs index cb3046d09ec1..1fce9f3e414d 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/catalyst_id_for_txn_id.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/catalyst_id_for_txn_id.rs @@ -1,25 +1,19 @@ //! Catalyst ID For TX ID (RBAC 509 registrations) Queries used in purging data. -use std::{fmt::Debug, sync::Arc}; +use std::fmt::Debug; -use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, - Session, -}; +use scylla::{transport::iterator::TypedRowStream, SerializeRow}; use tracing::error; use crate::{ db::{ index::{ - queries::{ - purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, - FallibleQueryResults, SizedBatch, - }, + queries::{FallibleQueryResults, Query}, session::CassandraSession, }, types::DbTransactionId, }, - settings::cassandra_db, + impl_query_batch, impl_query_statement, }; pub(crate) mod result { @@ -65,28 +59,14 @@ impl From for Params { /// Get primary key for Catalyst ID For TX ID registration query. pub(crate) struct PrimaryKeyQuery; -impl PrimaryKeyQuery { - /// Prepares a query to get all Catalyst ID For TX ID registration primary keys. - pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - PreparedQueries::prepare( - session.clone(), - SELECT_QUERY, - scylla::statement::Consistency::All, - true, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare get Catalyst ID For TX ID registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) - } +impl_query_statement!(PrimaryKeyQuery, SELECT_QUERY); - /// Executes a query to get all Catalyst ID For TX ID registration primary keys. +impl PrimaryKeyQuery { pub(crate) async fn execute( session: &CassandraSession, ) -> anyhow::Result> { let iter = session - .purge_execute_iter(PreparedSelectQuery::CatalystIdForTxnId) + .purge_execute_iter(::type_id()) .await? 
.rows_stream::()?; @@ -100,32 +80,15 @@ const DELETE_QUERY: &str = include_str!("cql/delete_catalyst_id_for_txn_id.cql") /// Delete Catalyst ID For TX ID registration Query pub(crate) struct DeleteQuery; -impl DeleteQuery { - /// Prepare Batch of Delete Queries - pub(crate) async fn prepare_batch( - session: &Arc, cfg: &cassandra_db::EnvVars, - ) -> anyhow::Result { - PreparedQueries::prepare_batch( - session.clone(), - DELETE_QUERY, - cfg, - scylla::statement::Consistency::Any, - true, - false, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare delete Catalyst ID For TX ID registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) - } +impl_query_batch!(DeleteQuery, DELETE_QUERY); +impl DeleteQuery { /// Executes a DELETE Query pub(crate) async fn execute( session: &CassandraSession, params: Vec, ) -> FallibleQueryResults { let results = session - .purge_execute_batch(PreparedDeleteQuery::CatalystIdForTxnId, params) + .purge_execute_batch(Self::type_id(), Self::query_str(), params) .await?; Ok(results) diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration.rs b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration.rs index 208d597828bf..1c50b0fb73b7 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration.rs @@ -1,24 +1,18 @@ //! CIP-36 registration Queries used in purging data. -use std::{fmt::Debug, sync::Arc}; +use std::fmt::Debug; -use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, - Session, -}; +use scylla::{transport::iterator::TypedRowStream, SerializeRow}; use tracing::error; use crate::{ db::{ index::{ - queries::{ - purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, - FallibleQueryResults, SizedBatch, - }, + queries::{FallibleQueryResults, Query}, session::CassandraSession, }, types::{DbSlot, DbTxnIndex}, }, - settings::cassandra_db, + impl_query_batch, impl_query_statement, }; pub(crate) mod result { @@ -70,28 +64,15 @@ impl From for Params { /// Get primary key for CIP-36 registration query. pub(crate) struct PrimaryKeyQuery; -impl PrimaryKeyQuery { - /// Prepares a query to get all CIP-36 registration primary keys. - pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - PreparedQueries::prepare( - session.clone(), - SELECT_QUERY, - scylla::statement::Consistency::All, - true, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare get CIP-36 registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) - } +impl_query_statement!(PrimaryKeyQuery, SELECT_QUERY); +impl PrimaryKeyQuery { /// Executes a query to get all CIP-36 registration primary keys. pub(crate) async fn execute( session: &CassandraSession, ) -> anyhow::Result> { let iter = session - .purge_execute_iter(PreparedSelectQuery::Cip36Registration) + .purge_execute_iter(::type_id()) .await? 
.rows_stream::()?; @@ -105,32 +86,19 @@ const DELETE_QUERY: &str = include_str!("./cql/delete_cip36_registration.cql"); /// Delete CIP-36 registration Query pub(crate) struct DeleteQuery; -impl DeleteQuery { - /// Prepare Batch of Delete Queries - pub(crate) async fn prepare_batch( - session: &Arc, cfg: &cassandra_db::EnvVars, - ) -> anyhow::Result { - PreparedQueries::prepare_batch( - session.clone(), - DELETE_QUERY, - cfg, - scylla::statement::Consistency::Any, - true, - false, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare delete CIP-36 registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) - } +impl_query_batch!(DeleteQuery, DELETE_QUERY); +impl DeleteQuery { /// Executes a DELETE Query pub(crate) async fn execute( session: &CassandraSession, params: Vec, ) -> FallibleQueryResults { let results = session - .purge_execute_batch(PreparedDeleteQuery::Cip36Registration, params) + .purge_execute_batch( + ::type_id(), + ::query_str(), + params, + ) .await?; Ok(results) diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_for_vote_key.rs b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_for_vote_key.rs index 24a3ca157e69..983d2ad5d40d 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_for_vote_key.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_for_vote_key.rs @@ -1,24 +1,18 @@ //! CIP-36 registration by Vote Key Queries used in purging data. -use std::{fmt::Debug, sync::Arc}; +use std::fmt::Debug; -use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, - Session, -}; +use scylla::{transport::iterator::TypedRowStream, SerializeRow, Session}; use tracing::error; use crate::{ db::{ index::{ - queries::{ - purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, - FallibleQueryResults, SizedBatch, - }, + queries::{FallibleQueryResults, Query}, session::CassandraSession, }, types::{DbSlot, DbTxnIndex}, }, - settings::cassandra_db, + impl_query_batch, impl_query_statement, }; pub(crate) mod result { @@ -74,28 +68,15 @@ impl From for Params { /// Get primary key for CIP-36 registration by Vote key query. pub(crate) struct PrimaryKeyQuery; -impl PrimaryKeyQuery { - /// Prepares a query to get all CIP-36 registration by Vote key primary keys. - pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - PreparedQueries::prepare( - session.clone(), - SELECT_QUERY, - scylla::statement::Consistency::All, - true, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare get CIP-36 registration by Vote key primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) - } +impl_query_statement!(PrimaryKeyQuery, SELECT_QUERY); +impl PrimaryKeyQuery { /// Executes a query to get all CIP-36 registration by Vote key primary keys. pub(crate) async fn execute( session: &CassandraSession, ) -> anyhow::Result> { let iter = session - .purge_execute_iter(PreparedSelectQuery::Cip36RegistrationForVoteKey) + .purge_execute_iter(::type_id()) .await? 
.rows_stream::()?; @@ -109,32 +90,19 @@ const DELETE_QUERY: &str = include_str!("./cql/delete_cip36_registration_for_vot /// Delete CIP-36 registration by Vote key Query pub(crate) struct DeleteQuery; -impl DeleteQuery { - /// Prepare Batch of Delete Queries - pub(crate) async fn prepare_batch( - session: &Arc, cfg: &cassandra_db::EnvVars, - ) -> anyhow::Result { - PreparedQueries::prepare_batch( - session.clone(), - DELETE_QUERY, - cfg, - scylla::statement::Consistency::Any, - true, - false, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare delete CIP-36 registration by Vote key primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) - } +impl_query_batch!(DeleteQuery, DELETE_QUERY); +impl DeleteQuery { /// Executes a DELETE Query pub(crate) async fn execute( session: &CassandraSession, params: Vec, ) -> FallibleQueryResults { let results = session - .purge_execute_batch(PreparedDeleteQuery::Cip36RegistrationForVoteKey, params) + .purge_execute_batch( + ::type_id(), + ::query_str(), + params, + ) .await?; Ok(results) diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_invalid.rs b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_invalid.rs index acd56bdb0e95..295e6e2b6da4 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_invalid.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/cip36_registration_invalid.rs @@ -1,24 +1,18 @@ //! CIP-36 Registration (Invalid) Queries used in purging data. -use std::{fmt::Debug, sync::Arc}; +use std::fmt::Debug; -use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, - Session, -}; +use scylla::{transport::iterator::TypedRowStream, SerializeRow}; use tracing::error; use crate::{ db::{ index::{ - queries::{ - purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, - FallibleQueryResults, SizedBatch, - }, + queries::{FallibleQueryResults, Query}, session::CassandraSession, }, types::{DbSlot, DbTxnIndex}, }, - settings::cassandra_db, + impl_query_batch, impl_query_statement, }; pub(crate) mod result { @@ -66,28 +60,15 @@ impl From for Params { /// Get primary key for CIP-36 invalid registration query. pub(crate) struct PrimaryKeyQuery; -impl PrimaryKeyQuery { - /// Prepares a query to get all CIP-36 invalid registration primary keys. - pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - PreparedQueries::prepare( - session.clone(), - SELECT_QUERY, - scylla::statement::Consistency::All, - true, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare get CIP-36 invalid registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) - } +impl_query_statement!(PrimaryKeyQuery, SELECT_QUERY); +impl PrimaryKeyQuery { /// Executes a query to get all CIP-36 invalid registration primary keys. pub(crate) async fn execute( session: &CassandraSession, ) -> anyhow::Result> { let iter = session - .purge_execute_iter(PreparedSelectQuery::Cip36RegistrationInvalid) + .purge_execute_iter(::type_id()) .await? 
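With every purge module reduced to the same two entry points, a purge pass only needs the select stream plus the batched delete. Here is a sketch of how a caller might drive one table under the new API; error handling is minimal and the conversion from the streamed row into `Params` assumes the `From` impl each module keeps:

use futures::StreamExt;

/// Illustrative only: purge one table by streaming its primary keys and then
/// deleting them through the pre-sized batches behind `purge_execute_batch`.
async fn purge_one_table(session: &CassandraSession) -> anyhow::Result<()> {
    // 1. Stream every primary key in the table.
    let mut keys = PrimaryKeyQuery::execute(session).await?;

    // 2. Convert each streamed row into delete parameters.
    let mut params = Vec::new();
    while let Some(row) = keys.next().await {
        params.push(Params::from(row?));
    }

    // 3. Delete them; chunking into optimal batch sizes happens inside the session.
    DeleteQuery::execute(session, params).await?;
    Ok(())
}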
.rows_stream::()?; @@ -101,32 +82,19 @@ const DELETE_QUERY: &str = include_str!("./cql/delete_cip36_registration_invalid /// Delete CIP-36 invalid registration Query pub(crate) struct DeleteQuery; -impl DeleteQuery { - /// Prepare Batch of Delete Queries - pub(crate) async fn prepare_batch( - session: &Arc, cfg: &cassandra_db::EnvVars, - ) -> anyhow::Result { - PreparedQueries::prepare_batch( - session.clone(), - DELETE_QUERY, - cfg, - scylla::statement::Consistency::Any, - true, - false, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare delete CIP-36 invalid registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) - } +impl_query_batch!(DeleteQuery, DELETE_QUERY); +impl DeleteQuery { /// Executes a DELETE Query pub(crate) async fn execute( session: &CassandraSession, params: Vec, ) -> FallibleQueryResults { let results = session - .purge_execute_batch(PreparedDeleteQuery::Cip36RegistrationInvalid, params) + .purge_execute_batch( + ::type_id(), + ::query_str(), + params, + ) .await?; Ok(results) diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/mod.rs b/catalyst-gateway/bin/src/db/index/queries/purge/mod.rs index bb0b1aa5153d..505b7456a307 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/mod.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/mod.rs @@ -13,283 +13,3 @@ pub(crate) mod txo_ada; pub(crate) mod txo_assets; pub(crate) mod unstaked_txo_ada; pub(crate) mod unstaked_txo_assets; - -use std::{fmt::Debug, sync::Arc}; - -use scylla::{ - prepared_statement::PreparedStatement, serialize::row::SerializeRow, - transport::iterator::QueryPager, Session, -}; - -use super::{FallibleQueryResults, SizedBatch}; -use crate::settings::cassandra_db; - -/// No parameters -const NO_PARAMS: () = (); - -/// All prepared DELETE query statements (purge DB table rows). -#[derive(strum_macros::Display)] -pub(crate) enum PreparedDeleteQuery { - /// TXO Delete query. - TxoAda, - /// TXO Assets Delete query. - TxoAssets, - /// Unstaked TXO Delete query. - UnstakedTxoAda, - /// Unstaked TXO Asset Delete query. - UnstakedTxoAsset, - /// TXI by TXN Hash Delete query. - Txi, - /// Stake Registration Delete query. - StakeRegistration, - /// CIP 36 Registration Delete Query. - Cip36Registration, - /// CIP 36 Registration Invalid Delete query. - Cip36RegistrationInvalid, - /// CIP 36 Registration for vote key Delete query. - Cip36RegistrationForVoteKey, - /// RBAC 509 Registration Delete query. - Rbac509, - /// Invalid RBAC 509 Registration Delete query. - Rbac509Invalid, - /// Catalyst ID For Transaction ID Delete query. - CatalystIdForTxnId, - /// Catalyst ID For Stake Address Delete query. - CatalystIdForStakeAddress, -} - -/// All prepared SELECT query statements (primary keys from table). -#[derive(strum_macros::Display)] -pub(crate) enum PreparedSelectQuery { - /// TXO Select query. - TxoAda, - /// TXO Asset Select query. - TxoAssets, - /// Unstaked TXO Select query. - UnstakedTxoAda, - /// Unstaked TXO Asset Select query. - UnstakedTxoAsset, - /// TXI by TXN Hash Select query. - Txi, - /// Stake Registration Select query. - StakeRegistration, - /// CIP 36 Registration Select Query. - Cip36Registration, - /// CIP 36 Registration Invalid Select query. - Cip36RegistrationInvalid, - /// CIP 36 Registration for vote key Select query. - Cip36RegistrationForVoteKey, - /// RBAC 509 Registration Select query. - Rbac509, - /// Invalid RBAC 509 Registration Select query. 
- Rbac509Invalid, - /// Catalyst ID For Transaction ID Select query. - CatalystIdForTxnId, - /// Catalyst ID For Stake Address Select query. - CatalystIdForStakeAddress, -} - -/// All prepared purge queries for a session. -pub(crate) struct PreparedQueries { - /// TXO ADA Primary Key Query. - select_txo_ada: PreparedStatement, - /// TXO Delete Query. - delete_txo_ada: SizedBatch, - /// TXO Asset Primary Key Query. - select_txo_assets: PreparedStatement, - /// TXO Assets Delete Query. - delete_txo_assets: SizedBatch, - /// Unstaked TXO ADA Primary Key Query. - select_unstaked_txo_ada: PreparedStatement, - /// Unstaked TXO ADA Delete Query. - delete_unstaked_txo_ada: SizedBatch, - /// Unstaked TXO Assets Primary Key Query. - select_unstaked_txo_assets: PreparedStatement, - /// Unstaked TXO Asset Delete Query. - delete_unstaked_txo_assets: SizedBatch, - /// TXI by TXN Hash by TXN Hash Primary Key Query. - select_txi_by_hash: PreparedStatement, - /// TXI by TXN Hash Delete Query. - delete_txi_by_hash: SizedBatch, - /// Stake Registration Primary Key Query. - select_stake_registration: PreparedStatement, - /// Stake Registration Delete Query. - delete_stake_registration: SizedBatch, - /// CIP36 Registrations Primary Key Query. - select_cip36_registration: PreparedStatement, - /// CIP36 Registrations Delete Query. - delete_cip36_registration: SizedBatch, - /// CIP36 Registration Invalid Primary Key Query. - select_cip36_registration_invalid: PreparedStatement, - /// CIP36 Registration Invalid Delete Query. - delete_cip36_registration_invalid: SizedBatch, - /// CIP36 Registration for Vote Key Primary Key Query. - select_cip36_registration_for_vote_key: PreparedStatement, - /// CIP36 Registration for Vote Key Delete Query. - delete_cip36_registration_for_vote_key: SizedBatch, - /// RBAC 509 Registrations Primary Key Query. - select_rbac509_registration: PreparedStatement, - /// RBAC 509 Registrations Delete Query. - delete_rbac509_registration: SizedBatch, - /// RBAC 509 invalid registrations Primary Key Query. - select_rbac509_invalid_registration: PreparedStatement, - /// RBAC 509 invalid registrations Delete Query. - delete_rbac509_invalid_registration: SizedBatch, - /// Catalyst ID for TX ID Primary Key Query.. - select_catalyst_id_for_txn_id: PreparedStatement, - /// Catalyst ID for TX ID Delete Query.. - delete_catalyst_id_for_txn_id: SizedBatch, - /// Catalyst ID for Stake Address Primary Key Query.. - select_catalyst_id_for_stake_address: PreparedStatement, - /// Catalyst ID for Stake Address Delete Query.. - delete_catalyst_id_for_stake_address: SizedBatch, -} - -impl PreparedQueries { - /// Create new prepared queries for a given session. - pub(crate) async fn new( - session: Arc, cfg: &cassandra_db::EnvVars, - ) -> anyhow::Result { - // We initialize like this, so that all errors preparing querys get shown before aborting. 
- Ok(Self { - select_txo_ada: txo_ada::PrimaryKeyQuery::prepare(&session).await?, - delete_txo_ada: txo_ada::DeleteQuery::prepare_batch(&session, cfg).await?, - select_txo_assets: txo_assets::PrimaryKeyQuery::prepare(&session).await?, - delete_txo_assets: txo_assets::DeleteQuery::prepare_batch(&session, cfg).await?, - select_unstaked_txo_ada: unstaked_txo_ada::PrimaryKeyQuery::prepare(&session).await?, - delete_unstaked_txo_ada: unstaked_txo_ada::DeleteQuery::prepare_batch(&session, cfg) - .await?, - select_unstaked_txo_assets: unstaked_txo_assets::PrimaryKeyQuery::prepare(&session) - .await?, - delete_unstaked_txo_assets: unstaked_txo_assets::DeleteQuery::prepare_batch( - &session, cfg, - ) - .await?, - select_txi_by_hash: txi_by_hash::PrimaryKeyQuery::prepare(&session).await?, - delete_txi_by_hash: txi_by_hash::DeleteQuery::prepare_batch(&session, cfg).await?, - select_stake_registration: stake_registration::PrimaryKeyQuery::prepare(&session) - .await?, - delete_stake_registration: stake_registration::DeleteQuery::prepare_batch( - &session, cfg, - ) - .await?, - select_cip36_registration: cip36_registration::PrimaryKeyQuery::prepare(&session) - .await?, - delete_cip36_registration: cip36_registration::DeleteQuery::prepare_batch( - &session, cfg, - ) - .await?, - select_cip36_registration_invalid: - cip36_registration_invalid::PrimaryKeyQuery::prepare(&session).await?, - delete_cip36_registration_invalid: - cip36_registration_invalid::DeleteQuery::prepare_batch(&session, cfg).await?, - select_cip36_registration_for_vote_key: - cip36_registration_for_vote_key::PrimaryKeyQuery::prepare(&session).await?, - delete_cip36_registration_for_vote_key: - cip36_registration_for_vote_key::DeleteQuery::prepare_batch(&session, cfg).await?, - select_rbac509_registration: rbac509_registration::PrimaryKeyQuery::prepare(&session) - .await?, - delete_rbac509_registration: rbac509_registration::DeleteQuery::prepare_batch( - &session, cfg, - ) - .await?, - select_rbac509_invalid_registration: - rbac509_invalid_registration::PrimaryKeyQuery::prepare(&session).await?, - delete_rbac509_invalid_registration: - rbac509_invalid_registration::DeleteQuery::prepare_batch(&session, cfg).await?, - select_catalyst_id_for_txn_id: catalyst_id_for_txn_id::PrimaryKeyQuery::prepare( - &session, - ) - .await?, - delete_catalyst_id_for_txn_id: catalyst_id_for_txn_id::DeleteQuery::prepare_batch( - &session, cfg, - ) - .await?, - select_catalyst_id_for_stake_address: - catalyst_id_for_stake_address::PrimaryKeyQuery::prepare(&session).await?, - delete_catalyst_id_for_stake_address: - catalyst_id_for_stake_address::DeleteQuery::prepare_batch(&session, cfg).await?, - }) - } - - /// Prepares a statement. - pub(crate) async fn prepare( - session: Arc, query: &str, consistency: scylla::statement::Consistency, - idempotent: bool, - ) -> anyhow::Result { - super::PreparedQueries::prepare(session, query, consistency, idempotent).await - } - - /// Prepares all permutations of the batch from 1 to max. - /// It is necessary to do this because batches are pre-sized, they can not be dynamic. - /// Preparing the batches in advance is a very larger performance increase. - pub(crate) async fn prepare_batch( - session: Arc, query: &str, cfg: &cassandra_db::EnvVars, - consistency: scylla::statement::Consistency, idempotent: bool, logged: bool, - ) -> anyhow::Result { - super::PreparedQueries::prepare_batch(session, query, cfg, consistency, idempotent, logged) - .await - } - - /// Executes a select query with the given parameters. 
- /// - /// Returns an iterator that iterates over all the result pages that the query - /// returns. - pub(crate) async fn execute_iter( - &self, session: Arc, select_query: PreparedSelectQuery, - ) -> anyhow::Result { - let prepared_stmt = match select_query { - PreparedSelectQuery::TxoAda => &self.select_txo_ada, - PreparedSelectQuery::TxoAssets => &self.select_txo_assets, - PreparedSelectQuery::UnstakedTxoAda => &self.select_unstaked_txo_ada, - PreparedSelectQuery::UnstakedTxoAsset => &self.select_unstaked_txo_assets, - PreparedSelectQuery::Txi => &self.select_txi_by_hash, - PreparedSelectQuery::StakeRegistration => &self.select_stake_registration, - PreparedSelectQuery::Cip36Registration => &self.select_cip36_registration, - PreparedSelectQuery::Cip36RegistrationInvalid => { - &self.select_cip36_registration_invalid - }, - PreparedSelectQuery::Cip36RegistrationForVoteKey => { - &self.select_cip36_registration_for_vote_key - }, - PreparedSelectQuery::Rbac509 => &self.select_rbac509_registration, - PreparedSelectQuery::Rbac509Invalid => &self.select_rbac509_invalid_registration, - PreparedSelectQuery::CatalystIdForTxnId => &self.select_catalyst_id_for_txn_id, - PreparedSelectQuery::CatalystIdForStakeAddress => { - &self.select_catalyst_id_for_stake_address - }, - }; - - super::session_execute_iter(session, prepared_stmt, NO_PARAMS).await - } - - /// Execute a purge query with the given parameters. - pub(crate) async fn execute_batch( - &self, session: Arc, cfg: Arc, query: PreparedDeleteQuery, - values: Vec, - ) -> FallibleQueryResults { - let query_map = match query { - PreparedDeleteQuery::TxoAda => &self.delete_txo_ada, - PreparedDeleteQuery::TxoAssets => &self.delete_txo_assets, - PreparedDeleteQuery::UnstakedTxoAda => &self.delete_unstaked_txo_ada, - PreparedDeleteQuery::UnstakedTxoAsset => &self.delete_unstaked_txo_assets, - PreparedDeleteQuery::Txi => &self.delete_txi_by_hash, - PreparedDeleteQuery::StakeRegistration => &self.delete_stake_registration, - PreparedDeleteQuery::Cip36Registration => &self.delete_cip36_registration, - PreparedDeleteQuery::Cip36RegistrationInvalid => { - &self.delete_cip36_registration_invalid - }, - PreparedDeleteQuery::Cip36RegistrationForVoteKey => { - &self.delete_cip36_registration_for_vote_key - }, - PreparedDeleteQuery::Rbac509 => &self.delete_rbac509_registration, - PreparedDeleteQuery::Rbac509Invalid => &self.delete_rbac509_invalid_registration, - PreparedDeleteQuery::CatalystIdForTxnId => &self.delete_catalyst_id_for_txn_id, - PreparedDeleteQuery::CatalystIdForStakeAddress => { - &self.delete_catalyst_id_for_stake_address - }, - }; - - super::session_execute_batch(session, query_map, cfg, query, values).await - } -} diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_invalid_registration.rs b/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_invalid_registration.rs index b5e9a5c004dd..d198aac27b33 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_invalid_registration.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_invalid_registration.rs @@ -1,24 +1,18 @@ //! RBAC 509 Registration Queries used in purging data. 
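The `PreparedSelectQuery` / `PreparedDeleteQuery` enums, the purge `PreparedQueries` struct and its two large `match` blocks removed above are replaced by a `TypeId`-keyed registry owned by the session (see the `session.rs` changes later in this diff). The exact `QueryKind` and `prepare_queries` definitions are not part of these hunks; the following is a sketch consistent with how they are used, offered as an assumption:

use std::any::TypeId;

use dashmap::DashMap;
use scylla::prepared_statement::PreparedStatement;

use super::{Query, SizedBatch};

/// Assumed shape: what a prepared query is stored as in the session registry.
pub(crate) enum QueryKind {
    /// A single prepared statement (selects and upserts).
    Statement(PreparedStatement),
    /// Pre-sized batches of a prepared statement (inserts and deletes).
    Batch(SizedBatch),
}

/// Hypothetical helper: register one query type under its `TypeId`.
fn register<Q: Query>(registry: &DashMap<TypeId, QueryKind>, prepared: QueryKind) {
    registry.insert(Q::type_id(), prepared);
}

`prepare_queries` would then build the map once at session start-up by preparing each registered query's `query_str()` with the consistency and idempotency settings the deleted code used, so adding a new table no longer means touching two enums and two `match` statements.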
-use std::{fmt::Debug, sync::Arc}; +use std::fmt::Debug; -use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, - Session, -}; +use scylla::{transport::iterator::TypedRowStream, SerializeRow}; use tracing::error; use crate::{ db::{ index::{ - queries::{ - purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, - FallibleQueryResults, SizedBatch, - }, + queries::{FallibleQueryResults, Query}, session::CassandraSession, }, types::{DbCatalystId, DbTransactionId}, }, - settings::cassandra_db, + impl_query_batch, impl_query_statement, }; pub(crate) mod result { @@ -62,28 +56,15 @@ impl From for Params { /// Get primary key for RBAC 509 invalid registration query. pub(crate) struct PrimaryKeyQuery; -impl PrimaryKeyQuery { - /// Prepares a query to get all RBAC 509 invalid registration primary keys. - pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - PreparedQueries::prepare( - session.clone(), - SELECT_QUERY, - scylla::statement::Consistency::All, - true, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare get RBAC 509 invalid registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) - } +impl_query_statement!(PrimaryKeyQuery, SELECT_QUERY); +impl PrimaryKeyQuery { /// Executes a query to get all RBAC 509 invalid registration primary keys. pub(crate) async fn execute( session: &CassandraSession, ) -> anyhow::Result> { let iter = session - .purge_execute_iter(PreparedSelectQuery::Rbac509Invalid) + .purge_execute_iter(::type_id()) .await? .rows_stream::()?; @@ -97,32 +78,19 @@ const DELETE_QUERY: &str = include_str!("cql/delete_rbac_invalid_registration.cq /// Delete RBAC 509 invalid registration Query pub(crate) struct DeleteQuery; -impl DeleteQuery { - /// Prepare Batch of Delete Queries - pub(crate) async fn prepare_batch( - session: &Arc, cfg: &cassandra_db::EnvVars, - ) -> anyhow::Result { - PreparedQueries::prepare_batch( - session.clone(), - DELETE_QUERY, - cfg, - scylla::statement::Consistency::Any, - true, - false, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare delete RBAC 509 invalid registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) - } +impl_query_batch!(DeleteQuery, DELETE_QUERY); +impl DeleteQuery { /// Executes a DELETE Query pub(crate) async fn execute( session: &CassandraSession, params: Vec, ) -> FallibleQueryResults { let results = session - .purge_execute_batch(PreparedDeleteQuery::Rbac509Invalid, params) + .purge_execute_batch( + ::type_id(), + ::query_str(), + params, + ) .await?; Ok(results) } diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_registration.rs b/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_registration.rs index 6a6a04f4907e..47252dbec6b1 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_registration.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/rbac509_registration.rs @@ -1,24 +1,18 @@ //! RBAC 509 Registration Queries used in purging data.
-use std::{fmt::Debug, sync::Arc}; +use std::fmt::Debug; -use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, - Session, -}; +use scylla::{transport::iterator::TypedRowStream, SerializeRow}; use tracing::error; use crate::{ db::{ index::{ - queries::{ - purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, - FallibleQueryResults, SizedBatch, - }, + queries::{FallibleQueryResults, Query}, session::CassandraSession, }, types::{DbCatalystId, DbTransactionId}, }, - settings::cassandra_db, + impl_query_batch, impl_query_statement, }; pub(crate) mod result { @@ -62,28 +56,15 @@ impl From for Params { /// Get primary key for RBAC 509 registration query. pub(crate) struct PrimaryKeyQuery; -impl PrimaryKeyQuery { - /// Prepares a query to get all RBAC 509 registration primary keys. - pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - PreparedQueries::prepare( - session.clone(), - SELECT_QUERY, - scylla::statement::Consistency::All, - true, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare get RBAC 509 registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) - } +impl_query_statement!(PrimaryKeyQuery, SELECT_QUERY); +impl PrimaryKeyQuery { /// Executes a query to get all RBAC 509 registration primary keys. pub(crate) async fn execute( session: &CassandraSession, ) -> anyhow::Result> { let iter = session - .purge_execute_iter(PreparedSelectQuery::Rbac509) + .purge_execute_iter(::type_id()) .await? .rows_stream::()?; @@ -97,32 +78,19 @@ const DELETE_QUERY: &str = include_str!("cql/delete_rbac_registration.cql"); /// Delete RBAC 509 registration Query pub(crate) struct DeleteQuery; -impl DeleteQuery { - /// Prepare Batch of Delete Queries - pub(crate) async fn prepare_batch( - session: &Arc, cfg: &cassandra_db::EnvVars, - ) -> anyhow::Result { - PreparedQueries::prepare_batch( - session.clone(), - DELETE_QUERY, - cfg, - scylla::statement::Consistency::Any, - true, - false, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare delete RBAC 509 registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) - } +impl_query_batch!(DeleteQuery, DELETE_QUERY); +impl DeleteQuery { /// Executes a DELETE Query pub(crate) async fn execute( session: &CassandraSession, params: Vec, ) -> FallibleQueryResults { let results = session - .purge_execute_batch(PreparedDeleteQuery::Rbac509, params) + .purge_execute_batch( + ::type_id(), + ::query_str(), + params, + ) .await?; Ok(results) diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/stake_registration.rs b/catalyst-gateway/bin/src/db/index/queries/purge/stake_registration.rs index 406c216bb0a7..50b5a6aba040 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/stake_registration.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/stake_registration.rs @@ -1,24 +1,18 @@ //! Stake Registration Queries used in purging data. 
-use std::{fmt::Debug, sync::Arc}; +use std::fmt::Debug; -use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, SerializeRow, - Session, -}; +use scylla::{transport::iterator::TypedRowStream, SerializeRow}; use tracing::error; use crate::{ db::{ index::{ - queries::{ - purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery}, - FallibleQueryResults, SizedBatch, - }, + queries::{FallibleQueryResults, Query}, session::CassandraSession, }, types::{DbSlot, DbStakeAddress, DbTxnIndex}, }, - settings::cassandra_db, + impl_query_batch, impl_query_statement, }; pub(crate) mod result { @@ -70,28 +64,15 @@ impl From for Params { /// Get primary key for Stake Registration query. pub(crate) struct PrimaryKeyQuery; -impl PrimaryKeyQuery { - /// Prepares a query to get all Stake Registration primary keys. - pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { - PreparedQueries::prepare( - session.clone(), - SELECT_QUERY, - scylla::statement::Consistency::All, - true, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare get Stake Registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}")) - } +impl_query_statement!(PrimaryKeyQuery, SELECT_QUERY); +impl PrimaryKeyQuery { /// Executes a query to get all Stake Registration primary keys. pub(crate) async fn execute( session: &CassandraSession, ) -> anyhow::Result> { let iter = session - .purge_execute_iter(PreparedSelectQuery::StakeRegistration) + .purge_execute_iter(::type_id()) .await? .rows_stream::()?; @@ -105,32 +86,19 @@ const DELETE_QUERY: &str = include_str!("./cql/delete_stake_registration.cql"); /// Delete Stake Registration Query pub(crate) struct DeleteQuery; -impl DeleteQuery { - /// Prepare Batch of Delete Queries - pub(crate) async fn prepare_batch( - session: &Arc, cfg: &cassandra_db::EnvVars, - ) -> anyhow::Result { - PreparedQueries::prepare_batch( - session.clone(), - DELETE_QUERY, - cfg, - scylla::statement::Consistency::Any, - true, - false, - ) - .await - .inspect_err( - |error| error!(error=%error, "Failed to prepare delete Stake Registration primary key query."), - ) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}")) - } +impl_query_batch!(DeleteQuery, DELETE_QUERY); +impl DeleteQuery { /// Executes a DELETE Query pub(crate) async fn execute( session: &CassandraSession, params: Vec, ) -> FallibleQueryResults { let results = session - .purge_execute_batch(PreparedDeleteQuery::StakeRegistration, params) + .purge_execute_batch( + ::type_id(), + ::query_str(), + params, + ) .await?; Ok(results) diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/txi_by_hash.rs b/catalyst-gateway/bin/src/db/index/queries/purge/txi_by_hash.rs index 20e59d301a52..40fdfdaed859 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/txi_by_hash.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/txi_by_hash.rs @@ -18,6 +18,7 @@ use crate::{ }, types::{DbTransactionId, DbTxnOutputOffset}, }, + impl_query_batch, impl_query_statement, settings::cassandra_db, }; @@ -62,6 +63,8 @@ impl From for Params { /// Get primary key for TXI by hash query. pub(crate) struct PrimaryKeyQuery; +impl_query_statement!(PrimaryKeyQuery, SELECT_QUERY); + impl PrimaryKeyQuery { /// Prepares a query to get all TXI by hash primary keys. 
pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { @@ -97,6 +100,8 @@ const DELETE_QUERY: &str = include_str!("./cql/delete_txi_by_txn_hashes.cql"); /// Delete TXI by hash Query pub(crate) struct DeleteQuery; +impl_query_batch!(DeleteQuery, DELETE_QUERY); + impl DeleteQuery { /// Prepare Batch of Delete Queries pub(crate) async fn prepare_batch( diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/txo_ada.rs b/catalyst-gateway/bin/src/db/index/queries/purge/txo_ada.rs index 7d0be42ed96d..cb1f811d6253 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/txo_ada.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/txo_ada.rs @@ -18,6 +18,7 @@ use crate::{ }, types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset}, }, + impl_query_batch, impl_query_statement, settings::cassandra_db, }; @@ -70,9 +71,11 @@ impl From for Params { /// Get primary key for TXO by Stake Address query. pub(crate) struct PrimaryKeyQuery; +impl_query_statement!(PrimaryKeyQuery, SELECT_QUERY); + impl PrimaryKeyQuery { /// Prepares a query to get all TXO by stake address primary keys. - pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { + pub(crate) async fn prepare(session: Arc) -> anyhow::Result { PreparedQueries::prepare( session.clone(), SELECT_QUERY, @@ -105,6 +108,8 @@ const DELETE_QUERY: &str = include_str!("cql/delete_txo_by_stake_address.cql"); /// Delete TXO by Stake Address Query pub(crate) struct DeleteQuery; +impl_query_batch!(DeleteQuery, DELETE_QUERY); + impl DeleteQuery { /// Prepare Batch of Delete Queries pub(crate) async fn prepare_batch( diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/txo_assets.rs b/catalyst-gateway/bin/src/db/index/queries/purge/txo_assets.rs index a4597c4f1de0..b6a79b53df0d 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/txo_assets.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/txo_assets.rs @@ -18,6 +18,7 @@ use crate::{ }, types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset}, }, + impl_query_batch, impl_query_statement, settings::cassandra_db, }; @@ -85,6 +86,8 @@ impl From for Params { /// Get primary key for TXO Assets by Stake Address query. pub(crate) struct PrimaryKeyQuery; +impl_query_statement!(PrimaryKeyQuery, SELECT_QUERY); + impl PrimaryKeyQuery { /// Prepares a query to get all TXO Assets by stake address primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { @@ -120,6 +123,8 @@ const DELETE_QUERY: &str = include_str!("cql/delete_txo_assets_by_stake_address. /// Delete TXO Assets by Stake Address Query pub(crate) struct DeleteQuery; +impl_query_batch!(DeleteQuery, DELETE_QUERY); + impl DeleteQuery { /// Prepare Batch of Delete Queries pub(crate) async fn prepare_batch( diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_ada.rs b/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_ada.rs index 2b8f0ef424ae..28305863042b 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_ada.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_ada.rs @@ -19,6 +19,7 @@ use crate::{ }, types::{DbTransactionId, DbTxnOutputOffset}, }, + impl_query_batch, impl_query_statement, settings::cassandra_db, }; @@ -63,6 +64,8 @@ impl From for Params { /// Get primary key for Unstaked TXO ADA query. pub(crate) struct PrimaryKeyQuery; +impl_query_statement!(PrimaryKeyQuery, SELECT_QUERY); + impl PrimaryKeyQuery { /// Prepares a query to get all Unstaked TXO ADA primary keys. 
pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { @@ -98,6 +101,8 @@ const DELETE_QUERY: &str = include_str!("./cql/delete_unstaked_txo_by_txn_hash.c /// Delete TXO by Stake Address Query pub(crate) struct DeleteQuery; +impl_query_batch!(DeleteQuery, DELETE_QUERY); + impl DeleteQuery { /// Prepare Batch of Delete Queries pub(crate) async fn prepare_batch( diff --git a/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_assets.rs b/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_assets.rs index 4d67842d5354..73568585391d 100644 --- a/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_assets.rs +++ b/catalyst-gateway/bin/src/db/index/queries/purge/unstaked_txo_assets.rs @@ -18,6 +18,7 @@ use crate::{ }, types::{DbTransactionId, DbTxnOutputOffset}, }, + impl_query_batch, impl_query_statement, settings::cassandra_db, }; @@ -76,6 +77,8 @@ impl From for Params { /// Get primary key for TXO Assets by TXN Hash query. pub(crate) struct PrimaryKeyQuery; +impl_query_statement!(PrimaryKeyQuery, SELECT_QUERY); + impl PrimaryKeyQuery { /// Prepares a query to get all TXO Assets by TXN Hash primary keys. pub(crate) async fn prepare(session: &Arc) -> anyhow::Result { @@ -111,6 +114,8 @@ const DELETE_QUERY: &str = include_str!("./cql/delete_unstaked_txo_assets_by_txn /// Delete TXO Assets by TXN Hash Query pub(crate) struct DeleteQuery; +impl_query_batch!(DeleteQuery, DELETE_QUERY); + impl DeleteQuery { /// Prepare Batch of Delete Queries pub(crate) async fn prepare_batch( diff --git a/catalyst-gateway/bin/src/db/index/queries/rbac/get_catalyst_id_from_stake_address.rs b/catalyst-gateway/bin/src/db/index/queries/rbac/get_catalyst_id_from_stake_address.rs index 4e6dd70cb612..c4f532aeeec5 100644 --- a/catalyst-gateway/bin/src/db/index/queries/rbac/get_catalyst_id_from_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/queries/rbac/get_catalyst_id_from_stake_address.rs @@ -8,12 +8,15 @@ use scylla::{ }; use tracing::error; -use crate::db::{ - index::{ - queries::{PreparedQueries, PreparedSelectQuery}, - session::CassandraSession, +use crate::{ + db::{ + index::{ + queries::{PreparedQueries, PreparedSelectQuery}, + session::CassandraSession, + }, + types::{DbCatalystId, DbSlot, DbStakeAddress}, }, - types::{DbCatalystId, DbSlot, DbStakeAddress}, + impl_query_statement, }; /// Get Catalyst ID by stake address query string. @@ -28,14 +31,16 @@ pub(crate) struct QueryParams { /// Get Catalyst ID by stake address query. #[derive(Debug, Clone, DeserializeRow)] -pub(crate) struct Query { +pub(crate) struct GetCatalystIdForStakeAddress { /// Catalyst ID for the queries stake address. pub catalyst_id: DbCatalystId, /// A slot number. pub slot_no: DbSlot, } -impl Query { +impl_query_statement!(GetCatalystIdForStakeAddress, QUERY); + +impl GetCatalystIdForStakeAddress { /// Prepares a get Catalyst ID by stake address query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { PreparedQueries::prepare(session, QUERY, Consistency::All, true) @@ -48,11 +53,11 @@ impl Query { /// Executes a get Catalyst ID by stake address query. pub(crate) async fn execute( session: &CassandraSession, params: QueryParams, - ) -> anyhow::Result> { + ) -> anyhow::Result> { session .execute_iter(PreparedSelectQuery::CatalystIdByStakeAddress, params) .await? 
- .rows_stream::() + .rows_stream::() .map_err(Into::into) } } diff --git a/catalyst-gateway/bin/src/db/index/queries/rbac/get_catalyst_id_from_transaction_id.rs b/catalyst-gateway/bin/src/db/index/queries/rbac/get_catalyst_id_from_transaction_id.rs index 8a63507b1295..2470a1a8267b 100644 --- a/catalyst-gateway/bin/src/db/index/queries/rbac/get_catalyst_id_from_transaction_id.rs +++ b/catalyst-gateway/bin/src/db/index/queries/rbac/get_catalyst_id_from_transaction_id.rs @@ -13,17 +13,20 @@ use scylla::{ }; use tracing::error; -use crate::db::{ - index::{ - queries::{PreparedQueries, PreparedSelectQuery}, - session::CassandraSession, +use crate::{ + db::{ + index::{ + queries::{PreparedQueries, PreparedSelectQuery}, + session::CassandraSession, + }, + types::{DbCatalystId, DbTransactionId}, }, - types::{DbCatalystId, DbTransactionId}, + impl_query_statement, }; /// Cached data. #[allow(dead_code)] -static CACHE: LazyLock> = LazyLock::new(|| { +static CACHE: LazyLock> = LazyLock::new(|| { Cache::builder() .eviction_policy(EvictionPolicy::lru()) .build() @@ -41,12 +44,14 @@ pub(crate) struct QueryParams { /// Get Catalyst ID by stake address query. #[derive(Debug, Clone, DeserializeRow)] -pub(crate) struct Query { +pub(crate) struct GetCatalystIdForTxnId { /// A Catalyst ID. pub catalyst_id: DbCatalystId, } -impl Query { +impl_query_statement!(GetCatalystIdForTxnId, QUERY); + +impl GetCatalystIdForTxnId { /// Prepares a get catalyst ID by transaction ID query. pub(crate) async fn prepare(session: Arc) -> Result { PreparedQueries::prepare(session, QUERY, Consistency::All, true) @@ -61,11 +66,11 @@ impl Query { /// Don't call directly, use one of the methods instead. pub(crate) async fn execute( session: &CassandraSession, params: QueryParams, - ) -> Result> { + ) -> Result> { session .execute_iter(PreparedSelectQuery::CatalystIdByTransactionId, params) .await? - .rows_stream::() + .rows_stream::() .map_err(Into::into) } @@ -74,7 +79,7 @@ impl Query { /// Unless you really know you need an uncached result, use the cached version. pub(crate) async fn get_latest_uncached( session: &CassandraSession, txn_id: DbTransactionId, - ) -> Result> { + ) -> Result> { Self::execute(session, QueryParams { txn_id }) .await? .next() @@ -86,7 +91,7 @@ impl Query { /// Gets the latest Catalyst ID for the given transaction ID. pub(crate) async fn get_latest( session: &CassandraSession, transaction_id: DbTransactionId, - ) -> Result> { + ) -> Result> { // TODO: Caching is disabled because we want to measure the performance without it and be // sure that the logic is sound. Also caches needs to be tunable. 
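The `CACHE` above stays unused for now, as the TODO above explains, and the uncached call that follows remains the only path. If caching is enabled later, a cached variant could look roughly like this; `get_latest_cached` is a hypothetical name, and the sketch assumes `DbTransactionId: Clone` plus moka's async `get`/`insert` API:

/// Hypothetical cached wrapper around `get_latest_uncached` (not part of this change).
pub(crate) async fn get_latest_cached(
    session: &CassandraSession, txn_id: DbTransactionId,
) -> anyhow::Result<Option<DbCatalystId>> {
    // Serve from the cache when this transaction ID was already resolved.
    if let Some(id) = CACHE.get(&txn_id).await {
        return Ok(Some(id));
    }
    // Otherwise hit the index DB and remember the answer.
    let id = GetCatalystIdForTxnId::get_latest_uncached(session, txn_id.clone()).await?;
    if let Some(ref found) = id {
        CACHE.insert(txn_id, found.clone()).await;
    }
    Ok(id)
}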
Self::get_latest_uncached(session, transaction_id).await diff --git a/catalyst-gateway/bin/src/db/index/queries/rbac/get_rbac_invalid_registrations.rs b/catalyst-gateway/bin/src/db/index/queries/rbac/get_rbac_invalid_registrations.rs index d1187e0ad48d..412aff5efa4e 100644 --- a/catalyst-gateway/bin/src/db/index/queries/rbac/get_rbac_invalid_registrations.rs +++ b/catalyst-gateway/bin/src/db/index/queries/rbac/get_rbac_invalid_registrations.rs @@ -8,12 +8,15 @@ use scylla::{ }; use tracing::error; -use crate::db::{ - index::{ - queries::{PreparedQueries, PreparedSelectQuery}, - session::CassandraSession, +use crate::{ + db::{ + index::{ + queries::{PreparedQueries, PreparedSelectQuery}, + session::CassandraSession, + }, + types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4}, }, - types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4}, + impl_query_statement, }; /// Get invalid registrations by Catalyst ID query. @@ -29,7 +32,7 @@ pub(crate) struct QueryParams { /// Get invalid registrations by Catalyst ID query. #[allow(dead_code)] #[derive(DeserializeRow)] -pub(crate) struct Query { +pub(crate) struct GetRbac509InvalidRegistrations { /// Registration transaction id. pub txn_id: DbTransactionId, /// A block slot number. @@ -44,7 +47,9 @@ pub(crate) struct Query { pub problem_report: String, } -impl Query { +impl_query_statement!(GetRbac509InvalidRegistrations, QUERY); + +impl GetRbac509InvalidRegistrations { /// Prepares a query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { PreparedQueries::prepare(session, QUERY, Consistency::All, true) @@ -59,14 +64,14 @@ impl Query { #[allow(dead_code)] pub(crate) async fn execute( session: &CassandraSession, params: QueryParams, - ) -> anyhow::Result> { + ) -> anyhow::Result> { session .execute_iter( PreparedSelectQuery::RbacInvalidRegistrationsByCatalystId, params, ) .await? - .rows_stream::() + .rows_stream::() .map_err(Into::into) } } diff --git a/catalyst-gateway/bin/src/db/index/queries/rbac/get_rbac_registrations.rs b/catalyst-gateway/bin/src/db/index/queries/rbac/get_rbac_registrations.rs index 4eeb3e64fe1f..246c356fb3ae 100644 --- a/catalyst-gateway/bin/src/db/index/queries/rbac/get_rbac_registrations.rs +++ b/catalyst-gateway/bin/src/db/index/queries/rbac/get_rbac_registrations.rs @@ -14,12 +14,15 @@ use scylla::{ }; use tracing::{debug, error}; -use crate::db::{ - index::{ - queries::{PreparedQueries, PreparedSelectQuery}, - session::CassandraSession, +use crate::{ + db::{ + index::{ + queries::{PreparedQueries, PreparedSelectQuery}, + session::CassandraSession, + }, + types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4}, }, - types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4}, + impl_query_statement, }; /// Get registrations by Catalyst ID query. @@ -34,7 +37,7 @@ pub(crate) struct QueryParams { /// Get registrations by Catalyst ID query. #[derive(DeserializeRow, Clone)] -pub(crate) struct Query { +pub(crate) struct GetRbac509Registrations { /// Registration transaction id. #[allow(dead_code)] pub txn_id: DbTransactionId, @@ -49,7 +52,9 @@ pub(crate) struct Query { pub purpose: DbUuidV4, } -impl Query { +impl_query_statement!(GetRbac509Registrations, QUERY); + +impl GetRbac509Registrations { /// Prepares a query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { PreparedQueries::prepare(session, QUERY, Consistency::All, true) @@ -62,11 +67,11 @@ impl Query { /// Executes a get registrations by Catalyst ID query. 
pub(crate) async fn execute( session: &CassandraSession, params: QueryParams, - ) -> anyhow::Result> { + ) -> anyhow::Result> { session .execute_iter(PreparedSelectQuery::RbacRegistrationsByCatalystId, params) .await? - .rows_stream::() + .rows_stream::() .map_err(Into::into) } } @@ -75,8 +80,8 @@ impl Query { /// database. pub(crate) async fn indexed_registrations( session: &CassandraSession, catalyst_id: &CatalystId, -) -> anyhow::Result> { - let mut result: Vec<_> = Query::execute(session, QueryParams { +) -> anyhow::Result> { + let mut result: Vec<_> = GetRbac509Registrations::execute(session, QueryParams { catalyst_id: catalyst_id.clone().into(), }) .and_then(|r| r.try_collect().map_err(Into::into)) @@ -90,7 +95,7 @@ pub(crate) async fn indexed_registrations( /// /// # NOTE: provided `reg_queries` must be sorted by `slot_no`, look into `indexed_registrations` function. pub(crate) async fn build_reg_chain( - mut reg_queries_iter: impl Iterator, network: Network, + mut reg_queries_iter: impl Iterator, network: Network, mut on_success: OnSuccessFn, ) -> anyhow::Result> { let Some((is_persistent, root)) = reg_queries_iter.next() else { diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_all_invalids.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_all_invalids.rs index 8b475e5b968a..6269cf0c943d 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_all_invalids.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_all_invalids.rs @@ -8,9 +8,12 @@ use scylla::{ }; use tracing::error; -use crate::db::index::{ - queries::{PreparedQueries, PreparedSelectQuery}, - session::CassandraSession, +use crate::{ + db::index::{ + queries::{PreparedQueries, PreparedSelectQuery}, + session::CassandraSession, + }, + impl_query_statement, }; /// Get all invalid registrations @@ -39,6 +42,8 @@ pub(crate) struct GetAllInvalidRegistrationsQuery { pub cip36: bool, } +impl_query_statement!(GetAllInvalidRegistrationsQuery, GET_ALL_INVALIDS); + impl GetAllInvalidRegistrationsQuery { /// Prepares get all registrations pub(crate) async fn prepare(session: Arc) -> anyhow::Result { diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_all_registrations.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_all_registrations.rs index 1f0cbed63bf0..a30a719522ba 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_all_registrations.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_all_registrations.rs @@ -1,19 +1,14 @@ //! 
Get all registrations for snapshot -use std::sync::Arc; - -use scylla::{ - prepared_statement::PreparedStatement, transport::iterator::TypedRowStream, DeserializeRow, - SerializeRow, Session, -}; +use scylla::{transport::iterator::TypedRowStream, DeserializeRow, SerializeRow, Session}; use tracing::error; -use crate::db::{ - index::{ - queries::{PreparedQueries, PreparedSelectQuery}, - session::CassandraSession, +use crate::{ + db::{ + index::{queries::Query, session::CassandraSession}, + types::DbTxnIndex, }, - types::DbTxnIndex, + impl_query_statement, }; /// Get all registrations @@ -44,26 +39,15 @@ pub(crate) struct GetAllRegistrationsQuery { pub cip36: bool, } -impl GetAllRegistrationsQuery { - /// Prepares get all registrations - pub(crate) async fn prepare(session: Arc) -> anyhow::Result { - PreparedQueries::prepare( - session, - GET_ALL_REGISTRATIONS, - scylla::statement::Consistency::All, - true, - ) - .await - .inspect_err(|error| error!(error=%error, "Failed to prepare get all registrations")) - .map_err(|error| anyhow::anyhow!("{error}\n--\n{GET_ALL_REGISTRATIONS}")) - } +impl_query_statement!(GetAllRegistrationsQuery, GET_ALL_REGISTRATIONS); +impl GetAllRegistrationsQuery { /// Executes get all registrations for snapshot pub(crate) async fn execute( session: &CassandraSession, params: GetAllRegistrationsParams, ) -> anyhow::Result> { let iter = session - .execute_iter(PreparedSelectQuery::GetAllRegistrations, params) + .execute_iter(::type_id(), params) .await? .rows_stream::()?; diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_addr.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_addr.rs index f1cdf090c4af..081edfbae385 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_addr.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_addr.rs @@ -8,12 +8,15 @@ use scylla::{ }; use tracing::error; -use crate::db::{ - index::{ - queries::{PreparedQueries, PreparedSelectQuery}, - session::CassandraSession, +use crate::{ + db::{ + index::{ + queries::{PreparedQueries, PreparedSelectQuery}, + session::CassandraSession, + }, + types::{DbSlot, DbTxnIndex}, }, - types::{DbSlot, DbTxnIndex}, + impl_query_statement, }; /// Get registrations from stake addr query. @@ -54,6 +57,11 @@ pub(crate) struct GetRegistrationQuery { pub cip36: bool, } +impl_query_statement!( + GetRegistrationQuery, + GET_REGISTRATIONS_FROM_STAKE_ADDR_QUERY +); + impl GetRegistrationQuery { /// Prepares a get registration query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_address.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_address.rs index 5f79629f457c..b5cd0131dac4 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_stake_address.rs @@ -9,12 +9,15 @@ use scylla::{ }; use tracing::error; -use crate::db::{ - index::{ - queries::{PreparedQueries, PreparedSelectQuery}, - session::CassandraSession, +use crate::{ + db::{ + index::{ + queries::{PreparedQueries, PreparedSelectQuery}, + session::CassandraSession, + }, + types::DbStakeAddress, }, - types::DbStakeAddress, + impl_query_statement, }; /// Get stake addr from stake hash query string. 
@@ -43,6 +46,8 @@ pub(crate) struct GetStakeAddrQuery { pub stake_public_key: Vec, } +impl_query_statement!(GetStakeAddrQuery, GET_QUERY); + impl GetStakeAddrQuery { /// Prepares a get get stake addr from stake hash query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_vote_key.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_vote_key.rs index b67a2ed20fa7..300cb5519c44 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_vote_key.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_from_vote_key.rs @@ -7,9 +7,12 @@ use scylla::{ }; use tracing::error; -use crate::db::index::{ - queries::{PreparedQueries, PreparedSelectQuery}, - session::CassandraSession, +use crate::{ + db::index::{ + queries::{PreparedQueries, PreparedSelectQuery}, + session::CassandraSession, + }, + impl_query_statement, }; /// Get stake addr from vote key query. @@ -36,6 +39,8 @@ pub(crate) struct GetStakeAddrFromVoteKeyQuery { pub stake_public_key: Vec, } +impl_query_statement!(GetStakeAddrFromVoteKeyQuery, GET_STAKE_ADDR_FROM_VOTE_KEY); + impl GetStakeAddrFromVoteKeyQuery { /// Prepares a get stake addr from vote key query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { diff --git a/catalyst-gateway/bin/src/db/index/queries/registrations/get_invalid.rs b/catalyst-gateway/bin/src/db/index/queries/registrations/get_invalid.rs index bfca481f7a4c..51354f84ba4e 100644 --- a/catalyst-gateway/bin/src/db/index/queries/registrations/get_invalid.rs +++ b/catalyst-gateway/bin/src/db/index/queries/registrations/get_invalid.rs @@ -16,6 +16,7 @@ use crate::{ }, types::DbSlot, }, + impl_query_statement, service::common::types::cardano::slot_no::SlotNo, }; @@ -59,6 +60,11 @@ pub(crate) struct GetInvalidRegistrationQuery { pub cip36: bool, } +impl_query_statement!( + GetInvalidRegistrationQuery, + GET_INVALID_REGISTRATIONS_FROM_STAKE_ADDR_QUERY +); + impl GetInvalidRegistrationQuery { /// Prepares a get invalid registration query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { diff --git a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_assets_by_stake_address.rs b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_assets_by_stake_address.rs index 765a66914ad1..02f513290aa6 100644 --- a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_assets_by_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_assets_by_stake_address.rs @@ -8,12 +8,15 @@ use scylla::{ }; use tracing::error; -use crate::db::{ - index::{ - queries::{PreparedQueries, PreparedSelectQuery}, - session::CassandraSession, +use crate::{ + db::{ + index::{ + queries::{PreparedQueries, PreparedSelectQuery}, + session::CassandraSession, + }, + types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset}, }, - types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset}, + impl_query_statement, }; /// Get assets by stake address query string. @@ -56,6 +59,11 @@ pub(crate) struct GetAssetsByStakeAddressQuery { pub value: num_bigint::BigInt, } +impl_query_statement!( + GetAssetsByStakeAddressQuery, + GET_ASSETS_BY_STAKE_ADDRESS_QUERY +); + impl GetAssetsByStakeAddressQuery { /// Prepares a get assets by stake address query. 
pub(crate) async fn prepare(session: Arc) -> anyhow::Result { diff --git a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txi_by_txn_hash.rs b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txi_by_txn_hash.rs index bc7fc86b18f7..83d916d0e381 100644 --- a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txi_by_txn_hash.rs +++ b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txi_by_txn_hash.rs @@ -9,12 +9,15 @@ use scylla::{ }; use tracing::error; -use crate::db::{ - index::{ - queries::{PreparedQueries, PreparedSelectQuery}, - session::CassandraSession, +use crate::{ + db::{ + index::{ + queries::{PreparedQueries, PreparedSelectQuery}, + session::CassandraSession, + }, + types::{DbSlot, DbTransactionId, DbTxnOutputOffset}, }, - types::{DbSlot, DbTransactionId, DbTxnOutputOffset}, + impl_query_statement, }; /// Get TXI query string. @@ -46,11 +49,13 @@ pub(crate) struct GetTxiByTxnHashesQuery { pub slot_no: DbSlot, } +impl_query_statement!(GetTxiByTxnHashesQuery, GET_TXI_BY_TXN_HASHES_QUERY); + impl GetTxiByTxnHashesQuery { /// Prepares a get txi query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { PreparedQueries::prepare( - session, + session.clone(), GET_TXI_BY_TXN_HASHES_QUERY, scylla::statement::Consistency::All, true, diff --git a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txo_by_stake_address.rs b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txo_by_stake_address.rs index dd2cfbe21096..760d2b6842e3 100644 --- a/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txo_by_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/queries/staked_ada/get_txo_by_stake_address.rs @@ -8,12 +8,15 @@ use scylla::{ }; use tracing::error; -use crate::db::{ - index::{ - queries::{PreparedQueries, PreparedSelectQuery}, - session::CassandraSession, +use crate::{ + db::{ + index::{ + queries::{PreparedQueries, PreparedSelectQuery}, + session::CassandraSession, + }, + types::{DbSlot, DbStakeAddress, DbTransactionId, DbTxnIndex, DbTxnOutputOffset}, }, - types::{DbSlot, DbStakeAddress, DbTransactionId, DbTxnIndex, DbTxnOutputOffset}, + impl_query_statement, }; /// Get txo by stake address query string. @@ -55,6 +58,8 @@ pub(crate) struct GetTxoByStakeAddressQuery { pub spent_slot: Option, } +impl_query_statement!(GetTxoByStakeAddressQuery, GET_TXO_BY_STAKE_ADDRESS_QUERY); + impl GetTxoByStakeAddressQuery { /// Prepares a get txo by stake address query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { diff --git a/catalyst-gateway/bin/src/db/index/queries/staked_ada/update_txo_spent.rs b/catalyst-gateway/bin/src/db/index/queries/staked_ada/update_txo_spent.rs index d069d758a496..88ca8bc91c61 100644 --- a/catalyst-gateway/bin/src/db/index/queries/staked_ada/update_txo_spent.rs +++ b/catalyst-gateway/bin/src/db/index/queries/staked_ada/update_txo_spent.rs @@ -13,6 +13,7 @@ use crate::{ }, types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset}, }, + impl_query_batch, settings::cassandra_db, }; @@ -37,10 +38,12 @@ pub(crate) struct UpdateTxoSpentQueryParams { /// Update TXO spent query. pub(crate) struct UpdateTxoSpentQuery; +impl_query_batch!(UpdateTxoSpentQuery, UPDATE_TXO_SPENT_QUERY); + impl UpdateTxoSpentQuery { /// Prepare a batch of update TXO spent queries. 
pub(crate) async fn prepare_batch( - session: Arc, cfg: &cassandra_db::EnvVars, + session: &Arc, cfg: &cassandra_db::EnvVars, ) -> anyhow::Result { PreparedQueries::prepare_batch( session.clone(), diff --git a/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs b/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs index c02e98343900..ddfe99840c5b 100644 --- a/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs +++ b/catalyst-gateway/bin/src/db/index/queries/sync_status/update.rs @@ -13,6 +13,7 @@ use crate::{ queries::{PreparedQueries, PreparedUpsertQuery}, session::CassandraSession, }, + impl_query_statement, service::utilities::convert::from_saturating, settings::Settings, }; @@ -60,6 +61,8 @@ impl SyncStatusQueryParams { /// Sync Status Insert query. pub(crate) struct SyncStatusInsertQuery; +impl_query_statement!(SyncStatusInsertQuery, INSERT_SYNC_STATUS_QUERY); + impl SyncStatusInsertQuery { /// Prepares a Sync Status Insert query. pub(crate) async fn prepare(session: Arc) -> anyhow::Result { diff --git a/catalyst-gateway/bin/src/db/index/session.rs b/catalyst-gateway/bin/src/db/index/session.rs index 6abeb86210d1..c8c68fc8b66c 100644 --- a/catalyst-gateway/bin/src/db/index/session.rs +++ b/catalyst-gateway/bin/src/db/index/session.rs @@ -1,6 +1,7 @@ //! Session creation and storage use std::{ + any::TypeId, fmt::Debug, path::PathBuf, sync::{Arc, OnceLock}, @@ -8,6 +9,7 @@ use std::{ }; use cardano_blockchain_types::Network; +use dashmap::DashMap; use openssl::ssl::{SslContextBuilder, SslFiletype, SslMethod, SslVerifyMode}; use scylla::{ frame::Compression, serialize::row::SerializeRow, transport::iterator::QueryPager, @@ -19,9 +21,8 @@ use tracing::{error, info}; use super::{ queries::{ - purge::{self, PreparedDeleteQuery}, - FallibleQueryResults, PreparedQueries, PreparedQuery, PreparedSelectQuery, - PreparedUpsertQuery, + prepare_queries, session_execute_batch, session_execute_iter, session_execute_upsert, + FallibleQueryResults, QueryKind, }, schema::create_schema, }; @@ -112,9 +113,7 @@ pub(crate) struct CassandraSession { /// The actual session. session: Arc, /// All prepared queries we can use on this session. - queries: Arc, - /// All prepared purge queries we can use on this session. - purge_queries: Arc, + prepared_queries: DashMap, } /// Session error while initialization. @@ -180,13 +179,18 @@ impl CassandraSession { /// Returns an iterator that iterates over all the result pages that the query /// returns. pub(crate) async fn execute_iter
( - &self, select_query: PreparedSelectQuery, params: P, + &self, select_query: TypeId, params: P, ) -> anyhow::Result where P: SerializeRow { let session = self.session.clone(); - let queries = self.queries.clone(); + let Some(key_value) = self.prepared_queries.get(&select_query) else { + anyhow::bail!("Unregistered Query"); + }; + let QueryKind::Statement(ref stmt) = key_value.value() else { + anyhow::bail!("Invalid query kind"); + }; - queries.execute_iter(session, select_query, params).await + session_execute_iter(session, stmt, params).await } /// Execute a Batch query with the given parameters. @@ -197,24 +201,36 @@ impl CassandraSession { /// This will divide the batch into optimal sized chunks and execute them until all /// values have been executed or the first error is encountered. pub(crate) async fn execute_batch( - &self, query: PreparedQuery, values: Vec, + &self, query: TypeId, query_str: &str, values: Vec, ) -> FallibleQueryResults { let session = self.session.clone(); let cfg = self.cfg.clone(); - let queries = self.queries.clone(); - queries.execute_batch(session, cfg, query, values).await + let Some(key_value) = self.prepared_queries.get(&query) else { + anyhow::bail!("Unregistered Query"); + }; + let QueryKind::Batch(ref batch) = key_value.value() else { + anyhow::bail!("Invalid query kind"); + }; + + session_execute_batch(session, batch, cfg, query_str, values).await } /// Execute a query which returns no results, except an error if it fails. /// Can not be batched, takes a single set of parameters. pub(crate) async fn execute_upsert( - &self, query: PreparedUpsertQuery, value: T, + &self, query: TypeId, params: T, ) -> anyhow::Result<()> { let session = self.session.clone(); - let queries = self.queries.clone(); - queries.execute_upsert(session, query, value).await + let Some(key_value) = self.prepared_queries.get(&query) else { + anyhow::bail!("Unregistered Query"); + }; + let QueryKind::Statement(ref query) = key_value.value() else { + anyhow::bail!("Invalid query kind"); + }; + + session_execute_upsert(session, query, params).await } /// Execute a purge query with the given parameters. @@ -227,7 +243,7 @@ impl CassandraSession { /// /// NOTE: This is currently only used to purge volatile data. pub(crate) async fn purge_execute_batch( - &self, query: PreparedDeleteQuery, values: Vec, + &self, query: TypeId, query_str: &str, values: Vec, ) -> FallibleQueryResults { // Only execute purge queries on the volatile session let persistent = false; @@ -236,15 +252,20 @@ impl CassandraSession { anyhow::bail!("Volatile DB Session not found"); }; let cfg = self.cfg.clone(); - let queries = self.purge_queries.clone(); let session = volatile_db.session.clone(); + let Some(key_value) = self.prepared_queries.get(&query) else { + anyhow::bail!("Unregistered Query"); + }; + let QueryKind::Batch(ref batch) = key_value.value() else { + anyhow::bail!("Invalid query kind"); + }; - queries.execute_batch(session, cfg, query, values).await + session_execute_batch(session, batch, cfg, query_str, values).await } /// Execute a select query to gather primary keys for purging. 
pub(crate) async fn purge_execute_iter( - &self, query: purge::PreparedSelectQuery, + &self, query_type: TypeId, ) -> anyhow::Result { // Only execute purge queries on the volatile session let persistent = false; @@ -252,11 +273,15 @@ impl CassandraSession { // This should never happen anyhow::bail!("Volatile DB Session not found"); }; - let queries = self.purge_queries.clone(); - queries - .execute_iter(volatile_db.session.clone(), query) - .await + let Some(key_value) = self.prepared_queries.get(&query_type) else { + anyhow::bail!("Unregistered Query"); + }; + let QueryKind::Statement(ref query) = key_value.value() else { + anyhow::bail!("Invalid query kind"); + }; + + session_execute_iter(volatile_db.session.clone(), query, ()).await } /// Get underlying Raw Cassandra Session. @@ -394,14 +419,14 @@ async fn retry_init(cfg: cassandra_db::EnvVars, network: Network, persistent: bo continue; } - let queries = match PreparedQueries::new(session.clone(), &cfg).await { - Ok(queries) => Arc::new(queries), + let prepared_queries = match prepare_queries(&session, &cfg).await { + Ok(queries) => queries, Err(error) => { error!( db_type = db_type, network = %network, - error = %error, - "Failed to Create Cassandra Prepared Queries" + error = format!("{error:?}"), + "Failed to Prepare Queries for Cassandra DB Session" ); drop(INIT_SESSION_ERROR.set(Arc::new( CassandraSessionError::PreparingQueriesFailed { source: error }, @@ -410,29 +435,13 @@ async fn retry_init(cfg: cassandra_db::EnvVars, network: Network, persistent: bo }, }; - let purge_queries = match Box::pin(purge::PreparedQueries::new(session.clone(), &cfg)).await - { - Ok(queries) => Arc::new(queries), - Err(error) => { - error!( - db_type = db_type, - network = %network, - error = %error, - "Failed to Create Cassandra Prepared Purge Queries" - ); - drop(INIT_SESSION_ERROR.set(Arc::new( - CassandraSessionError::PreparingPurgeQueriesFailed { source: error }, - ))); - continue; - }, - }; + let cfg = Arc::new(cfg); let cassandra_session = CassandraSession { persistent, - cfg: Arc::new(cfg), + cfg, session, - queries, - purge_queries, + prepared_queries, }; // Save the session so we can execute queries on the DB diff --git a/catalyst-gateway/bin/src/db/index/tests/scylla_purge.rs b/catalyst-gateway/bin/src/db/index/tests/scylla_purge.rs index df41225bc348..7483a6217549 100644 --- a/catalyst-gateway/bin/src/db/index/tests/scylla_purge.rs +++ b/catalyst-gateway/bin/src/db/index/tests/scylla_purge.rs @@ -28,14 +28,14 @@ async fn catalyst_id_for_stake_address() { // data let data = vec![ - rbac509::insert_catalyst_id_for_stake_address::Params::new( + rbac509::insert_catalyst_id_for_stake_address::CatalystIdForStakeAddressInsert::new( stake_address_1(), 0.into(), "cardano/FftxFnOrj2qmTuB2oZG2v0YEWJfKvQ9Gg8AgNAhDsKE" .parse() .unwrap(), ), - rbac509::insert_catalyst_id_for_stake_address::Params::new( + rbac509::insert_catalyst_id_for_stake_address::CatalystIdForStakeAddressInsert::new( stake_address_2(), 1.into(), "cardano/FftxFnOrj2qmTuB2oZG2v0YEWJfKvQ9Gg8AgNAhDsKE" @@ -98,13 +98,13 @@ async fn catalyst_id_for_txn_id() { // data let data = vec![ - rbac509::insert_catalyst_id_for_txn_id::Params::new( + rbac509::insert_catalyst_id_for_txn_id::CatalystIdForTxnIdInsert::new( "cardano/FftxFnOrj2qmTuB2oZG2v0YEWJfKvQ9Gg8AgNAhDsKE" .parse() .unwrap(), TransactionId::new(&[0]), ), - rbac509::insert_catalyst_id_for_txn_id::Params::new( + rbac509::insert_catalyst_id_for_txn_id::CatalystIdForTxnIdInsert::new( "cardano/FftxFnOrj2qmTuB2oZG2v0YEWJfKvQ9Gg8AgNAhDsKE" 
.parse() .unwrap(), @@ -166,7 +166,7 @@ async fn rbac509_registration() { // data let data = vec![ - rbac509::insert_rbac509::Params::new( + rbac509::insert_rbac509::Rbac509Insert::new( "cardano/FftxFnOrj2qmTuB2oZG2v0YEWJfKvQ9Gg8AgNAhDsKE" .parse() .unwrap(), @@ -176,7 +176,7 @@ async fn rbac509_registration() { UuidV4::new(), None, ), - rbac509::insert_rbac509::Params::new( + rbac509::insert_rbac509::Rbac509Insert::new( "cardano/FftxFnOrj2qmTuB2oZG2v0YEWJfKvQ9Gg8AgNAhDsKE" .parse() .unwrap(), @@ -243,7 +243,7 @@ async fn rbac509_invalid_registration() { // data let report = ProblemReport::new("test context"); let data = vec![ - rbac509::insert_rbac509_invalid::Params::new( + rbac509::insert_rbac509_invalid::Rbac509InvalidInsert::new( "cardano/FftxFnOrj2qmTuB2oZG2v0YEWJfKvQ9Gg8AgNAhDsKE" .parse() .unwrap(), @@ -254,7 +254,7 @@ async fn rbac509_invalid_registration() { None, &report, ), - rbac509::insert_rbac509_invalid::Params::new( + rbac509::insert_rbac509_invalid::Rbac509InvalidInsert::new( "cardano/FftxFnOrj2qmTuB2oZG2v0YEWJfKvQ9Gg8AgNAhDsKE" .parse() .unwrap(), @@ -321,14 +321,14 @@ async fn test_cip36_registration_for_vote_key() { // data let data = vec![ - cip36::insert_cip36_for_vote_key::Params::new( + cip36::insert_cip36_for_vote_key::Cip36ForVoteKeyInsert::new( &voting_pub_key(0), 0.into(), 0.into(), &test_utils::cip_36_1(), false, ), - cip36::insert_cip36_for_vote_key::Params::new( + cip36::insert_cip36_for_vote_key::Cip36ForVoteKeyInsert::new( &voting_pub_key(1), 1.into(), 1.into(), @@ -392,13 +392,13 @@ async fn test_cip36_registration_invalid() { // data let data = vec![ - cip36::insert_cip36_invalid::Params::new( + cip36::insert_cip36_invalid::Cip36InvalidInsert::new( Some(&voting_pub_key(0)), 0.into(), 0.into(), &test_utils::cip_36_1(), ), - cip36::insert_cip36_invalid::Params::new( + cip36::insert_cip36_invalid::Cip36InvalidInsert::new( Some(&voting_pub_key(1)), 1.into(), 1.into(), @@ -460,13 +460,13 @@ async fn test_cip36_registration() { // data let data = vec![ - cip36::insert_cip36::Params::new( + cip36::insert_cip36::Cip36Insert::new( &voting_pub_key(0), 0.into(), 0.into(), &test_utils::cip_36_1(), ), - cip36::insert_cip36::Params::new( + cip36::insert_cip36::Cip36Insert::new( &voting_pub_key(1), 1.into(), 1.into(), @@ -674,7 +674,7 @@ async fn test_txo_ada() { // data let data = vec![ - txo::insert_txo::Params::new( + txo::insert_txo::TxoInsertQuery::new( stake_address_1(), 0.into(), 0.into(), @@ -683,7 +683,7 @@ async fn test_txo_ada() { 0, TransactionId::new(&[0]), ), - txo::insert_txo::Params::new( + txo::insert_txo::TxoInsertQuery::new( stake_address_2(), 1.into(), 1.into(), diff --git a/catalyst-gateway/bin/src/db/index/tests/scylla_queries.rs b/catalyst-gateway/bin/src/db/index/tests/scylla_queries.rs index 4951821a2c81..b736a1f19fa6 100644 --- a/catalyst-gateway/bin/src/db/index/tests/scylla_queries.rs +++ b/catalyst-gateway/bin/src/db/index/tests/scylla_queries.rs @@ -49,13 +49,13 @@ async fn test_get_assets_by_stake_addr() { #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn get_catalyst_id_by_stake_address() { - use rbac::get_catalyst_id_from_stake_address::{Query, QueryParams}; + use rbac::get_catalyst_id_from_stake_address::{GetCatalystIdForStakeAddress, QueryParams}; let Ok((session, _)) = get_shared_session().await else { panic!("{SESSION_ERR_MSG}"); }; - let mut row_stream = Query::execute(&session, QueryParams { + let mut row_stream = 
GetCatalystIdForStakeAddress::execute(&session, QueryParams { stake_address: stake_address_1().into(), }) .await @@ -69,14 +69,14 @@ async fn get_catalyst_id_by_stake_address() { #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn get_catalyst_id_by_transaction_id() { - use rbac::get_catalyst_id_from_transaction_id::{Query, QueryParams}; + use rbac::get_catalyst_id_from_transaction_id::{GetCatalystIdForTxnId, QueryParams}; let Ok((session, _)) = get_shared_session().await else { panic!("{SESSION_ERR_MSG}"); }; let txn_id = TransactionId::new(&[1, 2, 3]).into(); - let mut row_stream = Query::execute(&session, QueryParams { txn_id }) + let mut row_stream = GetCatalystIdForTxnId::execute(&session, QueryParams { txn_id }) .await .unwrap(); @@ -107,7 +107,7 @@ async fn test_get_invalid_registration_w_stake_addr() { #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn get_rbac_registrations_by_catalyst_id() { - use rbac::get_rbac_registrations::{Query, QueryParams}; + use rbac::get_rbac_registrations::{GetRbac509Registrations, QueryParams}; let Ok((session, _)) = get_shared_session().await else { panic!("{SESSION_ERR_MSG}"); @@ -116,7 +116,7 @@ async fn get_rbac_registrations_by_catalyst_id() { let id: CatalystId = "cardano/FftxFnOrj2qmTuB2oZG2v0YEWJfKvQ9Gg8AgNAhDsKE" .parse() .unwrap(); - let mut row_stream = Query::execute(&session, QueryParams { + let mut row_stream = GetRbac509Registrations::execute(&session, QueryParams { catalyst_id: id.into(), }) .await @@ -130,7 +130,7 @@ async fn get_rbac_registrations_by_catalyst_id() { #[ignore = "An integration test which requires a running Scylla node instance, disabled from `testunit` CI run"] #[tokio::test] async fn get_rbac_invalid_registrations_by_catalyst_id() { - use rbac::get_rbac_invalid_registrations::{Query, QueryParams}; + use rbac::get_rbac_invalid_registrations::{GetRbac509InvalidRegistrations, QueryParams}; let Ok((session, _)) = get_shared_session().await else { panic!("{SESSION_ERR_MSG}"); @@ -139,7 +139,7 @@ async fn get_rbac_invalid_registrations_by_catalyst_id() { let id: CatalystId = "cardano/FftxFnOrj2qmTuB2oZG2v0YEWJfKvQ9Gg8AgNAhDsKE" .parse() .unwrap(); - let mut row_stream = Query::execute(&session, QueryParams { + let mut row_stream = GetRbac509InvalidRegistrations::execute(&session, QueryParams { catalyst_id: id.into(), }) .await diff --git a/catalyst-gateway/bin/src/service/api/cardano/rbac/registrations_get/chain_info.rs b/catalyst-gateway/bin/src/service/api/cardano/rbac/registrations_get/chain_info.rs index 411be500960b..4a93b61efa3d 100644 --- a/catalyst-gateway/bin/src/service/api/cardano/rbac/registrations_get/chain_info.rs +++ b/catalyst-gateway/bin/src/service/api/cardano/rbac/registrations_get/chain_info.rs @@ -10,7 +10,9 @@ use rbac_registration::registration::cardano::RegistrationChain; use crate::{ db::index::{ - queries::rbac::get_rbac_registrations::{build_reg_chain, indexed_registrations, Query}, + queries::rbac::get_rbac_registrations::{ + build_reg_chain, indexed_registrations, GetRbac509Registrations, + }, session::CassandraSession, }, settings::Settings, @@ -72,7 +74,7 @@ impl ChainInfo { async fn last_registration_chain( persistent_session: &CassandraSession, volatile_session: &CassandraSession, catalyst_id: &CatalystId, -) -> anyhow::Result> { +) -> anyhow::Result> { let (persistent_registrations, volatile_registrations) = try_join( 
indexed_registrations(persistent_session, catalyst_id), indexed_registrations(volatile_session, catalyst_id), diff --git a/catalyst-gateway/bin/src/service/api/cardano/rbac/registrations_get/mod.rs b/catalyst-gateway/bin/src/service/api/cardano/rbac/registrations_get/mod.rs index b5571f8d63c7..929c4466fba5 100644 --- a/catalyst-gateway/bin/src/service/api/cardano/rbac/registrations_get/mod.rs +++ b/catalyst-gateway/bin/src/service/api/cardano/rbac/registrations_get/mod.rs @@ -26,7 +26,9 @@ use tracing::error; use crate::{ db::index::{ - queries::rbac::get_catalyst_id_from_stake_address::{Query, QueryParams}, + queries::rbac::get_catalyst_id_from_stake_address::{ + GetCatalystIdForStakeAddress, QueryParams, + }, session::CassandraSession, }, service::{ @@ -110,7 +112,7 @@ pub(crate) async fn endpoint( async fn catalyst_id_from_stake( session: &CassandraSession, address: StakeAddress, ) -> anyhow::Result> { - let mut result: Vec<_> = Query::execute(session, QueryParams { + let mut result: Vec<_> = GetCatalystIdForStakeAddress::execute(session, QueryParams { stake_address: address.into(), }) .and_then(|r| r.try_collect().map_err(Into::into))
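
Note: the macro and registry plumbing these hunks lean on (`impl_query_statement!`, `impl_query_batch!`, `QueryKind`, `prepare_queries`, the `session_execute_*` helpers) is added elsewhere in the PR and is not visible above. As a reading aid only, here is a minimal, self-contained sketch of how a `TypeId`-keyed registry of this shape can hang together. The `RegisteredQuery` trait, the stand-in `PreparedStatement`/`SizedBatch` types, the macro body and the placeholder CQL string are assumptions invented for illustration; only `QueryKind`, the `DashMap<TypeId, QueryKind>` field, the `impl_query_statement!` name and the "Unregistered Query" / "Invalid query kind" errors come from the diff itself.

```rust
// Minimal sketch (not the crate's actual code) of a TypeId-keyed prepared-query registry.
use std::any::TypeId;

use dashmap::DashMap;

/// Stand-in for a driver-prepared statement (the real code holds a scylla statement).
struct PreparedStatement(String);

/// Stand-in for a pre-sized batch of prepared statements.
struct SizedBatch(Vec<PreparedStatement>);

/// A registered query is either a single prepared statement or a sized batch.
enum QueryKind {
    Statement(PreparedStatement),
    Batch(SizedBatch),
}

/// Ties a marker type to the CQL it prepares; one plausible target for `impl_query_statement!`.
trait RegisteredQuery: 'static {
    const CQL: &'static str;
    fn kind() -> QueryKind;
}

/// Hypothetical macro body: register `$ty` as a single-statement query for `$cql`.
macro_rules! impl_query_statement {
    ($ty:ty, $cql:expr) => {
        impl RegisteredQuery for $ty {
            const CQL: &'static str = $cql;
            fn kind() -> QueryKind {
                // The real code would prepare `$cql` against a live session here.
                QueryKind::Statement(PreparedStatement($cql.to_string()))
            }
        }
    };
}

/// Marker type named after one of the queries touched by this PR.
struct GetTxiByTxnHashesQuery;
impl_query_statement!(GetTxiByTxnHashesQuery, "SELECT ... placeholder CQL ...");

/// Build the registry once per session, mirroring the role of `prepare_queries(&session, &cfg)`.
fn prepare_queries() -> DashMap<TypeId, QueryKind> {
    let prepared = DashMap::new();
    prepared.insert(
        TypeId::of::<GetTxiByTxnHashesQuery>(),
        GetTxiByTxnHashesQuery::kind(),
    );
    prepared
}

/// Lookup with the same error handling as `CassandraSession::execute_iter` in the diff.
fn lookup_statement(
    prepared: &DashMap<TypeId, QueryKind>, query: TypeId,
) -> anyhow::Result<&'static str> {
    let Some(entry) = prepared.get(&query) else {
        anyhow::bail!("Unregistered Query");
    };
    let QueryKind::Statement(_stmt) = entry.value() else {
        anyhow::bail!("Invalid query kind");
    };
    Ok("found a prepared statement for this TypeId")
}

fn main() -> anyhow::Result<()> {
    let registry = prepare_queries();
    lookup_statement(&registry, TypeId::of::<GetTxiByTxnHashesQuery>())?;
    Ok(())
}
```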
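Design-wise, this appears to trade the old closed enums (`PreparedQuery`, `PreparedSelectQuery`, `PreparedUpsertQuery`, `purge::PreparedDeleteQuery`) for an open, `TypeId`-keyed set: registering a new query becomes a one-line `impl_query_statement!` or `impl_query_batch!` invocation next to its struct, but the check that a query was actually prepared moves from compile time to the run-time "Unregistered Query" / "Invalid query kind" failures in `execute_iter`, `execute_batch`, `execute_upsert` and the purge paths, so the integration tests in `scylla_queries.rs` and `scylla_purge.rs` become the main guard against a missing registration.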