Skip to content
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion catalyst-gateway/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ workspace = true

[dependencies]
cardano-chain-follower = { version = "0.0.15", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "cardano-chain-follower/v0.0.15" }
rbac-registration = { version = "0.0.11", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "rbac-registration/v0.0.11" }
rbac-registration = { version = "0.0.11", git = "https://github.com/input-output-hk/catalyst-libs.git", rev = "be739fb6a928c6ec29f91ed5c74e08c2c4f24c98" }
catalyst-signed-doc = { version = "0.0.8", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-signed-doc/v0.0.8" }
catalyst-signed-doc-v1 = { package = "catalyst-signed-doc", version = "0.0.4", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "catalyst-signed-doc/v.0.0.4" }
c509-certificate = { version = "0.0.3", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "c509-certificate-v0.0.3" }
Expand Down
123 changes: 102 additions & 21 deletions catalyst-gateway/bin/src/db/index/block/rbac509/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,24 @@ use std::{

use anyhow::{Context, Result};
use cardano_chain_follower::{hashes::TransactionId, MultiEraBlock, Slot, TxnIndex};
use rbac_registration::cardano::cip509::Cip509;
use rbac_registration::{
cardano::cip509::Cip509,
registration::cardano::validation::{start_new_chain, update_chain, RbacValidationError},
};
use scylla::client::session::Session;
use tokio::sync::watch;
use tracing::{debug, error};

use crate::{
db::index::{
queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
queries::{
rbac::get_catalyst_id_from_stake_address::cache_stake_address, FallibleQueryTasks,
PreparedQuery, SizedBatch,
},
session::CassandraSession,
},
metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
rbac::{
validate_rbac_registration, RbacBlockIndexingContext, RbacValidationError,
RbacValidationSuccess,
},
rbac::{cache_persistent_rbac_chain, RbacBlockIndexingContext},
settings::cassandra_db::EnvVars,
};

Expand Down Expand Up @@ -123,24 +126,102 @@ impl Rbac509InsertQuery {
wait_for_previous_blocks(pending_blocks, our_end, block.slot()).await?;

let previous_transaction = cip509.previous_transaction();

// `Box::pin` is used here because of the future size (`clippy::large_futures` lint).
match Box::pin(validate_rbac_registration(
cip509,
block.is_immutable(),
context,
))
.await
{
let result = if let Some(previous_txn) = cip509.previous_transaction() {
let result = Box::pin(update_chain(
cip509,
previous_txn,
block.is_immutable(),
context,
))
.await;

// Everything is fine: update the context.
if let Ok((new_chain, reg)) = &result {
let catalyst_id = reg
.catalyst_id()
.context("Cip509 error: cannot read Catalyst ID")?;
let txn_id = reg.txn_hash();
let stake_addresses = reg.stake_addresses();
let public_keys = reg.public_keys();
let origin = reg.origin();

context.insert_transaction(txn_id, catalyst_id.clone());
context.insert_addresses(stake_addresses.clone(), catalyst_id);
context.insert_public_keys(public_keys.clone(), catalyst_id);
context.insert_registration(
catalyst_id.clone(),
txn_id,
origin.point().slot_or_default(),
origin.txn_index(),
Some(previous_txn),
// Only a new chain can remove stake addresses from an existing one.
HashSet::new(),
);

if block.is_immutable() {
cache_persistent_rbac_chain(catalyst_id.clone(), new_chain.clone());
}
}

result
} else {
let result = Box::pin(start_new_chain(cip509, block.is_immutable(), context)).await;

// Everything is fine: update the context.
if let Ok((new_chain, reg)) = &result {
let catalyst_id = reg
.catalyst_id()
.context("Cip509 error: cannot read Catalyst ID")?;
let public_keys = reg.public_keys();
let new_addresses = new_chain.stake_addresses();

context.insert_transaction(new_chain.current_tx_id_hash(), catalyst_id.clone());
// This will also update the addresses that are already present in the context if
// they were reassigned to the new chain.
context.insert_addresses(new_addresses.clone(), catalyst_id);
context.insert_public_keys(public_keys.clone(), catalyst_id);
context.insert_registration(
catalyst_id.clone(),
new_chain.current_tx_id_hash(),
new_chain.current_point().slot_or_default(),
new_chain.current_txn_index(),
// No previous transaction for the root registration.
None,
// This chain has just been created, so no addresses have been removed from it.
HashSet::new(),
);

// This cache must be updated because these addresses previously belonged to other
// chains.
for (catalyst_id, addresses) in reg.modified_chains() {
for address in addresses {
cache_stake_address(
block.is_immutable(),
address.clone(),
catalyst_id.clone(),
);
}
}
}

result
};

match result {
// Write updates to the database. There can be multiple updates in one registration
// because a new chain can take ownership of stake addresses of the existing chains and
// in that case we want to record changes to all those chains as well as the new one.
Ok(RbacValidationSuccess {
catalyst_id,
stake_addresses,
public_keys,
modified_chains,
purpose,
}) => {
Ok((_, cip509)) => {
let catalyst_id = cip509
.catalyst_id()
.context("Cip509 error: cannot read Catalyst ID")?;
let stake_addresses = cip509.stake_addresses().clone();
let public_keys = cip509.public_keys().clone();
let modified_chains = cip509.modified_chains().clone();
let purpose = cip509.purpose();

// Record the transaction identifier (hash) of a new registration.
self.catalyst_id_for_txn_id
.push(insert_catalyst_id_for_txn_id::Params::new(
Expand Down Expand Up @@ -171,7 +252,7 @@ impl Rbac509InsertQuery {
}
// Update the chain this registration belongs to.
self.registrations.push(insert_rbac509::Params::new(
catalyst_id,
catalyst_id.clone(),
txn_hash,
slot,
index,
Expand Down
176 changes: 175 additions & 1 deletion catalyst-gateway/bin/src/rbac/indexing_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,23 @@

use std::collections::{HashMap, HashSet};

use anyhow::Context;
use cardano_chain_follower::{hashes::TransactionId, Slot, StakeAddress, TxnIndex};
use catalyst_types::catalyst_id::CatalystId;
use ed25519_dalek::VerifyingKey;
use futures::StreamExt;
use rbac_registration::providers::RbacRegistrationProvider;

use crate::db::index::queries::rbac::get_rbac_registrations::Query as RbacQuery;
use crate::{
db::index::{
queries::rbac::get_rbac_registrations::Query as RbacQuery, session::CassandraSession,
},
rbac::{
chains_cache::cached_persistent_rbac_chain,
get_chain::{apply_regs, build_rbac_chain, persistent_rbac_chain},
latest_rbac_chain,
},
};

/// A RBAC context used during indexing.
///
Expand Down Expand Up @@ -160,3 +172,165 @@ impl RbacBlockIndexingContext {
self.registrations.get(id).map(Vec::as_slice)
}
}

impl RbacRegistrationProvider for RbacBlockIndexingContext {
async fn chain(
&self,
id: CatalystId,
is_persistent: bool,
) -> anyhow::Result<Option<rbac_registration::registration::cardano::RegistrationChain>> {
let chain = if is_persistent {
persistent_rbac_chain(&id).await?
} else {
latest_rbac_chain(&id).await?.map(|i| i.chain)
};

// Apply additional registrations from context if any.
if let Some(regs) = self.find_registrations(&id) {
let regs = regs.iter().cloned();
match chain {
Some(c) => apply_regs(c, regs).await.map(Some),
None => build_rbac_chain(regs).await,
}
} else {
Ok(chain)
}
}

async fn catalyst_id_from_txn_id(
&self,
txn_id: TransactionId,
is_persistent: bool,
) -> anyhow::Result<Option<CatalystId>> {
use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query;

// Check the context first.
if let Some(catalyst_id) = self.find_transaction(&txn_id) {
return Ok(Some(catalyst_id.to_owned()));
}

// Then try to find in the persistent database.
let session =
CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
if let Some(id) = Query::get(&session, txn_id).await? {
return Ok(Some(id));
}

// Conditionally check the volatile database.
if !is_persistent {
let session =
CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
return Query::get(&session, txn_id).await;
}

Ok(None)
}

async fn catalyst_id_from_stake_address(
&self,
address: &StakeAddress,
is_persistent: bool,
) -> anyhow::Result<Option<CatalystId>> {
use crate::db::index::queries::rbac::get_catalyst_id_from_stake_address::Query;

// Check the context first.
if let Some(catalyst_id) = self.find_address(address) {
return Ok(Some(catalyst_id.to_owned()));
}

// Then try to find in the persistent database.
let session =
CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
if let Some(id) = Query::latest(&session, address).await? {
return Ok(Some(id));
}

// Conditionally check the volatile database.
if !is_persistent {
let session =
CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
return Query::latest(&session, address).await;
}

Ok(None)
}

async fn catalyst_id_from_public_key(
&self,
key: VerifyingKey,
is_persistent: bool,
) -> anyhow::Result<Option<CatalystId>> {
use crate::db::index::queries::rbac::get_catalyst_id_from_public_key::Query;

// Check the context first.
if let Some(catalyst_id) = self.find_public_key(&key) {
return Ok(Some(catalyst_id.to_owned()));
}

// Then try to find in the persistent database.
let session =
CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
if let Some(id) = Query::get(&session, key).await? {
return Ok(Some(id));
}

// Conditionally check the volatile database.
if !is_persistent {
let session =
CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
return Query::get(&session, key).await;
}

Ok(None)
}

async fn is_chain_known(
&self,
id: CatalystId,
is_persistent: bool,
) -> anyhow::Result<bool> {
if self.find_registrations(&id).is_some() {
return Ok(true);
}

let session =
CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;

// We only cache persistent chains, so it is ok to check the cache regardless of the
// `is_persistent` parameter value.
if cached_persistent_rbac_chain(&session, &id).is_some() {
return Ok(true);
}

if is_cat_id_known(&session, &id).await? {
return Ok(true);
}

// Conditionally check the volatile database.
if !is_persistent {
let session =
CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
if is_cat_id_known(&session, &id).await? {
return Ok(true);
}
}

Ok(false)
}
}

/// Returns `true` if there is at least one registration with the given Catalyst ID.
async fn is_cat_id_known(
session: &CassandraSession,
id: &CatalystId,
) -> anyhow::Result<bool> {
use crate::db::index::queries::rbac::get_rbac_registrations::{Query, QueryParams};

Ok(Query::execute(session, QueryParams {
catalyst_id: id.clone().into(),
})
.await?
.next()
.await
.is_some())
}
5 changes: 1 addition & 4 deletions catalyst-gateway/bin/src/rbac/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,8 @@ mod chain_info;
mod chains_cache;
mod get_chain;
mod indexing_context;
mod validation;
mod validation_result;

pub use chain_info::ChainInfo;
pub use chains_cache::cache_persistent_rbac_chain;
pub use get_chain::latest_rbac_chain;
pub use indexing_context::RbacBlockIndexingContext;
pub use validation::validate_rbac_registration;
pub use validation_result::{RbacValidationError, RbacValidationResult, RbacValidationSuccess};
Loading
Loading