@@ -5,14 +5,12 @@ INSERT INTO rbac_registration (
     txn_index,
     txn_id,
     prv_txn_id,
-    removed_stake_addresses,
     purpose
 ) VALUES (
     :catalyst_id,
     :slot_no,
     :txn_index,
     :txn_id,
     :prv_txn_id,
-    :removed_stake_addresses,
     :purpose
 );
@@ -10,7 +10,10 @@ use tracing::error;
 
 use crate::{
     db::{
-        index::queries::{PreparedQueries, SizedBatch},
+        index::{
+            queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch},
+            session::CassandraSession,
+        },
         types::{DbCatalystId, DbPublicKey, DbSlot},
     },
     settings::cassandra_db::EnvVars,
@@ -57,6 +60,16 @@ impl Params {
         }
     }
 
+    /// Executes prepared queries as a batch.
+    pub(crate) async fn execute_batch(
+        session: &Arc<CassandraSession>,
+        queries: Vec<Self>,
+    ) -> FallibleQueryResults {
+        session
+            .execute_batch(PreparedQuery::CatalystIdForPublicKeyInsertQuery, queries)
+            .await
+    }
+
     /// Prepares a batch of queries.
     pub(crate) async fn prepare_batch(
         session: &Arc<Session>,
@@ -9,7 +9,10 @@ use tracing::error;
 
 use crate::{
     db::{
-        index::queries::{PreparedQueries, SizedBatch},
+        index::{
+            queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch},
+            session::CassandraSession,
+        },
         types::{DbCatalystId, DbSlot, DbStakeAddress, DbTxnIndex},
     },
     settings::cassandra_db::EnvVars,
@@ -61,6 +64,23 @@ impl Params {
         }
     }
 
+    /// Executes prepared queries as a batch.
+    pub(crate) async fn execute_batch(
+        session: &Arc<CassandraSession>,
+        queries: Vec<Self>,
+    ) -> FallibleQueryResults {
+        for q in &queries {
+            session
+                .caches()
+                .rbac_stake_address()
+                .insert(q.stake_address.clone().into(), q.catalyst_id.clone().into());
+        }
+
+        session
+            .execute_batch(PreparedQuery::CatalystIdForStakeAddressInsertQuery, queries)
+            .await
+    }
+
     /// Prepare Batch of RBAC Registration Index Data Queries
     pub(crate) async fn prepare_batch(
         session: &Arc<Session>,
@@ -9,7 +9,10 @@ use tracing::error;
 
 use crate::{
     db::{
-        index::queries::{PreparedQueries, SizedBatch},
+        index::{
+            queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch},
+            session::CassandraSession,
+        },
         types::{DbCatalystId, DbSlot, DbTransactionId},
     },
     settings::cassandra_db::EnvVars,
@@ -56,6 +59,16 @@ impl Params {
         }
     }
 
+    /// Executes prepared queries as a batch.
+    pub(crate) async fn execute_batch(
+        session: &Arc<CassandraSession>,
+        queries: Vec<Self>,
+    ) -> FallibleQueryResults {
+        session
+            .execute_batch(PreparedQuery::CatalystIdForTxnIdInsertQuery, queries)
+            .await
+    }
+
     /// Prepares a Batch of RBAC Registration Index Data Queries.
     pub(crate) async fn prepare_batch(
         session: &Arc<Session>,
catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs (30 changes: 17 additions & 13 deletions)
@@ -1,16 +1,19 @@
 //! Insert RBAC 509 Registration Query.
 
-use std::{collections::HashSet, fmt::Debug, sync::Arc};
+use std::{fmt::Debug, sync::Arc};
 
-use cardano_chain_follower::{Slot, StakeAddress, TxnIndex, hashes::TransactionId};
+use cardano_chain_follower::{Slot, TxnIndex, hashes::TransactionId};
 use catalyst_types::{catalyst_id::CatalystId, uuid::UuidV4};
 use scylla::{SerializeRow, client::session::Session, value::MaybeUnset};
 use tracing::error;
 
 use crate::{
     db::{
-        index::queries::{PreparedQueries, SizedBatch},
-        types::{DbCatalystId, DbSlot, DbStakeAddress, DbTransactionId, DbTxnIndex, DbUuidV4},
+        index::{
+            queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch},
+            session::CassandraSession,
+        },
+        types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4},
     },
     settings::cassandra_db::EnvVars,
 };
@@ -31,8 +34,6 @@ pub(crate) struct Params {
     txn_id: DbTransactionId,
     /// Hash of Previous Transaction. Is `None` for the first registration. 32 Bytes.
     prv_txn_id: MaybeUnset<DbTransactionId>,
-    /// A set of removed stake addresses.
-    removed_stake_addresses: HashSet<DbStakeAddress>,
     /// A registration purpose.
     ///
     /// The value of purpose is `None` if the chain is modified by the registration
@@ -55,7 +56,6 @@ impl Debug for Params {
             .field("slot_no", &self.slot_no)
             .field("txn_index", &self.txn_index)
             .field("prv_txn_id", &prv_txn_id)
-            .field("removed_stake_addresses", &self.removed_stake_addresses)
             .field("purpose", &self.purpose)
             .finish()
     }
@@ -69,14 +69,9 @@ impl Params {
         slot_no: Slot,
         txn_index: TxnIndex,
         prv_txn_id: Option<TransactionId>,
-        removed_stake_addresses: HashSet<StakeAddress>,
         purpose: Option<UuidV4>,
     ) -> Self {
         let prv_txn_id = prv_txn_id.map_or(MaybeUnset::Unset, |v| MaybeUnset::Set(v.into()));
-        let removed_stake_addresses = removed_stake_addresses
-            .into_iter()
-            .map(Into::into)
-            .collect();
         let purpose = purpose.map_or(MaybeUnset::Unset, |v| MaybeUnset::Set(v.into()));
 
         Self {
@@ -85,11 +80,20 @@ impl Params {
             slot_no: slot_no.into(),
             txn_index: txn_index.into(),
             prv_txn_id,
-            removed_stake_addresses,
             purpose,
         }
     }
 
+    /// Executes prepared queries as a batch.
+    pub(crate) async fn execute_batch(
+        session: &Arc<CassandraSession>,
+        queries: Vec<Self>,
+    ) -> FallibleQueryResults {
+        session
+            .execute_batch(PreparedQuery::Rbac509InsertQuery, queries)
+            .await
+    }
+
     /// Prepare Batch of RBAC Registration Index Data Queries
     pub(crate) async fn prepare_batch(
         session: &Arc<Session>,
@@ -10,7 +10,10 @@ use tracing::error;
 
 use crate::{
     db::{
-        index::queries::{PreparedQueries, SizedBatch},
+        index::{
+            queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch},
+            session::CassandraSession,
+        },
         types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4},
     },
     service::common::objects::generic::problem_report::ProblemReport,
@@ -65,6 +68,16 @@ impl Params {
         }
     }
 
+    /// Executes prepared queries as a batch.
+    pub(crate) async fn execute_batch(
+        session: &Arc<CassandraSession>,
+        queries: Vec<Self>,
+    ) -> FallibleQueryResults {
+        session
+            .execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, queries)
+            .await
+    }
+
     /// Prepare Batch of RBAC Registration Index Data Queries
     pub(crate) async fn prepare_batch(
         session: &Arc<Session>,
catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs (54 changes: 20 additions & 34 deletions)
@@ -6,10 +6,7 @@ pub(crate) mod insert_catalyst_id_for_txn_id;
 pub(crate) mod insert_rbac509;
 pub(crate) mod insert_rbac509_invalid;
 
-use std::{
-    collections::{BTreeSet, HashSet},
-    sync::Arc,
-};
+use std::{collections::BTreeSet, sync::Arc};
 
 use anyhow::{Context, Result};
 use cardano_chain_follower::{MultiEraBlock, Slot, TxnIndex, hashes::TransactionId};
@@ -20,7 +17,7 @@ use tracing::{debug, error};
 
 use crate::{
     db::index::{
-        queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
+        queries::{FallibleQueryTasks, SizedBatch},
         session::CassandraSession,
     },
     metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
@@ -176,14 +173,11 @@ impl Rbac509InsertQuery {
                 slot,
                 index,
                 previous_transaction,
-                // Addresses can only be removed from other chains, so this list is always
-                // empty for the chain that is being updated.
-                HashSet::new(),
                 purpose,
             ));
 
             // Update other chains that were affected by this registration.
-            for (catalyst_id, removed_addresses) in modified_chains {
+            for (catalyst_id, _) in modified_chains {
[Review comment from a Member] I think we can update `RbacValidationSuccess::modified_chains` because we no longer need the stake addresses there. Am I missing something?
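If that simplification lands, it might look something like the sketch below. This is an assumption for illustration only: the actual definition of `RbacValidationSuccess` is not part of this diff, and both field shapes are guesses based on how `modified_chains` is destructured in the loop above.

```rust
// Hypothetical sketch, not part of this PR. The real `RbacValidationSuccess`
// definition is not shown in the diff, so the shapes below are assumptions.
use catalyst_types::catalyst_id::CatalystId;

pub struct RbacValidationSuccess {
    // Before (assumed): Vec<(CatalystId, HashSet<StakeAddress>)>, pairing each
    // affected chain with the stake addresses removed from it.
    // After: the indexing loop only uses the Catalyst ID of each affected
    // chain, so the removed-address sets could be dropped entirely.
    pub modified_chains: Vec<CatalystId>,
}
```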
                 self.registrations.push(insert_rbac509::Params::new(
                     catalyst_id.clone(),
                     txn_hash,
@@ -193,7 +187,6 @@ impl Rbac509InsertQuery {
                     // make sense to include a previous transaction ID unrelated to the chain
                     // that is being updated.
                     None,
-                    removed_addresses,
                     None,
                 ));
             }
@@ -254,54 +247,47 @@ impl Rbac509InsertQuery {
         if !self.registrations.is_empty() {
             let inner_session = session.clone();
             query_handles.push(tokio::spawn(async move {
-                inner_session
-                    .execute_batch(PreparedQuery::Rbac509InsertQuery, self.registrations)
-                    .await
+                insert_rbac509::Params::execute_batch(&inner_session, self.registrations).await
             }));
         }
 
         if !self.invalid.is_empty() {
             let inner_session = session.clone();
             query_handles.push(tokio::spawn(async move {
-                inner_session
-                    .execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, self.invalid)
-                    .await
+                insert_rbac509_invalid::Params::execute_batch(&inner_session, self.invalid).await
             }));
         }
 
         if !self.catalyst_id_for_txn_id.is_empty() {
             let inner_session = session.clone();
             query_handles.push(tokio::spawn(async move {
-                inner_session
-                    .execute_batch(
-                        PreparedQuery::CatalystIdForTxnIdInsertQuery,
-                        self.catalyst_id_for_txn_id,
-                    )
-                    .await
+                insert_catalyst_id_for_txn_id::Params::execute_batch(
+                    &inner_session,
+                    self.catalyst_id_for_txn_id,
+                )
+                .await
             }));
         }
 
         if !self.catalyst_id_for_stake_address.is_empty() {
             let inner_session = session.clone();
             query_handles.push(tokio::spawn(async move {
-                inner_session
-                    .execute_batch(
-                        PreparedQuery::CatalystIdForStakeAddressInsertQuery,
-                        self.catalyst_id_for_stake_address,
-                    )
-                    .await
+                insert_catalyst_id_for_stake_address::Params::execute_batch(
+                    &inner_session,
+                    self.catalyst_id_for_stake_address,
+                )
+                .await
             }));
         }
 
         if !self.catalyst_id_for_public_key.is_empty() {
             let inner_session = session.clone();
             query_handles.push(tokio::spawn(async move {
-                inner_session
-                    .execute_batch(
-                        PreparedQuery::CatalystIdForPublicKeyInsertQuery,
-                        self.catalyst_id_for_public_key,
-                    )
-                    .await
+                insert_catalyst_id_for_public_key::Params::execute_batch(
+                    &inner_session,
+                    self.catalyst_id_for_public_key,
+                )
+                .await
             }));
         }
 
@@ -1,7 +1,6 @@
 SELECT txn_id,
     slot_no,
     txn_index,
-    prv_txn_id,
-    removed_stake_addresses
+    prv_txn_id
 FROM rbac_registration
 WHERE catalyst_id = :catalyst_id
@@ -120,20 +120,6 @@ impl Query {
         }
     }
 
-    /// Adds the given value to the cache.
-    pub fn cache_stake_address(
-        is_persistent: bool,
-        stake_address: StakeAddress,
-        catalyst_id: CatalystId,
-    ) {
-        CassandraSession::get(is_persistent).inspect(|session| {
-            session
-                .caches()
-                .rbac_stake_address()
-                .insert(stake_address, catalyst_id);
-        });
-    }
-
     /// Removes all cached values.
     pub fn invalidate_stake_addresses_cache(is_persistent: bool) {
         CassandraSession::get(is_persistent).inspect(|session| {
@@ -1,6 +1,6 @@
 //! Get RBAC registrations by Catalyst ID.
 
-use std::{collections::HashSet, sync::Arc};
+use std::sync::Arc;
 
 use scylla::{
     DeserializeRow, SerializeRow,
@@ -14,7 +14,7 @@ use crate::db::{
         queries::{PreparedQueries, PreparedSelectQuery},
         session::CassandraSession,
     },
-    types::{DbCatalystId, DbSlot, DbStakeAddress, DbTransactionId, DbTxnIndex},
+    types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex},
 };
 
 /// Get registrations by Catalyst ID query.
@@ -40,8 +40,6 @@ pub(crate) struct Query {
     /// A previous transaction id.
     #[allow(dead_code)]
     pub prv_txn_id: Option<DbTransactionId>,
-    /// A set of removed stake addresses.
-    pub removed_stake_addresses: HashSet<DbStakeAddress>,
 }
 
 impl Query {
@@ -8,8 +8,7 @@ CREATE TABLE IF NOT EXISTS rbac_registration (
     -- Non-Key Data
     txn_id blob, -- 32 Bytes of Transaction Hash. (TransactionHash)
     prv_txn_id blob, -- 32 Bytes from Previous Transaction Hash.
-    removed_stake_addresses set<blob>, -- A set of stake addresses that were removed by other chain.
-                                       -- The set is empty for all registrations of this chain.
+
     purpose uuid, -- 16 Bytes of UUIDv4 Purpose.
 
     PRIMARY KEY (catalyst_id, slot_no, txn_index)