Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion catalyst-gateway/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ workspace = true

[dependencies]
cardano-chain-follower = { version = "0.0.17", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "cardano-chain-follower/v0.0.17" }
rbac-registration = { version = "0.0.13", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "rbac-registration/v0.0.13" }
rbac-registration = { version = "0.0.13", git = "https://github.com/input-output-hk/catalyst-libs.git", rev = "95af33a685341bcb69993677c361253f2b9c404e" }
catalyst-signed-doc = { version = "0.0.9", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-signed-doc/v0.0.9" }
catalyst-signed-doc-v1 = { package = "catalyst-signed-doc", version = "0.0.4", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-signed-doc/v.0.0.4" }
c509-certificate = { version = "0.0.3", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "c509-certificate-v0.0.3" }
Expand Down
103 changes: 81 additions & 22 deletions catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,24 @@ use std::{

use anyhow::{Context, Result};
use cardano_chain_follower::{hashes::TransactionId, MultiEraBlock, Slot, TxnIndex};
use rbac_registration::cardano::cip509::Cip509;
use rbac_registration::{
cardano::cip509::Cip509,
registration::cardano::{RbacValidationError, RegistrationChain},
};
use scylla::client::session::Session;
use tokio::sync::watch;
use tracing::{debug, error};

use crate::{
db::index::{
queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
queries::{
rbac::get_catalyst_id_from_stake_address::cache_stake_address, FallibleQueryTasks,
PreparedQuery, SizedBatch,
},
session::CassandraSession,
},
metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
rbac::{
validate_rbac_registration, RbacBlockIndexingContext, RbacValidationError,
RbacValidationSuccess,
},
rbac::{cache_persistent_rbac_chain, RbacBlockIndexingContext},
settings::cassandra_db::EnvVars,
};

Expand Down Expand Up @@ -123,24 +126,80 @@ impl Rbac509InsertQuery {
wait_for_previous_blocks(pending_blocks, our_end, block.slot()).await?;

let previous_transaction = cip509.previous_transaction();
// `Box::pin` is used here because of the future size (`clippy::large_futures` lint).
match Box::pin(validate_rbac_registration(
cip509,
block.is_immutable(),
context,
))
.await
{

// Before it is consumed
let txn_id = cip509.txn_hash();
let origin = cip509.origin().clone();
let purpose = cip509.purpose();
let previous_txn = cip509.previous_transaction();

context.set_persistent(block.is_immutable());

match RegistrationChain::update_from_previous_txn(cip509, context).await {
// Write updates to the database. There can be multiple updates in one registration
// because a new chain can take ownership of stake addresses of the existing chains and
// in that case we want to record changes to all those chains as well as the new one.
Ok(RbacValidationSuccess {
catalyst_id,
stake_addresses,
public_keys,
modified_chains,
purpose,
}) => {
Ok((chain, modified_chains)) => {
let catalyst_id = chain.catalyst_id();
let stake_addresses = chain.stake_addresses();
let public_keys: HashSet<_> = chain
.role_data_history()
.keys()
.filter_map(|role| {
chain
.get_latest_signing_pk_for_role(role)
.map(|(key, _)| key)
})
.collect();

if let Some(previous_txn) = previous_txn {
context.insert_transaction(txn_id, catalyst_id.clone());
context.insert_addresses(stake_addresses.clone(), catalyst_id);
context.insert_public_keys(public_keys.clone(), catalyst_id);
context.insert_registration(
catalyst_id.clone(),
txn_id,
origin.point().slot_or_default(),
origin.txn_index(),
Some(previous_txn),
// Only a new chain can remove stake addresses from an existing one.
HashSet::new(),
);

if block.is_immutable() {
cache_persistent_rbac_chain(catalyst_id.clone(), chain.clone());
}
} else {
context.insert_transaction(chain.current_tx_id_hash(), catalyst_id.clone());
// This will also update the addresses that are already present in the context
// if they were reassigned to the new chain.
context.insert_addresses(stake_addresses.clone(), catalyst_id);
context.insert_public_keys(public_keys.clone(), catalyst_id);
context.insert_registration(
catalyst_id.clone(),
chain.current_tx_id_hash(),
chain.current_point().slot_or_default(),
chain.current_txn_index(),
// No previous transaction for the root registration.
None,
// This chain has just been created, so no addresses have been removed from
// it.
HashSet::new(),
);

// This cache must be updated because these addresses previously belonged to
// other chains.
for (catalyst_id, addresses) in &modified_chains {
for address in addresses {
cache_stake_address(
block.is_immutable(),
address.clone(),
catalyst_id.clone(),
);
}
}
}

// Record the transaction identifier (hash) of a new registration.
self.catalyst_id_for_txn_id
.push(insert_catalyst_id_for_txn_id::Params::new(
Expand Down Expand Up @@ -171,7 +230,7 @@ impl Rbac509InsertQuery {
}
// Update the chain this registration belongs to.
self.registrations.push(insert_rbac509::Params::new(
catalyst_id,
catalyst_id.clone(),
txn_hash,
slot,
index,
Expand Down
182 changes: 181 additions & 1 deletion catalyst-gateway/bin/src/rbac/indexing_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,23 @@

use std::collections::{HashMap, HashSet};

use anyhow::Context;
use cardano_chain_follower::{hashes::TransactionId, Slot, StakeAddress, TxnIndex};
use catalyst_types::catalyst_id::CatalystId;
use ed25519_dalek::VerifyingKey;
use futures::StreamExt;
use rbac_registration::providers::RbacRegistrationProvider;

use crate::db::index::queries::rbac::get_rbac_registrations::Query as RbacQuery;
use crate::{
db::index::{
queries::rbac::get_rbac_registrations::Query as RbacQuery, session::CassandraSession,
},
rbac::{
chains_cache::cached_persistent_rbac_chain,
get_chain::{apply_regs, build_rbac_chain, persistent_rbac_chain},
latest_rbac_chain,
},
};

/// A RBAC context used during indexing.
///
Expand All @@ -28,6 +40,8 @@ pub struct RbacBlockIndexingContext {
/// A map containing pending data that will be written in the `rbac_registration`
/// table.
registrations: HashMap<CatalystId, Vec<RbacQuery>>,
/// Indicates whether the provider should fetch data from persistent storage.
is_persistent: bool,
}

impl RbacBlockIndexingContext {
Expand All @@ -43,6 +57,7 @@ impl RbacBlockIndexingContext {
addresses,
public_keys,
registrations,
is_persistent: false,
}
}

Expand Down Expand Up @@ -159,4 +174,169 @@ impl RbacBlockIndexingContext {
) -> Option<&[RbacQuery]> {
self.registrations.get(id).map(Vec::as_slice)
}

/// Sets `is_persistent`.
pub fn set_persistent(
&mut self,
value: bool,
) {
self.is_persistent = value;
}
}

impl RbacRegistrationProvider for RbacBlockIndexingContext {
async fn chain(
&self,
id: CatalystId,
) -> anyhow::Result<Option<rbac_registration::registration::cardano::RegistrationChain>> {
let chain = if self.is_persistent {
persistent_rbac_chain(&id).await?
} else {
latest_rbac_chain(&id).await?.map(|i| i.chain)
};

// Apply additional registrations from context if any.
if let Some(regs) = self.find_registrations(&id) {
let regs = regs.iter().cloned();
match chain {
Some(c) => apply_regs(c, regs).await.map(Some),
None => build_rbac_chain(regs).await,
}
} else {
Ok(chain)
}
}

async fn catalyst_id_from_txn_id(
&self,
txn_id: TransactionId,
) -> anyhow::Result<Option<CatalystId>> {
use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query;

// Check the context first.
if let Some(catalyst_id) = self.find_transaction(&txn_id) {
return Ok(Some(catalyst_id.to_owned()));
}

// Then try to find in the persistent database.
let session =
CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
if let Some(id) = Query::get(&session, txn_id).await? {
return Ok(Some(id));
}

// Conditionally check the volatile database.
if !self.is_persistent {
let session =
CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
return Query::get(&session, txn_id).await;
}

Ok(None)
}

async fn catalyst_id_from_stake_address(
&self,
address: &StakeAddress,
) -> anyhow::Result<Option<CatalystId>> {
use crate::db::index::queries::rbac::get_catalyst_id_from_stake_address::Query;

// Check the context first.
if let Some(catalyst_id) = self.find_address(address) {
return Ok(Some(catalyst_id.to_owned()));
}

// Then try to find in the persistent database.
let session =
CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
if let Some(id) = Query::latest(&session, address).await? {
return Ok(Some(id));
}

// Conditionally check the volatile database.
if !self.is_persistent {
let session =
CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
return Query::latest(&session, address).await;
}

Ok(None)
}

async fn catalyst_id_from_public_key(
&self,
key: VerifyingKey,
) -> anyhow::Result<Option<CatalystId>> {
use crate::db::index::queries::rbac::get_catalyst_id_from_public_key::Query;

// Check the context first.
if let Some(catalyst_id) = self.find_public_key(&key) {
return Ok(Some(catalyst_id.to_owned()));
}

// Then try to find in the persistent database.
let session =
CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
if let Some(id) = Query::get(&session, key).await? {
return Ok(Some(id));
}

// Conditionally check the volatile database.
if !self.is_persistent {
let session =
CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
return Query::get(&session, key).await;
}

Ok(None)
}

async fn is_chain_known(
&self,
id: CatalystId,
) -> anyhow::Result<bool> {
if self.find_registrations(&id).is_some() {
return Ok(true);
}

let session =
CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;

// We only cache persistent chains, so it is ok to check the cache regardless of the
// `is_persistent` parameter value.
if cached_persistent_rbac_chain(&session, &id).is_some() {
return Ok(true);
}

if is_cat_id_known(&session, &id).await? {
return Ok(true);
}

// Conditionally check the volatile database.
if !self.is_persistent {
let session =
CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
if is_cat_id_known(&session, &id).await? {
return Ok(true);
}
}

Ok(false)
}
}

/// Returns `true` if there is at least one registration with the given Catalyst ID.
async fn is_cat_id_known(
session: &CassandraSession,
id: &CatalystId,
) -> anyhow::Result<bool> {
use crate::db::index::queries::rbac::get_rbac_registrations::{Query, QueryParams};

Ok(Query::execute(session, QueryParams {
catalyst_id: id.clone().into(),
})
.await?
.next()
.await
.is_some())
}
5 changes: 1 addition & 4 deletions catalyst-gateway/bin/src/rbac/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@ mod chain_info;
mod chains_cache;
mod get_chain;
mod indexing_context;
mod validation;
mod validation_result;

pub use chain_info::ChainInfo;
pub use chains_cache::cache_persistent_rbac_chain;
pub use get_chain::latest_rbac_chain;
pub use indexing_context::RbacBlockIndexingContext;
pub use validation::validate_rbac_registration;
pub use validation_result::{RbacValidationError, RbacValidationResult, RbacValidationSuccess};
Loading