Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f1a5d96
feat(cat-gateway): add Query trait, use DashMap
saibatizoku Jun 3, 2025
61f5cce
feat(cat-gateway): add prepared_queries field to CassandraSession
saibatizoku Jun 3, 2025
57b67a3
feat(cat-gateway): impl Query for TxiInsertQuery
saibatizoku Jun 3, 2025
59129b9
feat(cat-gateway): impl Query for TxoInsert
saibatizoku Jun 3, 2025
fd4c746
feat(cat-gateway): impl Query for InsertTxoAsset
saibatizoku Jun 3, 2025
500e86b
feat(cat-gateway): impl Query for InsertUnstakedTxo
saibatizoku Jun 3, 2025
bc3e870
feat(cat-gateway): impl Query for InsertUnstakedTxoAsset
saibatizoku Jun 3, 2025
ca2468e
wip(cat-gateway): cleanup queries for refactoring
saibatizoku Jun 11, 2025
877035f
feat(cat-gateway): impl Query for StakeRegistrationInsertQuery
saibatizoku Jun 11, 2025
64882a4
wip(cat-gateway): draft generic query preparation
saibatizoku Jun 11, 2025
d1f9c2e
fix(cat-gateway): remove unused dependency
saibatizoku Jun 11, 2025
2bd7c98
feat(cat-gateway): impl Query for Cip36Insert
saibatizoku Jun 12, 2025
5d6aa3d
feat(cat-gateway): impl Query for Cip36InvalidInsert
saibatizoku Jun 12, 2025
b7122eb
feat(cat-gateway): impl Query for Cip36ForVoteKeyInsert
saibatizoku Jun 12, 2025
1408c2d
feat(cat-gateway): impl Query for UpdateTxoSpentQuery
saibatizoku Jun 12, 2025
bdf3bb3
feat(cat-gateway): impl Query for GetTxiByTxnHashesQuery
saibatizoku Jun 12, 2025
029071d
feat(cat-gateway): add macros to implement Query trait
saibatizoku Jun 12, 2025
4f151a0
wip(cat-gateway): implement query trait for batched queries
saibatizoku Jun 12, 2025
464d781
wip(cat-gateway): implement query trait for statement queries
saibatizoku Jun 12, 2025
674ef1a
wip(cat-gateway): update implementation query trait for UpdateTxoSpent
saibatizoku Jun 12, 2025
e252d43
feat(cat-gateway): impl Query for prepared statement types
saibatizoku Jun 12, 2025
7b7f5df
wip(cat-gateway): impl Query for purge TxoAda
saibatizoku Jun 12, 2025
8225be2
feat(cat-gateway): add type_id associated function to Query trait
saibatizoku Jun 13, 2025
0ea4da2
feat(cat-gateway): all queries implement Query trait
saibatizoku Jun 13, 2025
b7c68e3
wip(cat-gateway): associated QUERY_STR const in Query trait
saibatizoku Jun 23, 2025
5dfc471
wip(cat-gateway): remove duplicated query preparation code
saibatizoku Jun 23, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion catalyst-gateway/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ num-bigint = "0.4.6"
futures = "0.3.31"
rand = "0.8.5"
moka = { version = "0.12.8", features = ["future"] }
crossbeam-skiplist = "0.1.3"
poem = { version = "=3.1.6", features = ["embed", "prometheus", "compression"] }
poem-openapi = { version = "=5.1.5", features = [
"openapi-explorer",
Expand Down
12 changes: 9 additions & 3 deletions catalyst-gateway/bin/src/db/index/block/certs.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Index certs found in a transaction.

use std::{fmt::Debug, sync::Arc};
use std::{fmt, sync::Arc};

use cardano_blockchain_types::{MultiEraBlock, Slot, StakeAddress, TxnIndex, VKeyHash};
use ed25519_dalek::VerifyingKey;
Expand All @@ -16,6 +16,7 @@ use crate::{
},
types::{DbPublicKey, DbSlot, DbStakeAddress, DbTxnIndex},
},
impl_query_batch,
settings::cassandra_db,
};

Expand All @@ -42,8 +43,8 @@ pub(crate) struct StakeRegistrationInsertQuery {
pool_delegation: MaybeUnset<Vec<u8>>,
}

