Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 27 additions & 12 deletions catalyst-gateway/bin/src/cardano/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
},
session::CassandraSession,
},
service::utilities::health::{follower_has_first_reached_tip, set_follower_first_reached_tip},
settings::{chain_follower, Settings},
};

Expand All @@ -31,7 +32,7 @@ pub(crate) mod event;
pub(crate) mod util;

/// How long we wait between checks for connection to the indexing DB to be ready.
const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
pub(crate) const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);

/// Start syncing a particular network
async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
Expand Down Expand Up @@ -185,25 +186,23 @@ impl SyncParams {
}

/// Convert Params into the result of the sync.
fn done(
pub(crate) fn done(
&self, first: Option<Point>, first_immutable: bool, last: Option<Point>,
last_immutable: bool, synced: u64, result: anyhow::Result<()>,
) -> Self {
if result.is_ok() && first_immutable && last_immutable {
if result.is_ok() && self.end != Point::TIP {
// Update sync status in the Immutable DB.
// Can fire and forget, because failure to update DB will simply cause the chunk to be
// re-indexed, on recovery.
update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
}

let mut done = self.clone();
done.first_indexed_block = first;
done.first_is_immutable = first_immutable;
done.last_indexed_block = last;
done.last_is_immutable = last_immutable;
done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
done.last_blocks_synced = synced;

done.result = Arc::new(Some(result));

done
Expand Down Expand Up @@ -265,7 +264,7 @@ fn sync_subchain(
// What we need to do here is tell the primary follower to start a new sync
// for the new immutable data, and then purge the volatile database of the
// old data (after the immutable data has synced).
info!(chain=%params.chain, "Immutable chain rolled forward.");
info!(chain=%params.chain, point=?chain_update.block_data().point(), "Immutable chain rolled forward.");
let mut result = params.done(
first_indexed_block,
first_immutable,
Expand All @@ -275,9 +274,10 @@ fn sync_subchain(
Ok(()),
);
// Signal the point the immutable chain rolled forward to.
// If this is live chain immediately stops to later run immutable sync tasks
result.follower_roll_forward = Some(chain_update.block_data().point());
return result;
};
}
},
cardano_chain_follower::Kind::Block => {
let block = chain_update.block_data();
Expand All @@ -295,6 +295,11 @@ fn sync_subchain(
);
}

// Update flag if this is the first time reaching TIP.
if chain_update.tip && !follower_has_first_reached_tip() {
set_follower_first_reached_tip();
}

update_block_state(
block,
&mut first_indexed_block,
Expand All @@ -311,8 +316,9 @@ fn sync_subchain(

let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
error!(chain=%params.chain, error=%error, "Chain follower
rollback, purging volatile data task failed.");
error!(chain=%params.chain, error=%error,
"Chain follower rollback, purging volatile data task failed."
);
} else {
// How many slots are purged
#[allow(clippy::arithmetic_side_effects)]
Expand Down Expand Up @@ -364,7 +370,7 @@ fn sync_subchain(
Ok(()),
);

info!(chain = %params.chain, result=%result, "Indexing Blockchain Completed: OK");
info!(chain = %result.chain, result=%result, "Indexing Blockchain Completed: OK");

result
})
Expand Down Expand Up @@ -435,6 +441,12 @@ impl SyncTask {
/// Primary Chain Follower task.
///
/// This continuously runs in the background, and never terminates.
///
/// Sets the Index DB liveness flag to true if it is not already set.
///
/// Sets the Chain Follower Has First Reached Tip flag to true if it is not already
/// set.
#[allow(clippy::too_many_lines)]
async fn run(&mut self) {
// We can't sync until the local chain data is synced.
// This call will wait until we sync.
Expand All @@ -453,7 +465,10 @@ impl SyncTask {
// Wait for indexing DB to be ready before continuing.
// We do this after the above, because other nodes may have finished already, and we don't
// want to wait do any work they already completed while we were fetching the blockchain.
//
// After waiting, we set the liveness flag to true if it is not already set.
drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);

info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state");
self.sync_status = get_sync_status().await;
debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
Expand Down Expand Up @@ -539,7 +554,7 @@ impl SyncTask {
},
Err(error) => {
error!(chain=%self.cfg.chain, report=%finished, error=%error,
"An Immutable follower failed, restarting it.");
"An Immutable follower failed, restarting it.");
// Restart the Immutable Chain sync task again from where it left
// off.
self.sync_tasks.push(sync_subchain(
Expand All @@ -550,7 +565,7 @@ impl SyncTask {
}
} else {
error!(chain=%self.cfg.chain, report=%finished,
"BUG: The Immutable follower completed, but without a proper result.");
"BUG: The Immutable follower completed, but without a proper result.");
}
},
Err(error) => {
Expand Down
25 changes: 20 additions & 5 deletions catalyst-gateway/bin/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! CLI interpreter for the service
use std::{io::Write, path::PathBuf};
use std::{io::Write, path::PathBuf, time::Duration};

use clap::Parser;
use tracing::{error, info};
Expand All @@ -8,8 +8,10 @@ use crate::{
cardano::start_followers,
db::{self, index::session::CassandraSession},
service::{
self, started,
utilities::health::{is_live, live_counter_reset},
self,
utilities::health::{
condition_for_started, is_live, live_counter_reset, service_has_started, set_to_started,
},
},
settings::{ServiceSettings, Settings},
};
Expand Down Expand Up @@ -49,13 +51,15 @@ impl Cli {

info!("Catalyst Gateway - Starting");

// Start the DB's
// Start the DB's.
CassandraSession::init();

db::event::establish_connection();

// Start the chain indexing follower.
start_followers().await?;

// Start the API service.
let handle = tokio::spawn(async move {
match service::run().await {
Ok(()) => info!("Endpoints started ok"),
Expand All @@ -66,6 +70,7 @@ impl Cli {
});
tasks.push(handle);

// Start task to reset the service 'live counter' at a regular interval.
let handle = tokio::spawn(async move {
while is_live() {
tokio::time::sleep(Settings::service_live_timeout_interval()).await;
Expand All @@ -74,8 +79,18 @@ impl Cli {
});
tasks.push(handle);

started();
// Start task to wait for the service 'started' flag to be `true`.
let handle = tokio::spawn(async move {
while !service_has_started() {
tokio::time::sleep(Duration::from_secs(1)).await;
if condition_for_started() {
set_to_started();
}
}
});
tasks.push(handle);

// Run all asynchronous tasks to completion.
for task in tasks {
task.await?;
}
Expand Down
11 changes: 10 additions & 1 deletion catalyst-gateway/bin/src/db/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use futures::{Stream, StreamExt, TryStreamExt};
use tokio_postgres::{types::ToSql, NoTls, Row};
use tracing::{debug, debug_span, error, Instrument};

use crate::settings::Settings;
use crate::{service::utilities::health::set_event_db_liveness, settings::Settings};

pub(crate) mod common;
pub(crate) mod config;
Expand Down Expand Up @@ -160,6 +160,11 @@ impl EventDB {
Ok(())
}

/// Checks that connection to `EventDB` is available.
pub(crate) fn connection_is_ok() -> bool {
EVENT_DB_POOL.get().is_some()
}

/// Prepend `EXPLAIN ANALYZE` to the query, and rollback the transaction.
async fn explain_analyze_rollback(
stmt: &str, params: &[&(dyn ToSql + Sync)],
Expand Down Expand Up @@ -239,6 +244,8 @@ impl EventDB {
///
/// The env var "`DATABASE_URL`" can be set directly as an anv var, or in a
/// `.env` file.
///
/// If connection to the pool is `OK`, the `LIVE_EVENT_DB` atomic flag is set to `true`.
pub fn establish_connection() {
let (url, user, pass, max_connections, max_lifetime, min_idle, connection_timeout) =
Settings::event_db_settings();
Expand Down Expand Up @@ -267,5 +274,7 @@ pub fn establish_connection() {

if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
error!("Failed to set event db pool. Called Twice?");
} else {
set_event_db_liveness(true);
}
}
62 changes: 36 additions & 26 deletions catalyst-gateway/bin/src/db/index/block/certs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
settings::cassandra_db,
};

/// Insert TXI Query and Parameters
/// Insert stake registration query
#[derive(SerializeRow)]
pub(crate) struct StakeRegistrationInsertQuery {
/// Stake address (29 bytes).
Expand All @@ -29,23 +29,22 @@ pub(crate) struct StakeRegistrationInsertQuery {
/// Transaction Index.
txn_index: DbTxnIndex,
/// Full Stake Public Key (32 byte Ed25519 Public key, not hashed).
stake_public_key: MaybeUnset<DbPublicKey>,
stake_public_key: DbPublicKey,
/// Is the stake address a script or not.
script: bool,
/// Is the Certificate Registered?
/// Is the Cardano Certificate Registered
register: MaybeUnset<bool>,
/// Is the Certificate Deregistered?
/// Is the Cardano Certificate Deregistered
deregister: MaybeUnset<bool>,
/// Is the stake address contains CIP36 registration?
cip36: MaybeUnset<bool>,
/// Pool Delegation Address
pool_delegation: MaybeUnset<Vec<u8>>,
}

impl Debug for StakeRegistrationInsertQuery {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
let stake_public_key = match self.stake_public_key {
MaybeUnset::Unset => "UNSET",
MaybeUnset::Set(ref v) => &hex::encode(v.as_ref()),
};
let stake_public_key = hex::encode(self.stake_public_key.as_ref());
let register = match self.register {
MaybeUnset::Unset => "UNSET",
MaybeUnset::Set(v) => &format!("{v:?}"),
Expand All @@ -54,6 +53,10 @@ impl Debug for StakeRegistrationInsertQuery {
MaybeUnset::Unset => "UNSET",
MaybeUnset::Set(v) => &format!("{v:?}"),
};
let cip36 = match self.cip36 {
MaybeUnset::Unset => "UNSET",
MaybeUnset::Set(v) => &format!("{v:?}"),
};
let pool_delegation = match self.pool_delegation {
MaybeUnset::Unset => "UNSET",
MaybeUnset::Set(ref v) => &hex::encode(v),
Expand All @@ -67,29 +70,28 @@ impl Debug for StakeRegistrationInsertQuery {
.field("script", &self.script)
.field("register", &register)
.field("deregister", &deregister)
.field("cip36", &cip36)
.field("pool_delegation", &pool_delegation)
.finish()
}
}

/// TXI by Txn hash Index
/// Insert stake registration
const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");

impl StakeRegistrationInsertQuery {
/// Create a new Insert Query.
#[allow(clippy::too_many_arguments)]
#[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
pub fn new(
stake_address: StakeAddress, slot_no: Slot, txn_index: TxnIndex,
stake_public_key: Option<VerifyingKey>, script: bool, register: bool, deregister: bool,
pool_delegation: Option<Vec<u8>>,
stake_public_key: VerifyingKey, script: bool, register: bool, deregister: bool,
cip36: bool, pool_delegation: Option<Vec<u8>>,
) -> Self {
let stake_public_key =
stake_public_key.map_or(MaybeUnset::Unset, |a| MaybeUnset::Set(a.into()));
StakeRegistrationInsertQuery {
stake_address: stake_address.into(),
slot_no: slot_no.into(),
txn_index: txn_index.into(),
stake_public_key,
stake_public_key: stake_public_key.into(),
script,
register: if register {
MaybeUnset::Set(true)
Expand All @@ -101,6 +103,11 @@ impl StakeRegistrationInsertQuery {
} else {
MaybeUnset::Unset
},
cip36: if cip36 {
MaybeUnset::Set(true)
} else {
MaybeUnset::Unset
},
pool_delegation: if let Some(pool_delegation) = pool_delegation {
MaybeUnset::Set(pool_delegation)
} else {
Expand All @@ -109,7 +116,7 @@ impl StakeRegistrationInsertQuery {
}
}

/// Prepare Batch of Insert TXI Index Data Queries
/// Prepare Batch of Insert stake registration.
pub(crate) async fn prepare_batch(
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
) -> anyhow::Result<SizedBatch> {
Expand Down Expand Up @@ -184,16 +191,19 @@ impl CertInsertQuery {
}

// This may not be witnessed, its normal but disappointing.
self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
stake_address,
slot_no,
txn,
pubkey,
script,
register,
deregister,
delegation,
));
if let Some(pubkey) = pubkey {
self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
stake_address,
slot_no,
txn,
pubkey,
script,
register,
deregister,
false,
delegation,
));
}
}

/// Index an Alonzo Era certificate into the database.
Expand Down
Loading
Loading