Skip to content

Commit efd993b

Browse files
authored
fix(cat-gateway): Fixing indexing issues and chain follower synchronisation (#2100)
* update cardano sync code * wip * wip * wip * wip * wip * wip * wip * wip * wip * fix fmt * change schema version * wip * wip * wip * wip
1 parent 4b54678 commit efd993b

File tree

12 files changed

+146
-81
lines changed

12 files changed

+146
-81
lines changed

catalyst-gateway/bin/Cargo.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ repository.workspace = true
1515
workspace = true
1616

1717
[dependencies]
18-
cardano-chain-follower = { version = "0.0.8", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250317-00" }
19-
rbac-registration = { version = "0.0.4", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250317-00" }
20-
catalyst-types = { version = "0.0.3", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250317-00" }
21-
cardano-blockchain-types = { version = "0.0.3", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250317-00" }
22-
catalyst-signed-doc = { version = "0.0.4", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250317-00" }
23-
c509-certificate = { version = "0.0.3", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250317-00" }
18+
cardano-chain-follower = { version = "0.0.8", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250325-00" }
19+
rbac-registration = { version = "0.0.4", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250325-00" }
20+
catalyst-types = { version = "0.0.3", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250325-00" }
21+
cardano-blockchain-types = { version = "0.0.3", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250325-00" }
22+
catalyst-signed-doc = { version = "0.0.4", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250325-00" }
23+
c509-certificate = { version = "0.0.3", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "r20250325-00" }
2424

2525
pallas = { version = "0.30.1", git = "https://github.com/input-output-hk/catalyst-pallas.git", rev = "9b5183c8b90b90fe2cc319d986e933e9518957b3" }
2626
pallas-traverse = { version = "0.30.1", git = "https://github.com/input-output-hk/catalyst-pallas.git", rev = "9b5183c8b90b90fe2cc319d986e933e9518957b3" }

catalyst-gateway/bin/src/cardano/mod.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -185,25 +185,23 @@ impl SyncParams {
185185
}
186186

187187
/// Convert Params into the result of the sync.
188-
fn done(
188+
pub(crate) fn done(
189189
&self, first: Option<Point>, first_immutable: bool, last: Option<Point>,
190190
last_immutable: bool, synced: u64, result: anyhow::Result<()>,
191191
) -> Self {
192-
if result.is_ok() && first_immutable && last_immutable {
192+
if result.is_ok() && self.end != Point::TIP {
193193
// Update sync status in the Immutable DB.
194194
// Can fire and forget, because failure to update DB will simply cause the chunk to be
195195
// re-indexed, on recovery.
196196
update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
197197
}
198-
199198
let mut done = self.clone();
200199
done.first_indexed_block = first;
201200
done.first_is_immutable = first_immutable;
202201
done.last_indexed_block = last;
203202
done.last_is_immutable = last_immutable;
204203
done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
205204
done.last_blocks_synced = synced;
206-
207205
done.result = Arc::new(Some(result));
208206

209207
done
@@ -265,7 +263,7 @@ fn sync_subchain(
265263
// What we need to do here is tell the primary follower to start a new sync
266264
// for the new immutable data, and then purge the volatile database of the
267265
// old data (after the immutable data has synced).
268-
info!(chain=%params.chain, "Immutable chain rolled forward.");
266+
info!(chain=%params.chain, point=?chain_update.block_data().point(), "Immutable chain rolled forward.");
269267
let mut result = params.done(
270268
first_indexed_block,
271269
first_immutable,
@@ -275,9 +273,10 @@ fn sync_subchain(
275273
Ok(()),
276274
);
277275
// Signal the point the immutable chain rolled forward to.
276+
// If this is live chain immediately stops to later run immutable sync tasks
278277
result.follower_roll_forward = Some(chain_update.block_data().point());
279278
return result;
280-
};
279+
}
281280
},
282281
cardano_chain_follower::Kind::Block => {
283282
let block = chain_update.block_data();
@@ -311,8 +310,9 @@ fn sync_subchain(
311310

312311
let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
313312
if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
314-
error!(chain=%params.chain, error=%error, "Chain follower
315-
rollback, purging volatile data task failed.");
313+
error!(chain=%params.chain, error=%error,
314+
"Chain follower rollback, purging volatile data task failed."
315+
);
316316
} else {
317317
// How many slots are purged
318318
#[allow(clippy::arithmetic_side_effects)]
@@ -364,7 +364,7 @@ fn sync_subchain(
364364
Ok(()),
365365
);
366366

367-
info!(chain = %params.chain, result=%result, "Indexing Blockchain Completed: OK");
367+
info!(chain = %result.chain, result=%result, "Indexing Blockchain Completed: OK");
368368

369369
result
370370
})
@@ -539,7 +539,7 @@ impl SyncTask {
539539
},
540540
Err(error) => {
541541
error!(chain=%self.cfg.chain, report=%finished, error=%error,
542-
"An Immutable follower failed, restarting it.");
542+
"An Immutable follower failed, restarting it.");
543543
// Restart the Immutable Chain sync task again from where it left
544544
// off.
545545
self.sync_tasks.push(sync_subchain(
@@ -550,7 +550,7 @@ impl SyncTask {
550550
}
551551
} else {
552552
error!(chain=%self.cfg.chain, report=%finished,
553-
"BUG: The Immutable follower completed, but without a proper result.");
553+
"BUG: The Immutable follower completed, but without a proper result.");
554554
}
555555
},
556556
Err(error) => {

catalyst-gateway/bin/src/db/index/block/certs.rs

Lines changed: 36 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::{
1919
settings::cassandra_db,
2020
};
2121

22-
/// Insert TXI Query and Parameters
22+
/// Insert stake registration query
2323
#[derive(SerializeRow)]
2424
pub(crate) struct StakeRegistrationInsertQuery {
2525
/// Stake address (29 bytes).
@@ -29,23 +29,22 @@ pub(crate) struct StakeRegistrationInsertQuery {
2929
/// Transaction Index.
3030
txn_index: DbTxnIndex,
3131
/// Full Stake Public Key (32 byte Ed25519 Public key, not hashed).
32-
stake_public_key: MaybeUnset<DbPublicKey>,
32+
stake_public_key: DbPublicKey,
3333
/// Is the stake address a script or not.
3434
script: bool,
35-
/// Is the Certificate Registered?
35+
/// Is the Cardano Certificate Registered
3636
register: MaybeUnset<bool>,
37-
/// Is the Certificate Deregistered?
37+
/// Is the Cardano Certificate Deregistered
3838
deregister: MaybeUnset<bool>,
39+
/// Is the stake address contains CIP36 registration?
40+
cip36: MaybeUnset<bool>,
3941
/// Pool Delegation Address
4042
pool_delegation: MaybeUnset<Vec<u8>>,
4143
}
4244

4345
impl Debug for StakeRegistrationInsertQuery {
4446
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
45-
let stake_public_key = match self.stake_public_key {
46-
MaybeUnset::Unset => "UNSET",
47-
MaybeUnset::Set(ref v) => &hex::encode(v.as_ref()),
48-
};
47+
let stake_public_key = hex::encode(self.stake_public_key.as_ref());
4948
let register = match self.register {
5049
MaybeUnset::Unset => "UNSET",
5150
MaybeUnset::Set(v) => &format!("{v:?}"),
@@ -54,6 +53,10 @@ impl Debug for StakeRegistrationInsertQuery {
5453
MaybeUnset::Unset => "UNSET",
5554
MaybeUnset::Set(v) => &format!("{v:?}"),
5655
};
56+
let cip36 = match self.cip36 {
57+
MaybeUnset::Unset => "UNSET",
58+
MaybeUnset::Set(v) => &format!("{v:?}"),
59+
};
5760
let pool_delegation = match self.pool_delegation {
5861
MaybeUnset::Unset => "UNSET",
5962
MaybeUnset::Set(ref v) => &hex::encode(v),
@@ -67,29 +70,28 @@ impl Debug for StakeRegistrationInsertQuery {
6770
.field("script", &self.script)
6871
.field("register", &register)
6972
.field("deregister", &deregister)
73+
.field("cip36", &cip36)
7074
.field("pool_delegation", &pool_delegation)
7175
.finish()
7276
}
7377
}
7478

75-
/// TXI by Txn hash Index
79+
/// Insert stake registration
7680
const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");
7781

7882
impl StakeRegistrationInsertQuery {
7983
/// Create a new Insert Query.
80-
#[allow(clippy::too_many_arguments)]
84+
#[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
8185
pub fn new(
8286
stake_address: StakeAddress, slot_no: Slot, txn_index: TxnIndex,
83-
stake_public_key: Option<VerifyingKey>, script: bool, register: bool, deregister: bool,
84-
pool_delegation: Option<Vec<u8>>,
87+
stake_public_key: VerifyingKey, script: bool, register: bool, deregister: bool,
88+
cip36: bool, pool_delegation: Option<Vec<u8>>,
8589
) -> Self {
86-
let stake_public_key =
87-
stake_public_key.map_or(MaybeUnset::Unset, |a| MaybeUnset::Set(a.into()));
8890
StakeRegistrationInsertQuery {
8991
stake_address: stake_address.into(),
9092
slot_no: slot_no.into(),
9193
txn_index: txn_index.into(),
92-
stake_public_key,
94+
stake_public_key: stake_public_key.into(),
9395
script,
9496
register: if register {
9597
MaybeUnset::Set(true)
@@ -101,6 +103,11 @@ impl StakeRegistrationInsertQuery {
101103
} else {
102104
MaybeUnset::Unset
103105
},
106+
cip36: if cip36 {
107+
MaybeUnset::Set(true)
108+
} else {
109+
MaybeUnset::Unset
110+
},
104111
pool_delegation: if let Some(pool_delegation) = pool_delegation {
105112
MaybeUnset::Set(pool_delegation)
106113
} else {
@@ -109,7 +116,7 @@ impl StakeRegistrationInsertQuery {
109116
}
110117
}
111118

112-
/// Prepare Batch of Insert TXI Index Data Queries
119+
/// Prepare Batch of Insert stake registration.
113120
pub(crate) async fn prepare_batch(
114121
session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
115122
) -> anyhow::Result<SizedBatch> {
@@ -184,16 +191,19 @@ impl CertInsertQuery {
184191
}
185192

186193
// This may not be witnessed, its normal but disappointing.
187-
self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
188-
stake_address,
189-
slot_no,
190-
txn,
191-
pubkey,
192-
script,
193-
register,
194-
deregister,
195-
delegation,
196-
));
194+
if let Some(pubkey) = pubkey {
195+
self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
196+
stake_address,
197+
slot_no,
198+
txn,
199+
pubkey,
200+
script,
201+
register,
202+
deregister,
203+
false,
204+
delegation,
205+
));
206+
}
197207
}
198208

199209
/// Index an Alonzo Era certificate into the database.

0 commit comments

Comments
 (0)