impl Debug for StakeRegistrationInsertQuery {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
impl fmt::Debug for StakeRegistrationInsertQuery {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let stake_public_key = hex::encode(self.stake_public_key.as_ref());
let register = match self.register {
MaybeUnset::Unset => "UNSET",
Expand Down Expand Up @@ -79,6 +80,11 @@ impl Debug for StakeRegistrationInsertQuery {
/// Insert stake registration
const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");

impl_query_batch!(
StakeRegistrationInsertQuery,
INSERT_STAKE_REGISTRATION_QUERY
);

impl StakeRegistrationInsertQuery {
/// Create a new Insert Query.
#[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
Expand Down
17 changes: 10 additions & 7 deletions catalyst-gateway/bin/src/db/index/block/cip36/insert_cip36.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Insert CIP36 Registration Query

use std::{fmt::Debug, sync::Arc};
use std::{fmt, sync::Arc};

use cardano_blockchain_types::{Cip36, Slot, TxnIndex, VotingPubKey};
use scylla::{frame::value::MaybeUnset, SerializeRow, Session};
Expand All @@ -11,6 +11,7 @@ use crate::{
index::queries::{PreparedQueries, SizedBatch},
types::{DbSlot, DbTxnIndex},
},
impl_query_batch,
settings::cassandra_db,
};

Expand All @@ -19,7 +20,7 @@ const INSERT_CIP36_REGISTRATION_QUERY: &str = include_str!("./cql/insert_cip36.c

/// Insert CIP-36 Registration Query Parameters
#[derive(SerializeRow, Clone)]
pub(crate) struct Params {
pub(crate) struct Cip36Insert {
/// Full Stake Address (not hashed, 32 byte ED25519 Public key).
stake_public_key: Vec<u8>,
/// Nonce value after normalization.
Expand All @@ -40,13 +41,15 @@ pub(crate) struct Params {
cip36: bool,
}

impl Debug for Params {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl_query_batch!(Cip36Insert, INSERT_CIP36_REGISTRATION_QUERY);

impl fmt::Debug for Cip36Insert {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let payment_address = match self.payment_address {
MaybeUnset::Unset => "UNSET",
MaybeUnset::Set(ref v) => &hex::encode(v),
};
f.debug_struct("Params")
f.debug_struct("Cip36Insert")
.field("stake_public_key", &self.stake_public_key)
.field("nonce", &self.nonce)
.field("slot_no", &self.slot_no)
Expand All @@ -60,7 +63,7 @@ impl Debug for Params {
}
}

impl Params {
impl Cip36Insert {
/// Create a new Insert Query.
pub fn new(vote_key: &VotingPubKey, slot_no: Slot, txn_index: TxnIndex, cip36: &Cip36) -> Self {
let stake_public_key = cip36
Expand All @@ -73,7 +76,7 @@ impl Params {
.payment_address()
.map_or(MaybeUnset::Unset, |a| MaybeUnset::Set(a.to_vec()));
let is_cip36 = cip36.is_cip36().unwrap_or_default();
Params {
Cip36Insert {
stake_public_key,
nonce: cip36.nonce().unwrap_or_default().into(),
slot_no: slot_no.into(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
index::queries::{PreparedQueries, SizedBatch},
types::{DbSlot, DbTxnIndex},
},
impl_query_batch,
settings::cassandra_db,
};

Expand All @@ -20,7 +21,7 @@ const INSERT_CIP36_REGISTRATION_FOR_VOTE_KEY_QUERY: &str =

/// Insert CIP-36 Registration Invalid Query Parameters
#[derive(SerializeRow, Debug)]
pub(crate) struct Params {
pub(crate) struct Cip36ForVoteKeyInsert {
/// Voting Public Key
vote_key: Vec<u8>,
/// Full Stake Address (not hashed, 32 byte ED25519 Public key).
Expand All @@ -33,12 +34,17 @@ pub(crate) struct Params {
valid: bool,
}

impl Params {
impl_query_batch!(
Cip36ForVoteKeyInsert,
INSERT_CIP36_REGISTRATION_FOR_VOTE_KEY_QUERY
);

impl Cip36ForVoteKeyInsert {
/// Create a new Insert Query.
pub fn new(
vote_key: &VotingPubKey, slot_no: Slot, txn_index: TxnIndex, cip36: &Cip36, valid: bool,
) -> Self {
Params {
Cip36ForVoteKeyInsert {
vote_key: vote_key
.voting_pk()
.map(|k| k.to_bytes().to_vec())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Insert CIP36 Registration Query (Invalid Records)

use std::{fmt::Debug, sync::Arc};
use std::{fmt, sync::Arc};

use cardano_blockchain_types::{Cip36, Slot, TxnIndex, VotingPubKey};
use pallas::ledger::addresses::Address;
Expand All @@ -12,6 +12,7 @@ use crate::{
index::queries::{PreparedQueries, SizedBatch},
types::{DbSlot, DbTxnIndex},
},
impl_query_batch,
settings::cassandra_db,
};

Expand All @@ -21,7 +22,7 @@ const INSERT_CIP36_REGISTRATION_INVALID_QUERY: &str =

/// Insert CIP-36 Registration Invalid Query Parameters
#[derive(SerializeRow, Clone)]
pub(crate) struct Params {
pub(crate) struct Cip36InvalidInsert {
/// Full Stake Address (not hashed, 32 byte ED25519 Public key).
stake_public_key: Vec<u8>,
/// Slot Number the cert is in.
Expand All @@ -46,13 +47,15 @@ pub(crate) struct Params {
problem_report: String,
}

impl Debug for Params {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl_query_batch!(Cip36InvalidInsert, INSERT_CIP36_REGISTRATION_INVALID_QUERY);

impl fmt::Debug for Cip36InvalidInsert {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let cip36 = match self.cip36 {
MaybeUnset::Unset => "UNSET",
MaybeUnset::Set(v) => &format!("{v:?}"),
};
f.debug_struct("Params")
f.debug_struct("Cip36InvalidInsert")
.field("stake_public_key", &self.stake_public_key)
.field("slot_no", &self.slot_no)
.field("txn_index", &self.txn_index)
Expand All @@ -68,7 +71,7 @@ impl Debug for Params {
}
}

impl Params {
impl Cip36InvalidInsert {
/// Create a new Insert Query.
pub fn new(
vote_key: Option<&VotingPubKey>, slot_no: Slot, txn_index: TxnIndex, cip36: &Cip36,
Expand All @@ -89,7 +92,7 @@ impl Params {
String::new()
});

Params {
Cip36InvalidInsert {
stake_public_key,
slot_no: slot_no.into(),
txn_index: txn_index.into(),
Expand Down
43 changes: 23 additions & 20 deletions catalyst-gateway/bin/src/db/index/block/cip36/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ use crate::{
/// Insert CIP-36 Registration Queries
pub(crate) struct Cip36InsertQuery {
/// Stake Registration Data captured during indexing.
registrations: Vec<insert_cip36::Params>,
registrations: Vec<insert_cip36::Cip36Insert>,
/// Stake Registration Data captured during indexing.
invalid: Vec<insert_cip36_invalid::Params>,
invalid: Vec<insert_cip36_invalid::Cip36InvalidInsert>,
/// Stake Registration Data captured during indexing.
for_vote_key: Vec<insert_cip36_for_vote_key::Params>,
for_vote_key: Vec<insert_cip36_for_vote_key::Cip36ForVoteKeyInsert>,
/// Stake Registration Data captured during indexing.
stake_regs: Vec<certs::StakeRegistrationInsertQuery>,
}
Expand All @@ -46,11 +46,11 @@ impl Cip36InsertQuery {
pub(crate) async fn prepare_batch(
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch)> {
let insert_cip36_batch = insert_cip36::Params::prepare_batch(session, cfg).await;
let insert_cip36_batch = insert_cip36::Cip36Insert::prepare_batch(session, cfg).await;
let insert_cip36_invalid_batch =
insert_cip36_invalid::Params::prepare_batch(session, cfg).await;
insert_cip36_invalid::Cip36InvalidInsert::prepare_batch(session, cfg).await;
let insert_cip36_for_vote_key_addr_batch =
insert_cip36_for_vote_key::Params::prepare_batch(session, cfg).await;
insert_cip36_for_vote_key::Cip36ForVoteKeyInsert::prepare_batch(session, cfg).await;
// Its a hack of inserting `stake_regs` during the indexing CIP 36 registrations.
// Its done because some of the CIP 36 registrations contains some stake addresses which
// are not actually some how registered using cardano certs.
Expand Down Expand Up @@ -82,11 +82,11 @@ impl Cip36InsertQuery {
let stake_pk_hash = Blake2b224Hash::new(&stake_pk.to_bytes());
let stake_address = StakeAddress::new(block.network(), false, stake_pk_hash);

self.registrations.push(insert_cip36::Params::new(
self.registrations.push(insert_cip36::Cip36Insert::new(
voting_key, slot_no, index, &cip36,
));
self.for_vote_key
.push(insert_cip36_for_vote_key::Params::new(
.push(insert_cip36_for_vote_key::Cip36ForVoteKeyInsert::new(
voting_key, slot_no, index, &cip36, true,
));
self.stake_regs
Expand All @@ -107,21 +107,24 @@ impl Cip36InsertQuery {
// Cannot index an invalid CIP36, if there is no stake public key.
if let Some(stake_pk) = cip36.stake_pk() {
if cip36.voting_pks().is_empty() {
self.invalid.push(insert_cip36_invalid::Params::new(
None, slot_no, index, &cip36,
));
self.invalid
.push(insert_cip36_invalid::Cip36InvalidInsert::new(
None, slot_no, index, &cip36,
));
} else {
for voting_key in cip36.voting_pks() {
self.invalid.push(insert_cip36_invalid::Params::new(
Some(voting_key),
slot_no,
index,
&cip36,
));
self.for_vote_key
.push(insert_cip36_for_vote_key::Params::new(
voting_key, slot_no, index, &cip36, false,
self.invalid
.push(insert_cip36_invalid::Cip36InvalidInsert::new(
Some(voting_key),
slot_no,
index,
&cip36,
));
self.for_vote_key.push(
insert_cip36_for_vote_key::Cip36ForVoteKeyInsert::new(
voting_key, slot_no, index, &cip36, false,
),
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
index::queries::{PreparedQueries, SizedBatch},
types::{DbCatalystId, DbSlot, DbStakeAddress},
},
impl_query_batch,
settings::cassandra_db::EnvVars,
};

Expand All @@ -20,7 +21,7 @@ const QUERY: &str = include_str!("cql/insert_catalyst_id_for_stake_address.cql")

/// Insert Catalyst ID For Stake Address Query Parameters
#[derive(SerializeRow)]
pub(crate) struct Params {
pub(crate) struct CatalystIdForStakeAddressInsert {
/// A stake address.
stake_address: DbStakeAddress,
/// A Catalyst short identifier.
Expand All @@ -29,20 +30,22 @@ pub(crate) struct Params {
slot_no: DbSlot,
}

impl Debug for Params {
impl_query_batch!(CatalystIdForStakeAddressInsert, QUERY);

impl Debug for CatalystIdForStakeAddressInsert {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Params")
f.debug_struct("CatalystIdForStakeAddressInsert")
.field("stake_address", &self.stake_address)
.field("catalyst_id", &self.catalyst_id)
.field("slot_no", &self.slot_no)
.finish()
}
}

impl Params {
impl CatalystIdForStakeAddressInsert {
/// Create a new record for this transaction.
pub(crate) fn new(stake_address: StakeAddress, slot_no: Slot, catalyst_id: CatalystId) -> Self {
Params {
CatalystIdForStakeAddressInsert {
stake_address: stake_address.into(),
catalyst_id: catalyst_id.into(),
slot_no: slot_no.into(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
index::queries::{PreparedQueries, SizedBatch},
types::{DbCatalystId, DbTransactionId},
},
impl_query_batch,
settings::cassandra_db::EnvVars,
};

Expand All @@ -20,26 +21,28 @@ const QUERY: &str = include_str!("cql/insert_catalyst_id_for_txn_id.cql");

/// Insert Catalyst ID For Transaction ID Query Parameters
#[derive(SerializeRow)]
pub(crate) struct Params {
pub(crate) struct CatalystIdForTxnIdInsert {
/// A transaction hash.
txn_id: DbTransactionId,
/// A Catalyst short identifier.
catalyst_id: DbCatalystId,
}

impl Debug for Params {
impl_query_batch!(CatalystIdForTxnIdInsert, QUERY);

impl Debug for CatalystIdForTxnIdInsert {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Params")
f.debug_struct("CatalystIdForTxnIdInsert")
.field("txn_id", &self.txn_id)
.field("catalyst_id", &self.catalyst_id)
.finish()
}
}

impl Params {
impl CatalystIdForTxnIdInsert {
/// Creates a new record for this transaction.
pub(crate) fn new(catalyst_id: CatalystId, txn_id: TransactionId) -> Self {
Params {
CatalystIdForTxnIdInsert {
txn_id: txn_id.into(),
catalyst_id: catalyst_id.into(),
}
Expand Down
Loading
Loading