diff --git a/catalyst-gateway/bin/Cargo.toml b/catalyst-gateway/bin/Cargo.toml index 94c9d4749fc7..736d9fa557c1 100644 --- a/catalyst-gateway/bin/Cargo.toml +++ b/catalyst-gateway/bin/Cargo.toml @@ -15,8 +15,8 @@ repository.workspace = true workspace = true [dependencies] -cardano-chain-follower = { version = "0.0.18", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "cardano-chain-follower/v0.0.18" } -rbac-registration = { version = "0.0.14", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "rbac-registration/v0.0.14" } +cardano-chain-follower = { version = "0.0.19", git = "https://github.com/input-output-hk/catalyst-libs.git", branch = "feat/cardano-chain-follower-bump" } +rbac-registration = { version = "0.0.14", git = "https://github.com/input-output-hk/catalyst-libs.git", branch = "feat/rbac" } catalyst-signed-doc = { version = "0.0.10", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-signed-doc/v0.0.10" } catalyst-signed-doc-v1 = { package = "catalyst-signed-doc", version = "0.0.4", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-signed-doc/v.0.0.4" } c509-certificate = { version = "0.0.3", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "c509-certificate-v0.0.3" } diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/cql/insert_rbac509.cql b/catalyst-gateway/bin/src/db/index/block/rbac509/cql/insert_rbac509.cql index 7e964610bf6f..a73e7020228f 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/cql/insert_rbac509.cql +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/cql/insert_rbac509.cql @@ -5,7 +5,6 @@ INSERT INTO rbac_registration ( txn_index, txn_id, prv_txn_id, - removed_stake_addresses, purpose ) VALUES ( :catalyst_id, @@ -13,6 +12,5 @@ INSERT INTO rbac_registration ( :txn_index, :txn_id, :prv_txn_id, - :removed_stake_addresses, :purpose ); diff --git 
a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_public_key.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_public_key.rs index e73c8d8de0da..f4d232181095 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_public_key.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_public_key.rs @@ -10,7 +10,10 @@ use tracing::error; use crate::{ db::{ - index::queries::{PreparedQueries, SizedBatch}, + index::{ + queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch}, + session::CassandraSession, + }, types::{DbCatalystId, DbPublicKey, DbSlot}, }, settings::cassandra_db::EnvVars, @@ -57,6 +60,16 @@ impl Params { } } + /// Executes prepared queries as a batch. + pub(crate) async fn execute_batch( + session: &Arc, + queries: Vec, + ) -> FallibleQueryResults { + session + .execute_batch(PreparedQuery::CatalystIdForPublicKeyInsertQuery, queries) + .await + } + /// Prepares a batch of queries. pub(crate) async fn prepare_batch( session: &Arc, diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_stake_address.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_stake_address.rs index 2a7162f056f5..0a29c10fb4eb 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_stake_address.rs @@ -9,7 +9,10 @@ use tracing::error; use crate::{ db::{ - index::queries::{PreparedQueries, SizedBatch}, + index::{ + queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch}, + session::CassandraSession, + }, types::{DbCatalystId, DbSlot, DbStakeAddress, DbTxnIndex}, }, settings::cassandra_db::EnvVars, @@ -61,6 +64,23 @@ impl Params { } } + /// Executes prepared queries as a batch. 
+ pub(crate) async fn execute_batch( + session: &Arc, + queries: Vec, + ) -> FallibleQueryResults { + for q in &queries { + session + .caches() + .rbac_stake_address() + .insert(q.stake_address.clone().into(), q.catalyst_id.clone().into()); + } + + session + .execute_batch(PreparedQuery::CatalystIdForStakeAddressInsertQuery, queries) + .await + } + /// Prepare Batch of RBAC Registration Index Data Queries pub(crate) async fn prepare_batch( session: &Arc, diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_txn_id.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_txn_id.rs index 72b0231327f4..ce561e0cc7d4 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_txn_id.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_catalyst_id_for_txn_id.rs @@ -9,7 +9,10 @@ use tracing::error; use crate::{ db::{ - index::queries::{PreparedQueries, SizedBatch}, + index::{ + queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch}, + session::CassandraSession, + }, types::{DbCatalystId, DbSlot, DbTransactionId}, }, settings::cassandra_db::EnvVars, @@ -56,6 +59,16 @@ impl Params { } } + /// Executes prepared queries as a batch. + pub(crate) async fn execute_batch( + session: &Arc, + queries: Vec, + ) -> FallibleQueryResults { + session + .execute_batch(PreparedQuery::CatalystIdForTxnIdInsertQuery, queries) + .await + } + /// Prepares a Batch of RBAC Registration Index Data Queries. pub(crate) async fn prepare_batch( session: &Arc, diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs index ff9577e7d18f..1981df9c4449 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509.rs @@ -1,16 +1,19 @@ //! Insert RBAC 509 Registration Query. 
-use std::{collections::HashSet, fmt::Debug, sync::Arc}; +use std::{fmt::Debug, sync::Arc}; -use cardano_chain_follower::{Slot, StakeAddress, TxnIndex, hashes::TransactionId}; +use cardano_chain_follower::{Slot, TxnIndex, hashes::TransactionId}; use catalyst_types::{catalyst_id::CatalystId, uuid::UuidV4}; use scylla::{SerializeRow, client::session::Session, value::MaybeUnset}; use tracing::error; use crate::{ db::{ - index::queries::{PreparedQueries, SizedBatch}, - types::{DbCatalystId, DbSlot, DbStakeAddress, DbTransactionId, DbTxnIndex, DbUuidV4}, + index::{ + queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch}, + session::CassandraSession, + }, + types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4}, }, settings::cassandra_db::EnvVars, }; @@ -31,8 +34,6 @@ pub(crate) struct Params { txn_id: DbTransactionId, /// Hash of Previous Transaction. Is `None` for the first registration. 32 Bytes. prv_txn_id: MaybeUnset, - /// A set of removed stake addresses. - removed_stake_addresses: HashSet, /// A registration purpose. 
/// /// The value of purpose is `None` if the chain is modified by the registration @@ -55,7 +56,6 @@ impl Debug for Params { .field("slot_no", &self.slot_no) .field("txn_index", &self.txn_index) .field("prv_txn_id", &prv_txn_id) - .field("removed_stake_addresses", &self.removed_stake_addresses) .field("purpose", &self.purpose) .finish() } @@ -69,14 +69,9 @@ impl Params { slot_no: Slot, txn_index: TxnIndex, prv_txn_id: Option, - removed_stake_addresses: HashSet, purpose: Option, ) -> Self { let prv_txn_id = prv_txn_id.map_or(MaybeUnset::Unset, |v| MaybeUnset::Set(v.into())); - let removed_stake_addresses = removed_stake_addresses - .into_iter() - .map(Into::into) - .collect(); let purpose = purpose.map_or(MaybeUnset::Unset, |v| MaybeUnset::Set(v.into())); Self { @@ -85,11 +80,20 @@ impl Params { slot_no: slot_no.into(), txn_index: txn_index.into(), prv_txn_id, - removed_stake_addresses, purpose, } } + /// Executes prepared queries as a batch. + pub(crate) async fn execute_batch( + session: &Arc, + queries: Vec, + ) -> FallibleQueryResults { + session + .execute_batch(PreparedQuery::Rbac509InsertQuery, queries) + .await + } + /// Prepare Batch of RBAC Registration Index Data Queries pub(crate) async fn prepare_batch( session: &Arc, diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509_invalid.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509_invalid.rs index 5f14393e5cb5..c5e5039945a7 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509_invalid.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/insert_rbac509_invalid.rs @@ -10,7 +10,10 @@ use tracing::error; use crate::{ db::{ - index::queries::{PreparedQueries, SizedBatch}, + index::{ + queries::{FallibleQueryResults, PreparedQueries, PreparedQuery, SizedBatch}, + session::CassandraSession, + }, types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex, DbUuidV4}, }, service::common::objects::generic::problem_report::ProblemReport, @@ -65,6 +68,16 
@@ impl Params { } } + /// Executes prepared queries as a batch. + pub(crate) async fn execute_batch( + session: &Arc, + queries: Vec, + ) -> FallibleQueryResults { + session + .execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, queries) + .await + } + /// Prepare Batch of RBAC Registration Index Data Queries pub(crate) async fn prepare_batch( session: &Arc, diff --git a/catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs b/catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs index 37031e25d56e..10954e348db7 100644 --- a/catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs +++ b/catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs @@ -7,27 +7,29 @@ pub(crate) mod insert_rbac509; pub(crate) mod insert_rbac509_invalid; use std::{ - collections::{BTreeSet, HashSet}, + collections::{BTreeSet, HashMap, HashSet}, sync::Arc, }; -use anyhow::{Context, Result}; -use cardano_chain_follower::{MultiEraBlock, Slot, TxnIndex, hashes::TransactionId}; -use rbac_registration::cardano::cip509::Cip509; +use anyhow::Context; +use cardano_chain_follower::{MultiEraBlock, Slot, StakeAddress, TxnIndex, hashes::TransactionId}; +use catalyst_types::{catalyst_id::CatalystId, problem_report::ProblemReport, uuid::UuidV4}; +use ed25519_dalek::VerifyingKey; +use rbac_registration::{ + cardano::{cip509::Cip509, state::RbacChainsState as _}, + registration::cardano::RegistrationChain, +}; use scylla::client::session::Session; use tokio::sync::watch; use tracing::{debug, error}; use crate::{ db::index::{ - queries::{FallibleQueryTasks, PreparedQuery, SizedBatch}, + queries::{FallibleQueryTasks, SizedBatch}, session::CassandraSession, }, metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count}, - rbac::{ - RbacBlockIndexingContext, RbacValidationError, RbacValidationSuccess, - validate_rbac_registration, - }, + rbac::{RbacBlockIndexingContext, cache_persistent_rbac_chain, state::RbacChainsState}, settings::cassandra_db::EnvVars, }; @@ -62,7 +64,7 @@ impl Rbac509InsertQuery { 
pub(crate) async fn prepare_batch( session: &Arc, cfg: &EnvVars, - ) -> Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch, SizedBatch)> { + ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch, SizedBatch)> { Ok(( insert_rbac509::Params::prepare_batch(session, cfg).await?, insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?, @@ -73,7 +75,6 @@ impl Rbac509InsertQuery { } /// Index the RBAC 509 registrations in a transaction. - #[allow(clippy::too_many_lines)] pub(crate) async fn index( &mut self, txn_hash: TransactionId, @@ -82,7 +83,7 @@ impl Rbac509InsertQuery { pending_blocks: &mut watch::Receiver>, our_end: Slot, context: &mut RbacBlockIndexingContext, - ) -> Result<()> { + ) -> anyhow::Result<()> { let slot = block.slot(); let cip509 = match Cip509::new(block, index, &[]) { Ok(Some(v)) => v, @@ -122,126 +123,269 @@ impl Rbac509InsertQuery { // here we are going to wait till the other tasks have indexed the blocks before this one. wait_for_previous_blocks(pending_blocks, our_end, block.slot()).await?; - let previous_transaction = cip509.previous_transaction(); - // `Box::pin` is used here because of the future size (`clippy::large_futures` lint). - match Box::pin(validate_rbac_registration( - cip509, - block.is_immutable(), + if let Some(previous_txn) = cip509.previous_transaction() { + self.try_record_chain_update(&cip509, previous_txn, block.is_immutable(), context) + .await?; + } else { + self.try_record_new_chain(&cip509, block.is_immutable(), context) + .await?; + } + + Ok(()) + } + + /// Validating the provided registration as an update for an existing chain, preparing + /// corresponding updates + async fn try_record_chain_update( + &mut self, + reg: &Cip509, + previous_txn: TransactionId, + is_persistent: bool, + context: &mut RbacBlockIndexingContext, + ) -> anyhow::Result<()> { + // Find a chain this registration belongs to. 
+ let state = RbacChainsState::new(is_persistent, context); + + let slot = reg.origin().point().slot_or_default(); + let txn_index = reg.origin().txn_index(); + let txn_hash = reg.txn_hash(); + + let Some(catalyst_id) = state.catalyst_id_from_txn_id(previous_txn).await? else { + // This isn't a hard error because user input can contain invalid information. + // If there is no Catalyst ID, then we cannot record this + // registration as invalid and can only ignore (and log) it. + debug!( + slot = ?slot, + txn_index = ?txn_index, + txn_hash = ?txn_hash, + "Unable to determine Catalyst id for registration" + ); + return Ok(()); + }; + + let chain = state.chain(&catalyst_id).await?.context(format!( + "{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'" + ))?; + + // Try to add a new registration to the chain. + let Some(new_chain) = chain.update(reg, &state).await? else { + self.record_invalid_registration( + txn_hash, + txn_index, + slot, + chain.catalyst_id().clone(), + Some(previous_txn), + reg.purpose(), + reg.report().clone(), + ); + + return Ok(()); + }; + + let previous_addresses = chain.stake_addresses(); + let stake_addresses: HashSet<_> = new_chain + .stake_addresses() + .difference(&previous_addresses) + .cloned() + .collect(); + let public_keys = reg + .all_roles() + .iter() + .filter_map(|v| reg.signing_public_key_for_role(*v)) + .collect::>(); + let modified_chains = state.consume(); + let purpose = reg.purpose(); + + if is_persistent { + cache_persistent_rbac_chain(catalyst_id.clone(), new_chain); + } + + self.record_valid_registration( + txn_hash, + txn_index, + slot, + catalyst_id.clone(), + Some(previous_txn), + stake_addresses, + public_keys, + modified_chains, + purpose, context, - )) - .await - { - // Write updates to the database. 
There can be multiple updates in one registration - // because a new chain can take ownership of stake addresses of the existing chains and - // in that case we want to record changes to all those chains as well as the new one. - Ok(RbacValidationSuccess { - catalyst_id, - stake_addresses, - public_keys, - modified_chains, - purpose, - }) => { - // Record the transaction identifier (hash) of a new registration. - self.catalyst_id_for_txn_id - .push(insert_catalyst_id_for_txn_id::Params::new( - catalyst_id.clone(), - txn_hash, - slot, - )); - // Record new stake addresses. - for address in stake_addresses { - self.catalyst_id_for_stake_address.push( - insert_catalyst_id_for_stake_address::Params::new( - address, - slot, - index, - catalyst_id.clone(), - ), - ); - } - // Record new public keys. - for key in public_keys { - self.catalyst_id_for_public_key.push( - insert_catalyst_id_for_public_key::Params::new( - key, - slot, - catalyst_id.clone(), - ), - ); - } - // Update the chain this registration belongs to. - self.registrations.push(insert_rbac509::Params::new( - catalyst_id, - txn_hash, - slot, - index, - previous_transaction, - // Addresses can only be removed from other chains, so this list is always - // empty for the chain that is being updated. - HashSet::new(), - purpose, - )); + ); - // Update other chains that were affected by this registration. - for (catalyst_id, removed_addresses) in modified_chains { - self.registrations.push(insert_rbac509::Params::new( - catalyst_id.clone(), - txn_hash, - slot, - index, - // In this case the addresses were removed by another chain, so it doesn't - // make sense to include a previous transaction ID unrelated to the chain - // that is being updated. - None, - removed_addresses, - None, - )); - } - }, - // Invalid registrations are being recorded in order to report failure. 
- Err(RbacValidationError::InvalidRegistration { - catalyst_id, - purpose, - report, - }) => { - inc_invalid_rbac_reg_count(); - self.invalid.push(insert_rbac509_invalid::Params::new( - catalyst_id, + Ok(()) + } + + /// Validating the provided registration as a beginning of a new chain, preparing + /// corresponding updates + async fn try_record_new_chain( + &mut self, + reg: &Cip509, + is_persistent: bool, + context: &mut RbacBlockIndexingContext, + ) -> anyhow::Result<()> { + let mut state = RbacChainsState::new(is_persistent, context); + + let slot = reg.origin().point().slot_or_default(); + let txn_index = reg.origin().txn_index(); + let txn_hash = reg.txn_hash(); + + // Try to start a new chain. + let Some(new_chain) = RegistrationChain::new(reg, &mut state).await? else { + if let Some(cat_id) = reg.catalyst_id() { + self.record_invalid_registration( txn_hash, + txn_index, slot, - index, - purpose, - previous_transaction, - report, - )); - }, - // This isn't a hard error because user input can contain invalid information. If there - // is no Catalyst ID, then we cannot record this registration as invalid and can only - // ignore (and log) it. - Err(RbacValidationError::UnknownCatalystId) => { + cat_id.clone(), + None, + reg.purpose(), + reg.report().clone(), + ); + } else { + // This isn't a hard error because user input can contain invalid information. + // If there is no Catalyst ID, then we cannot record this + // registration as invalid and can only ignore (and log) it. debug!( slot = ?slot, - index = ?index, + txn_index = ?txn_index, txn_hash = ?txn_hash, "Unable to determine Catalyst id for registration" ); - }, - Err(RbacValidationError::Fatal(e)) => { - error!( - slot = ?slot, - index = ?index, - txn_hash = ?txn_hash, - err = ?e, - "Error indexing RBAC registration" - ); - // Propagate an error. 
- return Err(e); - }, - } + } + + return Ok(()); + }; + + let catalyst_id = new_chain.catalyst_id(); + let stake_addresses = new_chain.stake_addresses(); + let public_keys = reg + .all_roles() + .iter() + .filter_map(|v| reg.signing_public_key_for_role(*v)) + .collect::>(); + let modified_chains = state.consume(); + let purpose = reg.purpose(); + + self.record_valid_registration( + txn_hash, + txn_index, + slot, + catalyst_id.clone(), + None, + stake_addresses, + public_keys, + modified_chains, + purpose, + context, + ); Ok(()) } + /// Making corresponding records according to the valid registration + #[allow(clippy::too_many_arguments)] + fn record_valid_registration( + &mut self, + txn_hash: TransactionId, + index: TxnIndex, + slot: Slot, + catalyst_id: CatalystId, + previous_transaction: Option, + stake_addresses: HashSet, + public_keys: HashSet, + modified_chains: HashMap>, + purpose: Option, + context: &mut RbacBlockIndexingContext, + ) { + context.insert_transaction(txn_hash, catalyst_id.clone()); + context.insert_addresses(stake_addresses.clone(), &catalyst_id); + context.insert_public_keys(public_keys.clone(), &catalyst_id); + context.insert_registration( + catalyst_id.clone(), + txn_hash, + slot, + index, + previous_transaction, + ); + + // Record the transaction identifier (hash) of a new registration. + self.catalyst_id_for_txn_id + .push(insert_catalyst_id_for_txn_id::Params::new( + catalyst_id.clone(), + txn_hash, + slot, + )); + // Record new stake addresses. + for address in stake_addresses { + self.catalyst_id_for_stake_address.push( + insert_catalyst_id_for_stake_address::Params::new( + address, + slot, + index, + catalyst_id.clone(), + ), + ); + } + // Record new public keys. + for key in public_keys { + self.catalyst_id_for_public_key + .push(insert_catalyst_id_for_public_key::Params::new( + key, + slot, + catalyst_id.clone(), + )); + } + // Update the chain this registration belongs to. 
+ self.registrations.push(insert_rbac509::Params::new( + catalyst_id, + txn_hash, + slot, + index, + previous_transaction, + purpose, + )); + + // Update other chains that were affected by this registration. + for (catalyst_id, _) in modified_chains { + self.registrations.push(insert_rbac509::Params::new( + catalyst_id.clone(), + txn_hash, + slot, + index, + // In this case the addresses were removed by another chain, so it doesn't + // make sense to include a previous transaction ID unrelated to the chain + // that is being updated. + None, + None, + )); + } + } + + /// Making corresponding records according to the invalid registration + #[allow(clippy::too_many_arguments)] + fn record_invalid_registration( + &mut self, + txn_hash: TransactionId, + index: TxnIndex, + slot: Slot, + catalyst_id: CatalystId, + previous_transaction: Option<TransactionId>, + purpose: Option<UuidV4>, + report: ProblemReport, + ) { + inc_invalid_rbac_reg_count(); + self.invalid.push(insert_rbac509_invalid::Params::new( + catalyst_id, + txn_hash, + slot, + index, + purpose, + previous_transaction, + report, + )); + } + /// Execute the RBAC 509 Registration Indexing Queries. /// /// Consumes the `self` and returns a vector of futures. 
@@ -254,54 +398,47 @@ impl Rbac509InsertQuery { if !self.registrations.is_empty() { let inner_session = session.clone(); query_handles.push(tokio::spawn(async move { - inner_session - .execute_batch(PreparedQuery::Rbac509InsertQuery, self.registrations) - .await + insert_rbac509::Params::execute_batch(&inner_session, self.registrations).await })); } if !self.invalid.is_empty() { let inner_session = session.clone(); query_handles.push(tokio::spawn(async move { - inner_session - .execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, self.invalid) - .await + insert_rbac509_invalid::Params::execute_batch(&inner_session, self.invalid).await })); } if !self.catalyst_id_for_txn_id.is_empty() { let inner_session = session.clone(); query_handles.push(tokio::spawn(async move { - inner_session - .execute_batch( - PreparedQuery::CatalystIdForTxnIdInsertQuery, - self.catalyst_id_for_txn_id, - ) - .await + insert_catalyst_id_for_txn_id::Params::execute_batch( + &inner_session, + self.catalyst_id_for_txn_id, + ) + .await })); } if !self.catalyst_id_for_stake_address.is_empty() { let inner_session = session.clone(); query_handles.push(tokio::spawn(async move { - inner_session - .execute_batch( - PreparedQuery::CatalystIdForStakeAddressInsertQuery, - self.catalyst_id_for_stake_address, - ) - .await + insert_catalyst_id_for_stake_address::Params::execute_batch( + &inner_session, + self.catalyst_id_for_stake_address, + ) + .await })); } if !self.catalyst_id_for_public_key.is_empty() { let inner_session = session.clone(); query_handles.push(tokio::spawn(async move { - inner_session - .execute_batch( - PreparedQuery::CatalystIdForPublicKeyInsertQuery, - self.catalyst_id_for_public_key, - ) - .await + insert_catalyst_id_for_public_key::Params::execute_batch( + &inner_session, + self.catalyst_id_for_public_key, + ) + .await })); } @@ -316,7 +453,7 @@ async fn wait_for_previous_blocks( pending_blocks: &mut watch::Receiver>, our_end: Slot, current_slot: Slot, -) -> Result<()> { +) -> 
anyhow::Result<()> { loop { if pending_blocks .borrow_and_update() diff --git a/catalyst-gateway/bin/src/db/index/queries/cql/get_rbac_registrations_catalyst_id.cql b/catalyst-gateway/bin/src/db/index/queries/cql/get_rbac_registrations_catalyst_id.cql index d506a527bb2d..51ecd49c3ebc 100644 --- a/catalyst-gateway/bin/src/db/index/queries/cql/get_rbac_registrations_catalyst_id.cql +++ b/catalyst-gateway/bin/src/db/index/queries/cql/get_rbac_registrations_catalyst_id.cql @@ -1,7 +1,6 @@ SELECT txn_id, slot_no, txn_index, - prv_txn_id, - removed_stake_addresses + prv_txn_id FROM rbac_registration WHERE catalyst_id = :catalyst_id diff --git a/catalyst-gateway/bin/src/db/index/queries/rbac/get_catalyst_id_from_stake_address.rs b/catalyst-gateway/bin/src/db/index/queries/rbac/get_catalyst_id_from_stake_address.rs index bf5b37466d38..333e8037f45d 100644 --- a/catalyst-gateway/bin/src/db/index/queries/rbac/get_catalyst_id_from_stake_address.rs +++ b/catalyst-gateway/bin/src/db/index/queries/rbac/get_catalyst_id_from_stake_address.rs @@ -120,20 +120,6 @@ impl Query { } } -/// Adds the given value to the cache. -pub fn cache_stake_address( - is_persistent: bool, - stake_address: StakeAddress, - catalyst_id: CatalystId, -) { - CassandraSession::get(is_persistent).inspect(|session| { - session - .caches() - .rbac_stake_address() - .insert(stake_address, catalyst_id); - }); -} - /// Removes all cached values. pub fn invalidate_stake_addresses_cache(is_persistent: bool) { CassandraSession::get(is_persistent).inspect(|session| { diff --git a/catalyst-gateway/bin/src/db/index/queries/rbac/get_rbac_registrations.rs b/catalyst-gateway/bin/src/db/index/queries/rbac/get_rbac_registrations.rs index 774f34b9f674..394526fb8ca1 100644 --- a/catalyst-gateway/bin/src/db/index/queries/rbac/get_rbac_registrations.rs +++ b/catalyst-gateway/bin/src/db/index/queries/rbac/get_rbac_registrations.rs @@ -1,6 +1,6 @@ //! Get RBAC registrations by Catalyst ID. 
-use std::{collections::HashSet, sync::Arc}; +use std::sync::Arc; use scylla::{ DeserializeRow, SerializeRow, @@ -14,7 +14,7 @@ use crate::db::{ queries::{PreparedQueries, PreparedSelectQuery}, session::CassandraSession, }, - types::{DbCatalystId, DbSlot, DbStakeAddress, DbTransactionId, DbTxnIndex}, + types::{DbCatalystId, DbSlot, DbTransactionId, DbTxnIndex}, }; /// Get registrations by Catalyst ID query. @@ -40,8 +40,6 @@ pub(crate) struct Query { /// A previous transaction id. #[allow(dead_code)] pub prv_txn_id: Option, - /// A set of removed stake addresses. - pub removed_stake_addresses: HashSet, } impl Query { diff --git a/catalyst-gateway/bin/src/db/index/schema/cql/rbac_registration.cql b/catalyst-gateway/bin/src/db/index/schema/cql/rbac_registration.cql index 1319dbcb01d7..6cefb1e5125b 100644 --- a/catalyst-gateway/bin/src/db/index/schema/cql/rbac_registration.cql +++ b/catalyst-gateway/bin/src/db/index/schema/cql/rbac_registration.cql @@ -8,8 +8,7 @@ CREATE TABLE IF NOT EXISTS rbac_registration ( -- Non-Key Data txn_id blob, -- 32 Bytes of Transaction Hash. (TransactionHash) prv_txn_id blob, -- 32 Bytes from Previous Transaction Hash. - removed_stake_addresses set, -- A set of stake addresses that were removed by other chain. - -- The set is empty for all registrations of this chain. + purpose uuid, -- 16 Bytes of UUIDv4 Purpose. PRIMARY KEY (catalyst_id, slot_no, txn_index) diff --git a/catalyst-gateway/bin/src/db/index/schema/mod.rs b/catalyst-gateway/bin/src/db/index/schema/mod.rs index ae185d40b846..db25b512a90a 100644 --- a/catalyst-gateway/bin/src/db/index/schema/mod.rs +++ b/catalyst-gateway/bin/src/db/index/schema/mod.rs @@ -274,7 +274,7 @@ mod tests { /// This constant is ONLY used by Unit tests to identify when the schema version will /// change accidentally, and is NOT to be used directly to set the schema version of /// the table namespaces. 
- const SCHEMA_VERSION: &str = "69e28bc1-be89-8407-83dc-9c4cc408d3a9"; + const SCHEMA_VERSION: &str = "d44d83ad-9b11-81aa-9158-668738bb3f67"; #[test] /// This test is designed to fail if the schema version has changed. diff --git a/catalyst-gateway/bin/src/db/index/tests/scylla_purge.rs b/catalyst-gateway/bin/src/db/index/tests/scylla_purge.rs index 40335acc8263..3ac360d4d385 100644 --- a/catalyst-gateway/bin/src/db/index/tests/scylla_purge.rs +++ b/catalyst-gateway/bin/src/db/index/tests/scylla_purge.rs @@ -4,9 +4,7 @@ // cSpell:ignoreRegExp cardano/Fftx -use std::collections::HashSet; - -use cardano_chain_follower::{Network, StakeAddress, VotingPubKey, hashes::TransactionId}; +use cardano_chain_follower::{VotingPubKey, hashes::TransactionId}; use catalyst_types::{problem_report::ProblemReport, uuid::UuidV4}; use ed25519_dalek::VerifyingKey; use futures::StreamExt; @@ -258,7 +256,6 @@ async fn rbac509_registration() { 0.into(), 0.into(), None, - HashSet::new(), None, ), rbac509::insert_rbac509::Params::new( @@ -269,15 +266,6 @@ async fn rbac509_registration() { 1.into(), 1.into(), Some(TransactionId::new(&[3])), - [StakeAddress::new( - Network::Mainnet, - false, - "276fd18711931e2c0e21430192dbeac0e458093cd9d1fcd7210f64b3" - .parse() - .unwrap(), - )] - .into_iter() - .collect(), Some(UuidV4::new()), ), ]; diff --git a/catalyst-gateway/bin/src/db/types/stake_address.rs b/catalyst-gateway/bin/src/db/types/stake_address.rs index e76ca150b5de..32a6d7418ac7 100644 --- a/catalyst-gateway/bin/src/db/types/stake_address.rs +++ b/catalyst-gateway/bin/src/db/types/stake_address.rs @@ -26,6 +26,12 @@ impl From for DbStakeAddress { } } +impl From for StakeAddress { + fn from(value: DbStakeAddress) -> Self { + value.0 + } +} + impl Display for DbStakeAddress { fn fmt( &self, diff --git a/catalyst-gateway/bin/src/rbac/get_chain.rs b/catalyst-gateway/bin/src/rbac/get_chain.rs index 695bd706eb41..55e167ca5457 100644 --- a/catalyst-gateway/bin/src/rbac/get_chain.rs +++ 
b/catalyst-gateway/bin/src/rbac/get_chain.rs @@ -99,11 +99,7 @@ pub async fn build_rbac_chain( let Some(root) = regs.next() else { return Ok(None); }; - if !root.removed_stake_addresses.is_empty() { - // This set contains addresses that were removed from the chain. It is impossible to - // remove an address before the chain was even started. - bail!("The root registration shouldn't contain removed stake addresses"); - } + let root = cip509( Settings::cardano_network(), root.slot_no.into(), @@ -111,7 +107,8 @@ pub async fn build_rbac_chain( ) .await?; - let chain = RegistrationChain::new(root).context("Failed to start registration chain")?; + let chain = + RegistrationChain::new_stateless(&root).context("Failed to start registration chain")?; let chain = apply_regs(chain, regs).await?; Ok(Some(chain)) } @@ -124,14 +121,9 @@ pub async fn apply_regs( let network = Settings::cardano_network(); for reg in regs { - if !reg.removed_stake_addresses.is_empty() { - // TODO: This should be handled as a part of the - // https://github.com/input-output-hk/catalyst-voices/issues/3464 task. - continue; - } let reg = cip509(network, reg.slot_no.into(), reg.txn_index.into()).await?; chain = chain - .update(reg) + .update_stateless(®) .context("Failed to update registration chain")?; } diff --git a/catalyst-gateway/bin/src/rbac/indexing_context.rs b/catalyst-gateway/bin/src/rbac/indexing_context.rs index 9929c40e2a43..f4e97afc52ea 100644 --- a/catalyst-gateway/bin/src/rbac/indexing_context.rs +++ b/catalyst-gateway/bin/src/rbac/indexing_context.rs @@ -1,6 +1,6 @@ //! A RBAC context used during indexing. 
-use std::collections::{HashMap, HashSet}; +use std::collections::HashMap; use cardano_chain_follower::{Slot, StakeAddress, TxnIndex, hashes::TransactionId}; use catalyst_types::catalyst_id::CatalystId; @@ -127,7 +127,6 @@ impl RbacBlockIndexingContext { slot: Slot, txn_index: TxnIndex, prv_txn: Option, - removed_stake_addresses: HashSet, ) { use std::collections::hash_map::Entry; @@ -136,10 +135,6 @@ impl RbacBlockIndexingContext { slot_no: slot.into(), txn_index: txn_index.into(), prv_txn_id: prv_txn.map(Into::into), - removed_stake_addresses: removed_stake_addresses - .into_iter() - .map(Into::into) - .collect(), }; match self.registrations.entry(id) { diff --git a/catalyst-gateway/bin/src/rbac/mod.rs b/catalyst-gateway/bin/src/rbac/mod.rs index f86384174f0e..8293cdebff0e 100644 --- a/catalyst-gateway/bin/src/rbac/mod.rs +++ b/catalyst-gateway/bin/src/rbac/mod.rs @@ -4,11 +4,9 @@ mod chain_info; mod chains_cache; mod get_chain; mod indexing_context; -mod validation; -mod validation_result; +pub mod state; pub use chain_info::ChainInfo; +pub use chains_cache::cache_persistent_rbac_chain; pub use get_chain::latest_rbac_chain; pub use indexing_context::RbacBlockIndexingContext; -pub use validation::validate_rbac_registration; -pub use validation_result::{RbacValidationError, RbacValidationResult, RbacValidationSuccess}; diff --git a/catalyst-gateway/bin/src/rbac/state.rs b/catalyst-gateway/bin/src/rbac/state.rs new file mode 100644 index 000000000000..9381d70f35cf --- /dev/null +++ b/catalyst-gateway/bin/src/rbac/state.rs @@ -0,0 +1,245 @@ +//! 
An implementation of the `rbac_registration::cardano::state::RbacChainsState` trait + +use std::collections::{HashMap, HashSet}; + +use anyhow::Context; +use cardano_chain_follower::{StakeAddress, hashes::TransactionId}; +use catalyst_types::catalyst_id::CatalystId; +use ed25519_dalek::VerifyingKey; +use futures::StreamExt; + +use crate::{ + db::index::session::CassandraSession, + rbac::{ + RbacBlockIndexingContext, + chains_cache::cached_persistent_rbac_chain, + get_chain::{apply_regs, build_rbac_chain, persistent_rbac_chain}, + latest_rbac_chain, + }, +}; + +/// A helper struct to handle RBAC related state from the DB and caches. +/// Implements `rbac_registration::cardano::state::RbacChainsState` trait. +pub(crate) struct RbacChainsState<'a> { + /// Whether persistent (`index-db`) data is being processed; when `true` the volatile database is not queried + is_persistent: bool, + /// Reference to the per-block indexing context, which is checked before any database lookup + context: &'a RbacBlockIndexingContext, + /// Registration chains modified by the `take_stake_address_from_chains` + /// method. During `take_stake_address_from_chains` execution nothing is + /// written into the `index-db`; all of this data is written to the DB in a + /// batch. To consume this field, call the `consume` method + modified_chains: HashMap<CatalystId, HashSet<StakeAddress>>, +} + +impl<'a> RbacChainsState<'a> { + /// Creates a new instance of `RbacChainsState` + pub fn new( + is_persistent: bool, + context: &'a RbacBlockIndexingContext, + ) -> Self { + Self { + is_persistent, + context, + modified_chains: HashMap::new(), + } + } + + /// Consumes the `RbacChainsState` instance and returns the recorded `modified_chains` field. + pub fn consume(self) -> HashMap<CatalystId, HashSet<StakeAddress>> { + self.modified_chains + } + + /// Returns a Catalyst ID corresponding to the given transaction hash. + pub async fn catalyst_id_from_txn_id( + &self, + txn_id: TransactionId, + ) -> anyhow::Result<Option<CatalystId>> { + use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query; + + // Check the context first.
+ if let Some(catalyst_id) = self.context.find_transaction(&txn_id) { + return Ok(Some(catalyst_id.to_owned())); + } + + // Then try to find in the persistent database. + let session = + CassandraSession::get(true).context("Failed to get Cassandra persistent session")?; + if let Some(id) = Query::get(&session, txn_id).await? { + return Ok(Some(id)); + } + + // Conditionally check the volatile database. + if !self.is_persistent { + let session = + CassandraSession::get(false).context("Failed to get Cassandra volatile session")?; + return Query::get(&session, txn_id).await; + } + + Ok(None) + } +} + +impl rbac_registration::cardano::state::RbacChainsState for RbacChainsState<'_> { + async fn chain( + &self, + id: &CatalystId, + ) -> anyhow::Result> { + let chain = if self.is_persistent { + persistent_rbac_chain(id).await? + } else { + latest_rbac_chain(id).await?.map(|i| i.chain) + }; + + // Apply additional registrations from context if any. + if let Some(regs) = self.context.find_registrations(id) { + let regs = regs.iter().cloned(); + match chain { + Some(c) => return apply_regs(c, regs).await.map(Some), + None => return build_rbac_chain(regs).await, + } + } + + Ok(chain) + } + + async fn is_chain_known( + &self, + id: &CatalystId, + ) -> anyhow::Result { + if self.context.find_registrations(id).is_some() { + return Ok(true); + } + + let session = + CassandraSession::get(true).context("Failed to get Cassandra persistent session")?; + + // We only cache persistent chains, so it is ok to check the cache regardless of the + // `is_persistent` parameter value. + if cached_persistent_rbac_chain(&session, id).is_some() { + return Ok(true); + } + + if is_cat_id_known(&session, id).await? { + return Ok(true); + } + + // Conditionally check the volatile database. + if !self.is_persistent { + let session = + CassandraSession::get(false).context("Failed to get Cassandra volatile session")?; + if is_cat_id_known(&session, id).await? 
{ + return Ok(true); + } + } + + Ok(false) + } + + async fn is_stake_address_used( + &self, + address: &StakeAddress, + ) -> anyhow::Result { + catalyst_id_from_stake_address(address, self.is_persistent, self.context) + .await + .map(|v| v.is_some()) + } + + async fn chain_catalyst_id_from_signing_public_key( + &self, + key: &VerifyingKey, + ) -> anyhow::Result> { + use crate::db::index::queries::rbac::get_catalyst_id_from_public_key::Query; + + // Check the context first. + if let Some(catalyst_id) = self.context.find_public_key(key) { + return Ok(Some(catalyst_id.to_owned())); + } + + // Then try to find in the persistent database. + let session = + CassandraSession::get(true).context("Failed to get Cassandra persistent session")?; + if let Some(id) = Query::get(&session, *key).await? { + return Ok(Some(id)); + } + + // Conditionally check the volatile database. + if !self.is_persistent { + let session = + CassandraSession::get(false).context("Failed to get Cassandra volatile session")?; + return Query::get(&session, *key).await; + } + + Ok(None) + } + + async fn take_stake_address_from_chains( + &mut self, + addresses: I, + ) -> anyhow::Result<()> + where + I: IntoIterator + Send, + ::IntoIter: Send, + { + for addr in addresses { + if let Some(cat_id) = + catalyst_id_from_stake_address(&addr, self.is_persistent, self.context).await? + { + self.modified_chains + .entry(cat_id) + .and_modify(|e| { + e.insert(addr.clone()); + }) + .or_insert([addr].into_iter().collect()); + } + } + + Ok(()) + } +} + +/// Returns a Catalyst ID corresponding to the given stake address. +async fn catalyst_id_from_stake_address( + address: &StakeAddress, + is_persistent: bool, + context: &RbacBlockIndexingContext, +) -> anyhow::Result> { + use crate::db::index::queries::rbac::get_catalyst_id_from_stake_address::Query; + + // Check the context first. 
+ if let Some(catalyst_id) = context.find_address(address) { + return Ok(Some(catalyst_id.to_owned())); + } + + // Then try to find in the persistent database. + let session = + CassandraSession::get(true).context("Failed to get Cassandra persistent session")?; + if let Some(id) = Query::latest(&session, address).await? { + return Ok(Some(id)); + } + + // Conditionally check the volatile database. + if !is_persistent { + let session = + CassandraSession::get(false).context("Failed to get Cassandra volatile session")?; + return Query::latest(&session, address).await; + } + + Ok(None) +} + +/// Returns `true` if there is at least one registration with the given Catalyst ID. +async fn is_cat_id_known( + session: &CassandraSession, + id: &CatalystId, +) -> anyhow::Result { + use crate::db::index::queries::rbac::get_rbac_registrations::{Query, QueryParams}; + + Ok(Query::execute(session, QueryParams { + catalyst_id: id.clone().into(), + }) + .await? + .next() + .await + .is_some()) +} diff --git a/catalyst-gateway/bin/src/rbac/validation.rs b/catalyst-gateway/bin/src/rbac/validation.rs deleted file mode 100644 index b2e714355335..000000000000 --- a/catalyst-gateway/bin/src/rbac/validation.rs +++ /dev/null @@ -1,461 +0,0 @@ -//! Utilities for RBAC registrations validation. 
- -use std::collections::{HashMap, HashSet}; - -use anyhow::{Context, Result}; -use cardano_chain_follower::{StakeAddress, hashes::TransactionId}; -use catalyst_types::{ - catalyst_id::{CatalystId, role_index::RoleId}, - problem_report::ProblemReport, -}; -use ed25519_dalek::VerifyingKey; -use futures::StreamExt; -use rbac_registration::{ - cardano::cip509::{Cip0134UriSet, Cip509}, - registration::cardano::RegistrationChain, -}; - -use crate::{ - db::index::{ - queries::rbac::get_catalyst_id_from_stake_address::cache_stake_address, - session::CassandraSession, - }, - rbac::{ - RbacBlockIndexingContext, RbacValidationError, RbacValidationResult, RbacValidationSuccess, - chains_cache::{cache_persistent_rbac_chain, cached_persistent_rbac_chain}, - get_chain::{apply_regs, build_rbac_chain, persistent_rbac_chain}, - latest_rbac_chain, - }, -}; - -/// Validates a new registration by either starting a new chain or adding it to the -/// existing one. -/// -/// In case of failure a problem report from the given registration is updated and -/// returned. -pub async fn validate_rbac_registration( - reg: Cip509, - is_persistent: bool, - context: &mut RbacBlockIndexingContext, -) -> RbacValidationResult { - match reg.previous_transaction() { - // `Box::pin` is used here because of the future size (`clippy::large_futures` lint). - Some(previous_txn) => { - Box::pin(update_chain(reg, previous_txn, is_persistent, context)).await - }, - None => Box::pin(start_new_chain(reg, is_persistent, context)).await, - } -} - -/// Tries to update an existing RBAC chain. -async fn update_chain( - reg: Cip509, - previous_txn: TransactionId, - is_persistent: bool, - context: &mut RbacBlockIndexingContext, -) -> RbacValidationResult { - let purpose = reg.purpose(); - let report = reg.report().to_owned(); - - // Find a chain this registration belongs to. - let Some(catalyst_id) = catalyst_id_from_txn_id(previous_txn, is_persistent, context).await? 
- else { - // We are unable to determine a Catalyst ID, so there is no sense to update the problem - // report because we would be unable to store this registration anyway. - return Err(RbacValidationError::UnknownCatalystId); - }; - let chain = chain(&catalyst_id, is_persistent, context).await? - .context("{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'")?; - - // Check that addresses from the new registration aren't used in other chains. - let previous_addresses = chain.stake_addresses(); - let reg_addresses = cip509_stake_addresses(®); - let new_addresses: Vec<_> = reg_addresses.difference(&previous_addresses).collect(); - for address in &new_addresses { - match catalyst_id_from_stake_address(address, is_persistent, context).await? { - None => { - // All good: the address wasn't used before. - }, - Some(_) => { - report.functional_validation( - &format!("{address} stake addresses is already used"), - "It isn't allowed to use same stake address in multiple registration chains", - ); - }, - } - } - - // Store values before consuming the registration. - let txn_id = reg.txn_hash(); - let stake_addresses = cip509_stake_addresses(®); - let origin = reg.origin().to_owned(); - - // Try to add a new registration to the chain. - let new_chain = chain.update(reg).ok_or_else(|| { - RbacValidationError::InvalidRegistration { - catalyst_id: catalyst_id.clone(), - purpose, - report: report.clone(), - } - })?; - - // Check that new public keys aren't used by other chains. - let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?; - - // Return an error if any issues were recorded in the report. - if report.is_problematic() { - return Err(RbacValidationError::InvalidRegistration { - catalyst_id, - purpose, - report, - }); - } - - // Everything is fine: update the context. 
- context.insert_transaction(txn_id, catalyst_id.clone()); - context.insert_addresses(stake_addresses.clone(), &catalyst_id); - context.insert_public_keys(public_keys.clone(), &catalyst_id); - context.insert_registration( - catalyst_id.clone(), - txn_id, - origin.point().slot_or_default(), - origin.txn_index(), - Some(previous_txn), - // Only a new chain can remove stake addresses from an existing one. - HashSet::new(), - ); - - if is_persistent { - cache_persistent_rbac_chain(catalyst_id.clone(), new_chain); - } - - Ok(RbacValidationSuccess { - catalyst_id, - stake_addresses, - public_keys, - // Only new chains can take ownership of stake addresses of existing chains, so in this case - // other chains aren't affected. - modified_chains: Vec::new(), - purpose, - }) -} - -/// Tries to start a new RBAC chain. -async fn start_new_chain( - reg: Cip509, - is_persistent: bool, - context: &mut RbacBlockIndexingContext, -) -> RbacValidationResult { - let catalyst_id = reg.catalyst_id().map(CatalystId::as_short_id); - let purpose = reg.purpose(); - let report = reg.report().to_owned(); - - // Try to start a new chain. - let new_chain = RegistrationChain::new(reg).ok_or_else(|| { - if let Some(catalyst_id) = catalyst_id { - RbacValidationError::InvalidRegistration { - catalyst_id, - purpose, - report: report.clone(), - } - } else { - RbacValidationError::UnknownCatalystId - } - })?; - - // Verify that a Catalyst ID of this chain is unique. - let catalyst_id = new_chain.catalyst_id().as_short_id(); - if is_chain_known(&catalyst_id, is_persistent, context).await? { - report.functional_validation( - &format!("{catalyst_id} is already used"), - "It isn't allowed to use same Catalyst ID (certificate subject public key) in multiple registration chains", - ); - return Err(RbacValidationError::InvalidRegistration { - catalyst_id, - purpose, - report, - }); - } - - // Validate stake addresses. 
- let new_addresses = new_chain.stake_addresses(); - let mut updated_chains: HashMap<_, HashSet> = HashMap::new(); - for address in &new_addresses { - if let Some(id) = catalyst_id_from_stake_address(address, is_persistent, context).await? { - // If an address is used in existing chain then a new chain must have different role 0 - // signing key. - let previous_chain = chain(&id, is_persistent, context) - .await? - .context("{id} is present in 'catalyst_id_for_stake_address', but not in 'rbac_registration'")?; - if previous_chain.get_latest_signing_pk_for_role(&RoleId::Role0) - == new_chain.get_latest_signing_pk_for_role(&RoleId::Role0) - { - report.functional_validation( - &format!("A new registration ({catalyst_id}) uses the same public key as the previous one ({})", - previous_chain.catalyst_id().as_short_id() - ), - "It is only allowed to override the existing chain by using different public key", - ); - } else { - // The new root registration "takes" an address(es) from the existing chain, so that - // chain needs to be updated. - updated_chains - .entry(id) - .and_modify(|e| { - e.insert(address.clone()); - }) - .or_insert([address.clone()].into_iter().collect()); - } - } - } - - // Check that new public keys aren't used by other chains. - let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?; - - if report.is_problematic() { - return Err(RbacValidationError::InvalidRegistration { - catalyst_id, - purpose, - report, - }); - } - - // Everything is fine: update the context. - context.insert_transaction(new_chain.current_tx_id_hash(), catalyst_id.clone()); - // This will also update the addresses that are already present in the context if they - // were reassigned to the new chain. 
- context.insert_addresses(new_addresses.clone(), &catalyst_id); - context.insert_public_keys(public_keys.clone(), &catalyst_id); - context.insert_registration( - catalyst_id.clone(), - new_chain.current_tx_id_hash(), - new_chain.current_point().slot_or_default(), - new_chain.current_txn_index(), - // No previous transaction for the root registration. - None, - // This chain has just been created, so no addresses have been removed from it. - HashSet::new(), - ); - - // This cache must be updated because these addresses previously belonged to other chains. - for (catalyst_id, addresses) in &updated_chains { - for address in addresses { - cache_stake_address(is_persistent, address.clone(), catalyst_id.clone()); - } - } - - Ok(RbacValidationSuccess { - catalyst_id, - stake_addresses: new_addresses, - public_keys, - modified_chains: updated_chains.into_iter().collect(), - purpose, - }) -} - -/// Returns a Catalyst ID corresponding to the given transaction hash. -async fn catalyst_id_from_txn_id( - txn_id: TransactionId, - is_persistent: bool, - context: &mut RbacBlockIndexingContext, -) -> Result> { - use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query; - - // Check the context first. - if let Some(catalyst_id) = context.find_transaction(&txn_id) { - return Ok(Some(catalyst_id.to_owned())); - } - - // Then try to find in the persistent database. - let session = - CassandraSession::get(true).context("Failed to get Cassandra persistent session")?; - if let Some(id) = Query::get(&session, txn_id).await? { - return Ok(Some(id)); - } - - // Conditionally check the volatile database. - if !is_persistent { - let session = - CassandraSession::get(false).context("Failed to get Cassandra volatile session")?; - return Query::get(&session, txn_id).await; - } - - Ok(None) -} - -/// Returns either persistent or "latest" (persistent + volatile) registration chain for -/// the given Catalyst ID. 
-async fn chain( - id: &CatalystId, - is_persistent: bool, - context: &mut RbacBlockIndexingContext, -) -> Result> { - let chain = if is_persistent { - persistent_rbac_chain(id).await? - } else { - latest_rbac_chain(id).await?.map(|i| i.chain) - }; - - // Apply additional registrations from context if any. - if let Some(regs) = context.find_registrations(id) { - let regs = regs.iter().cloned(); - match chain { - Some(c) => apply_regs(c, regs).await.map(Some), - None => build_rbac_chain(regs).await, - } - } else { - Ok(chain) - } -} - -/// Returns a Catalyst ID corresponding to the given stake address. -async fn catalyst_id_from_stake_address( - address: &StakeAddress, - is_persistent: bool, - context: &mut RbacBlockIndexingContext, -) -> Result> { - use crate::db::index::queries::rbac::get_catalyst_id_from_stake_address::Query; - - // Check the context first. - if let Some(catalyst_id) = context.find_address(address) { - return Ok(Some(catalyst_id.to_owned())); - } - - // Then try to find in the persistent database. - let session = - CassandraSession::get(true).context("Failed to get Cassandra persistent session")?; - if let Some(id) = Query::latest(&session, address).await? { - return Ok(Some(id)); - } - - // Conditionally check the volatile database. - if !is_persistent { - let session = - CassandraSession::get(false).context("Failed to get Cassandra volatile session")?; - return Query::latest(&session, address).await; - } - - Ok(None) -} - -/// Checks that a new registration doesn't contain a signing key that was used by any -/// other chain. Returns a list of public keys in the registration. 
-async fn validate_public_keys( - chain: &RegistrationChain, - is_persistent: bool, - report: &ProblemReport, - context: &mut RbacBlockIndexingContext, -) -> Result> { - let mut keys = HashSet::new(); - - let roles: Vec<_> = chain.role_data_history().keys().collect(); - let catalyst_id = chain.catalyst_id().as_short_id(); - - for role in roles { - if let Some((key, _)) = chain.get_latest_signing_pk_for_role(role) { - keys.insert(key); - if let Some(previous) = catalyst_id_from_public_key(key, is_persistent, context).await? - && previous != catalyst_id - { - report.functional_validation( - &format!("An update to {catalyst_id} registration chain uses the same public key ({key:?}) as {previous} chain"), - "It isn't allowed to use role 0 signing (certificate subject public) key in different chains", - ); - } - } - } - - Ok(keys) -} - -/// Returns a Catalyst ID corresponding to the given public key. -async fn catalyst_id_from_public_key( - key: VerifyingKey, - is_persistent: bool, - context: &mut RbacBlockIndexingContext, -) -> Result> { - use crate::db::index::queries::rbac::get_catalyst_id_from_public_key::Query; - - // Check the context first. - if let Some(catalyst_id) = context.find_public_key(&key) { - return Ok(Some(catalyst_id.to_owned())); - } - - // Then try to find in the persistent database. - let session = - CassandraSession::get(true).context("Failed to get Cassandra persistent session")?; - if let Some(id) = Query::get(&session, key).await? { - return Ok(Some(id)); - } - - // Conditionally check the volatile database. - if !is_persistent { - let session = - CassandraSession::get(false).context("Failed to get Cassandra volatile session")?; - return Query::get(&session, key).await; - } - - Ok(None) -} - -/// Returns `true` if a chain with the given Catalyst ID already exists. -/// -/// This function behaves in the same way as `latest_rbac_chain(...).is_some()` but the -/// implementation is more optimized because we don't need to build the whole chain. 
-pub async fn is_chain_known( - id: &CatalystId, - is_persistent: bool, - context: &mut RbacBlockIndexingContext, -) -> Result { - if context.find_registrations(id).is_some() { - return Ok(true); - } - - let session = - CassandraSession::get(true).context("Failed to get Cassandra persistent session")?; - - // We only cache persistent chains, so it is ok to check the cache regardless of the - // `is_persistent` parameter value. - if cached_persistent_rbac_chain(&session, id).is_some() { - return Ok(true); - } - - if is_cat_id_known(&session, id).await? { - return Ok(true); - } - - // Conditionally check the volatile database. - if !is_persistent { - let session = - CassandraSession::get(false).context("Failed to get Cassandra volatile session")?; - if is_cat_id_known(&session, id).await? { - return Ok(true); - } - } - - Ok(false) -} - -/// Returns `true` if there is at least one registration with the given Catalyst ID. -async fn is_cat_id_known( - session: &CassandraSession, - id: &CatalystId, -) -> Result { - use crate::db::index::queries::rbac::get_rbac_registrations::{Query, QueryParams}; - - Ok(Query::execute(session, QueryParams { - catalyst_id: id.clone().into(), - }) - .await? - .next() - .await - .is_some()) -} - -/// Returns a set of stake addresses in the given registration. -fn cip509_stake_addresses(cip509: &Cip509) -> HashSet { - cip509 - .certificate_uris() - .map(Cip0134UriSet::stake_addresses) - .unwrap_or_default() -} diff --git a/catalyst-gateway/bin/src/rbac/validation_result.rs b/catalyst-gateway/bin/src/rbac/validation_result.rs deleted file mode 100644 index 18061b342566..000000000000 --- a/catalyst-gateway/bin/src/rbac/validation_result.rs +++ /dev/null @@ -1,64 +0,0 @@ -//! Types related to validation of new RBAC registrations. 
- -use std::collections::HashSet; - -use cardano_chain_follower::StakeAddress; -use catalyst_types::{catalyst_id::CatalystId, problem_report::ProblemReport, uuid::UuidV4}; -use ed25519_dalek::VerifyingKey; - -/// A return value of the `validate_rbac_registration` method. -pub type RbacValidationResult = Result; - -/// A value returned from the `validate_rbac_registration` on happy path. -/// -/// It contains information for updating `rbac_registration`, -/// `catalyst_id_for_public_key`, `catalyst_id_for_stake_address` and -/// `catalyst_id_for_txn_id` tables. -pub struct RbacValidationSuccess { - /// A Catalyst ID of the chain this registration belongs to. - pub catalyst_id: CatalystId, - /// A list of stake addresses that were added to the chain. - pub stake_addresses: HashSet, - /// A list of role public keys used in this registration. - pub public_keys: HashSet, - /// A list of updates to other chains containing Catalyst IDs and removed stake - /// addresses. - /// - /// A new RBAC registration can take ownership of stake addresses of other chains. - pub modified_chains: Vec<(CatalystId, HashSet)>, - /// A registration purpose. - pub purpose: Option, -} - -/// An error returned from the `validate_rbac_registration` method. -#[allow(clippy::large_enum_variant)] -pub enum RbacValidationError { - /// A registration is invalid (`report.is_problematic()` returns `true`). - /// - /// This variant is inserted to the `rbac_invalid_registration` table. - InvalidRegistration { - /// A Catalyst ID. - catalyst_id: CatalystId, - /// A registration purpose. - purpose: Option, - /// A problem report. - report: ProblemReport, - }, - /// Unable to determine a Catalyst ID of the registration. - /// - /// This can happen if a previous transaction ID in the registration is incorrect. - UnknownCatalystId, - /// A "fatal" error occurred during validation. 
- /// - /// This means that the validation wasn't performed properly (usually because of a - /// database failure) and we cannot process the given registration. This error is - /// propagated on a higher level, so there will be another attempt to index that - /// block. - Fatal(anyhow::Error), -} - -impl From for RbacValidationError { - fn from(e: anyhow::Error) -> Self { - RbacValidationError::Fatal(e) - } -} diff --git a/catalyst-gateway/bin/src/service/api/documents/common/mod.rs b/catalyst-gateway/bin/src/service/api/documents/common/mod.rs index 4ef98713f128..1d5cc8b5e549 100644 --- a/catalyst-gateway/bin/src/service/api/documents/common/mod.rs +++ b/catalyst-gateway/bin/src/service/api/documents/common/mod.rs @@ -230,7 +230,7 @@ impl VerifyingKeyProvider { let (kid_role_index, kid_rotation) = kid.role_and_rotation(); let (latest_pk, rotation) = reg_chain - .get_latest_signing_pk_for_role(&kid_role_index) + .get_latest_signing_public_key_for_role(kid_role_index) .ok_or_else(|| { anyhow::anyhow!( "Failed to get last signing key for the proposer role for {kid} Catalyst ID" diff --git a/catalyst-gateway/bin/src/service/common/auth/rbac/scheme.rs b/catalyst-gateway/bin/src/service/common/auth/rbac/scheme.rs index 95b802802f79..77008cf7d539 100644 --- a/catalyst-gateway/bin/src/service/common/auth/rbac/scheme.rs +++ b/catalyst-gateway/bin/src/service/common/auth/rbac/scheme.rs @@ -163,7 +163,7 @@ async fn checker_api_catalyst_auth( // Step 8: Get the latest stable signing certificate registered for Role 0. let (latest_pk, _) = reg_chain - .get_latest_signing_pk_for_role(&RoleId::Role0) + .get_latest_signing_public_key_for_role(RoleId::Role0) .ok_or_else(|| { debug!( "Unable to get last signing key for {} Catalyst ID",