Skip to content

Commit a4dd8b0

Browse files
committed
Merge branch 'main' into 2091-map-proposal-elements-for-testing
2 parents d1bea67 + a258eab commit a4dd8b0

File tree

91 files changed

+1786
-5959
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+1786
-5959
lines changed

catalyst-gateway/bin/src/cardano/mod.rs

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use crate::{
2323
},
2424
session::CassandraSession,
2525
},
26+
service::utilities::health::{follower_has_first_reached_tip, set_follower_first_reached_tip},
2627
settings::{chain_follower, Settings},
2728
};
2829

@@ -31,7 +32,7 @@ pub(crate) mod event;
3132
pub(crate) mod util;
3233

3334
/// How long we wait between checks for connection to the indexing DB to be ready.
34-
const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
35+
pub(crate) const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
3536

3637
/// Start syncing a particular network
3738
async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
@@ -185,25 +186,23 @@ impl SyncParams {
185186
}
186187

187188
/// Convert Params into the result of the sync.
188-
fn done(
189+
pub(crate) fn done(
189190
&self, first: Option<Point>, first_immutable: bool, last: Option<Point>,
190191
last_immutable: bool, synced: u64, result: anyhow::Result<()>,
191192
) -> Self {
192-
if result.is_ok() && first_immutable && last_immutable {
193+
if result.is_ok() && self.end != Point::TIP {
193194
// Update sync status in the Immutable DB.
194195
// Can fire and forget, because failure to update DB will simply cause the chunk to be
195196
// re-indexed, on recovery.
196197
update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
197198
}
198-
199199
let mut done = self.clone();
200200
done.first_indexed_block = first;
201201
done.first_is_immutable = first_immutable;
202202
done.last_indexed_block = last;
203203
done.last_is_immutable = last_immutable;
204204
done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
205205
done.last_blocks_synced = synced;
206-
207206
done.result = Arc::new(Some(result));
208207

209208
done
@@ -265,7 +264,7 @@ fn sync_subchain(
265264
// What we need to do here is tell the primary follower to start a new sync
266265
// for the new immutable data, and then purge the volatile database of the
267266
// old data (after the immutable data has synced).
268-
info!(chain=%params.chain, "Immutable chain rolled forward.");
267+
info!(chain=%params.chain, point=?chain_update.block_data().point(), "Immutable chain rolled forward.");
269268
let mut result = params.done(
270269
first_indexed_block,
271270
first_immutable,
@@ -275,9 +274,10 @@ fn sync_subchain(
275274
Ok(()),
276275
);
277276
// Signal the point the immutable chain rolled forward to.
277+
// If this is live chain immediately stops to later run immutable sync tasks
278278
result.follower_roll_forward = Some(chain_update.block_data().point());
279279
return result;
280-
};
280+
}
281281
},
282282
cardano_chain_follower::Kind::Block => {
283283
let block = chain_update.block_data();
@@ -295,6 +295,11 @@ fn sync_subchain(
295295
);
296296
}
297297

298+
// Update flag if this is the first time reaching TIP.
299+
if chain_update.tip && !follower_has_first_reached_tip() {
300+
set_follower_first_reached_tip();
301+
}
302+
298303
update_block_state(
299304
block,
300305
&mut first_indexed_block,
@@ -311,8 +316,9 @@ fn sync_subchain(
311316

312317
let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
313318
if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
314-
error!(chain=%params.chain, error=%error, "Chain follower
315-
rollback, purging volatile data task failed.");
319+
error!(chain=%params.chain, error=%error,
320+
"Chain follower rollback, purging volatile data task failed."
321+
);
316322
} else {
317323
// How many slots are purged
318324
#[allow(clippy::arithmetic_side_effects)]
@@ -364,7 +370,7 @@ fn sync_subchain(
364370
Ok(()),
365371
);
366372

367-
info!(chain = %params.chain, result=%result, "Indexing Blockchain Completed: OK");
373+
info!(chain = %result.chain, result=%result, "Indexing Blockchain Completed: OK");
368374

369375
result
370376
})
@@ -435,6 +441,12 @@ impl SyncTask {
435441
/// Primary Chain Follower task.
436442
///
437443
/// This continuously runs in the background, and never terminates.
444+
///
445+
/// Sets the Index DB liveness flag to true if it is not already set.
446+
///
447+
/// Sets the Chain Follower Has First Reached Tip flag to true if it is not already
448+
/// set.
449+
#[allow(clippy::too_many_lines)]
438450
async fn run(&mut self) {
439451
// We can't sync until the local chain data is synced.
440452
// This call will wait until we sync.
@@ -453,7 +465,10 @@ impl SyncTask {
453465
// Wait for indexing DB to be ready before continuing.
454466
// We do this after the above, because other nodes may have finished already, and we don't
455467
// want to wait do any work they already completed while we were fetching the blockchain.
468+
//
469+
// After waiting, we set the liveness flag to true if it is not already set.
456470
drop(CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL, true).await);
471+
457472
info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state");
458473
self.sync_status = get_sync_status().await;
459474
debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
@@ -539,7 +554,7 @@ impl SyncTask {
539554
},
540555
Err(error) => {
541556
error!(chain=%self.cfg.chain, report=%finished, error=%error,
542-
"An Immutable follower failed, restarting it.");
557+
"An Immutable follower failed, restarting it.");
543558
// Restart the Immutable Chain sync task again from where it left
544559
// off.
545560
self.sync_tasks.push(sync_subchain(
@@ -550,7 +565,7 @@ impl SyncTask {
550565
}
551566
} else {
552567
error!(chain=%self.cfg.chain, report=%finished,
553-
"BUG: The Immutable follower completed, but without a proper result.");
568+
"BUG: The Immutable follower completed, but without a proper result.");
554569
}
555570
},
556571
Err(error) => {

catalyst-gateway/bin/src/cli.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
//! CLI interpreter for the service
2-
use std::{io::Write, path::PathBuf};
2+
use std::{io::Write, path::PathBuf, time::Duration};
33

44
use clap::Parser;
55
use tracing::{error, info};
@@ -8,8 +8,10 @@ use crate::{
88
cardano::start_followers,
99
db::{self, index::session::CassandraSession},
1010
service::{
11-
self, started,
12-
utilities::health::{is_live, live_counter_reset},
11+
self,
12+
utilities::health::{
13+
condition_for_started, is_live, live_counter_reset, service_has_started, set_to_started,
14+
},
1315
},
1416
settings::{ServiceSettings, Settings},
1517
};
@@ -49,13 +51,15 @@ impl Cli {
4951

5052
info!("Catalyst Gateway - Starting");
5153

52-
// Start the DB's
54+
// Start the DB's.
5355
CassandraSession::init();
56+
5457
db::event::establish_connection();
5558

5659
// Start the chain indexing follower.
5760
start_followers().await?;
5861

62+
// Start the API service.
5963
let handle = tokio::spawn(async move {
6064
match service::run().await {
6165
Ok(()) => info!("Endpoints started ok"),
@@ -66,6 +70,7 @@ impl Cli {
6670
});
6771
tasks.push(handle);
6872

73+
// Start task to reset the service 'live counter' at a regular interval.
6974
let handle = tokio::spawn(async move {
7075
while is_live() {
7176
tokio::time::sleep(Settings::service_live_timeout_interval()).await;
@@ -74,8 +79,18 @@ impl Cli {
7479
});
7580
tasks.push(handle);
7681

77-
started();
82+
// Start task to wait for the service 'started' flag to be `true`.
83+
let handle = tokio::spawn(async move {
84+
while !service_has_started() {
85+
tokio::time::sleep(Duration::from_secs(1)).await;
86+
if condition_for_started() {
87+
set_to_started();
88+
}
89+
}
90+
});
91+
tasks.push(handle);
7892

93+
// Run all asynchronous tasks to completion.
7994
for task in tasks {
8095
task.await?;
8196
}

catalyst-gateway/bin/src/db/event/mod.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use futures::{Stream, StreamExt, TryStreamExt};
1414
use tokio_postgres::{types::ToSql, NoTls, Row};
1515
use tracing::{debug, debug_span, error, Instrument};
1616

17-
use crate::settings::Settings;
17+
use crate::{service::utilities::health::set_event_db_liveness, settings::Settings};
1818

1919
pub(crate) mod common;
2020
pub(crate) mod config;
@@ -160,6 +160,11 @@ impl EventDB {
160160
Ok(())
161161
}
162162

163+
/// Checks that connection to `EventDB` is available.
164+
pub(crate) fn connection_is_ok() -> bool {
165+
EVENT_DB_POOL.get().is_some()
166+
}
167+
163168
/// Prepend `EXPLAIN ANALYZE` to the query, and rollback the transaction.
164169
async fn explain_analyze_rollback(
165170
stmt: &str, params: &[&(dyn ToSql + Sync)],
@@ -239,6 +244,8 @@ impl EventDB {
239244
///
240245
/// The env var "`DATABASE_URL`" can be set directly as an anv var, or in a
241246
/// `.env` file.
247+
///
248+
/// If connection to the pool is `OK`, the `LIVE_EVENT_DB` atomic flag is set to `true`.
242249
pub fn establish_connection() {
243250
let (url, user, pass, max_connections, max_lifetime, min_idle, connection_timeout) =
244251
Settings::event_db_settings();
@@ -267,5 +274,7 @@ pub fn establish_connection() {
267274

268275
if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
269276
error!("Failed to set event db pool. Called Twice?");
277+
} else {
278+
set_event_db_liveness(true);
270279
}
271280
}

catalyst-gateway/bin/src/db/index/block/certs.rs

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
settings::cassandra_db,
2020
};
2121

22-
/// Insert TXI Query and Parameters
22+
/// Insert stake registration query
2323
#[derive(SerializeRow)]
2424
pub(crate) struct StakeRegistrationInsertQuery {
2525
/// Stake address (29 bytes).
@@ -29,23 +29,22 @@ pub(crate) struct StakeRegistrationInsertQuery {
2929
/// Transaction Index.
3030
txn_index: DbTxnIndex,
3131
/// Full Stake Public Key (32 byte Ed25519 Public key, not hashed).
32-
stake_public_key: MaybeUnset<DbPublicKey>,
32+
stake_public_key: DbPublicKey,
3333
/// Is the stake address a script or not.
3434
script: bool,
35-
/// Is the Certificate Registered?
35+
/// Is the Cardano Certificate Registered
3636
register: MaybeUnset<bool>,
37-
/// Is the Certificate Deregistered?
37+
/// Is the Cardano Certificate Deregistered
3838
deregister: MaybeUnset<bool>,
39+
/// Is the stake address contains CIP36 registration?
40+
cip36: MaybeUnset<bool>,
3941
/// Pool Delegation Address
4042
pool_delegation: MaybeUnset<Vec<u8>>,
4143
}
4244

4345
impl Debug for StakeRegistrationInsertQuery {
4446
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
45-
let stake_public_key = match self.stake_public_key {
46-
MaybeUnset::Unset => "UNSET",
47-
MaybeUnset::Set(ref v) => &hex::encode(v.as_ref()),
48-
};
47+
let stake_public_key = hex::encode(self.stake_public_key.as_ref());
4948
let register = match self.register {
5049
MaybeUnset::Unset => "UNSET",
5150
MaybeUnset::Set(v) => &format!("{v:?}"),
@@ -54,6 +53,10 @@ impl Debug for StakeRegistrationInsertQuery {
5453
MaybeUnset::Unset => "UNSET",
5554
MaybeUnset::Set(v) => &format!("{v:?}"),
5655
};
56+
let cip36 = match self.cip36 {
57+
MaybeUnset::Unset => "UNSET",
58+
MaybeUnset::Set(v) => &format!("{v:?}"),
59+
};
5760
let pool_delegation = match self.pool_delegation {
5861
MaybeUnset::Unset => "UNSET",
5962
MaybeUnset::Set(ref v) => &hex::encode(v),
@@ -67,29 +70,28 @@ impl Debug for StakeRegistrationInsertQuery {
6770
.field("script", &self.script)
6871
.field("register", &register)
6972
.field("deregister", &deregister)
73+
.field("cip36", &cip36)
7074
.field("pool_delegation", &pool_delegation)
7175
.finish()
7276
}
7377
}
7478

75-
/// TXI by Txn hash Index
79+
/// Insert stake registration
7680
const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");
7781

7882
impl StakeRegistrationInsertQuery {
7983
/// Create a new Insert Query.
80-
#[allow(clippy::too_many_arguments)]
84+
#[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
8185
pub fn new(
8286
stake_address: StakeAddress, slot_no: Slot, txn_index: TxnIndex,
83-
stake_public_key: Option<VerifyingKey>, script: bool, register: bool, deregister: bool,
84-
pool_delegation: Option<Vec<u8>>,
87+
stake_public_key: VerifyingKey, script: bool, register: bool, deregister: bool,
88+
cip36: bool, pool_delegation: Option<Vec<u8>>,
8589
) -> Self {
86-
let stake_public_key =
87-
stake_public_key.map_or(MaybeUnset::Unset, |a| MaybeUnset::Set(a.into()));
8890
StakeRegistrationInsertQuery {
8991
stake_address: stake_address.into(),
9092
slot_no: slot_no.into(),
9193
txn_index: txn_index.into(),
92-
stake_public_key,
94+
stake_public_key: stake_public_key.into(),
9395
script,
9496
register: if register {
9597
MaybeUnset::Set(true)
@@ -101,6 +103,11 @@ impl StakeRegistrationInsertQuery {
101103
} else {
102104
MaybeUnset::Unset
103105
},
106+
cip36: if cip36 {
107+
MaybeUnset::Set(true)
108+
} else {
109+
MaybeUnset::Unset
110+
},
104111
pool_delegation: if let Some(pool_delegation) = pool_delegation {
105112
MaybeUnset::Set(pool_delegation)
106113
} else {
@@ -109,7 +116,7 @@ impl StakeRegistrationInsertQuery {
109116
}
110117
}
111118

112-
/// Prepare Batch of Insert TXI Index Data Queries
119+
/// Prepare Batch of Insert stake registration.
113120
pub(crate) async fn prepare_batch(
114121
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
115122
) -> anyhow::Result<SizedBatch> {
@@ -184,16 +191,19 @@ impl CertInsertQuery {
184191
}
185192

186193
// This may not be witnessed, its normal but disappointing.
187-
self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
188-
stake_address,
189-
slot_no,
190-
txn,
191-
pubkey,
192-
script,
193-
register,
194-
deregister,
195-
delegation,
196-
));
194+
if let Some(pubkey) = pubkey {
195+
self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
196+
stake_address,
197+
slot_no,
198+
txn,
199+
pubkey,
200+
script,
201+
register,
202+
deregister,
203+
false,
204+
delegation,
205+
));
206+
}
197207
}
198208

199209
/// Index an Alonzo Era certificate into the database.

0 commit comments

Comments
 (0)