Skip to content

Commit bc4bfff

Browse files
committed
fix(cardano-chain-follower): use Network, MultiEraBlock, Point from cardano-blockchain-types
Signed-off-by: bkioshn <[email protected]>
1 parent 1440d0b commit bc4bfff

20 files changed

+114
-1853
lines changed

rust/cardano-chain-follower/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ mithril-client = { version = "0.10.4", default-features = false, features = [
2121
] }
2222

2323
rbac-registration = { version = "0.0.2", git = "https://github.com/input-output-hk/catalyst-libs.git", tag = "v0.0.8" }
24+
cardano-blockchain-types = { git = "https://github.com/input-output-hk/catalyst-libs.git", branch = "feat/cardano-blockchain-types" }
2425

2526
thiserror = "1.0.64"
2627
tokio = { version = "1.40.0", features = [

rust/cardano-chain-follower/src/chain_sync.rs

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use std::time::Duration;
77

88
use anyhow::Context;
9+
use cardano_blockchain_types::{Fork, MultiEraBlock, Network, Point};
910
use pallas::{
1011
ledger::traverse::MultiEraHeader,
1112
network::{
@@ -32,8 +33,7 @@ use crate::{
3233
error::{Error, Result},
3334
mithril_snapshot_config::MithrilUpdateMessage,
3435
mithril_snapshot_data::latest_mithril_snapshot_id,
35-
point::{TIP_POINT, UNKNOWN_POINT},
36-
stats, ChainSyncConfig, MultiEraBlock, Network, Point, ORIGIN_POINT,
36+
stats, ChainSyncConfig,
3737
};
3838

3939
/// The maximum number of seconds we wait for a node to connect.
@@ -88,7 +88,7 @@ async fn retry_connect(
8888

8989
/// Purge the live chain, and intersect with TIP.
9090
async fn purge_and_intersect_tip(client: &mut PeerClient, chain: Network) -> Result<Point> {
91-
if let Err(error) = purge_live_chain(chain, &TIP_POINT) {
91+
if let Err(error) = purge_live_chain(chain, &Point::TIP) {
9292
// Shouldn't happen.
9393
error!("failed to purge live chain: {error}");
9494
}
@@ -122,16 +122,16 @@ async fn resync_live_tip(client: &mut PeerClient, chain: Network) -> Result<Poin
122122

123123
/// Fetch a single block from the Peer, and Decode it.
124124
async fn fetch_block_from_peer(
125-
peer: &mut PeerClient, chain: Network, point: Point, previous_point: Point, fork_count: u64,
125+
peer: &mut PeerClient, chain: Network, point: Point, previous_point: Point, fork: Fork,
126126
) -> anyhow::Result<MultiEraBlock> {
127127
let block_data = peer
128128
.blockfetch()
129129
.fetch_single(point.clone().into())
130130
.await
131131
.with_context(|| "Fetching block data")?;
132132

133-
debug!("{chain}, {previous_point}, {fork_count}");
134-
let live_block_data = MultiEraBlock::new(chain, block_data, &previous_point, fork_count)?;
133+
debug!("{chain}, {previous_point}, {fork:?}");
134+
let live_block_data = MultiEraBlock::new(chain, block_data, &previous_point, fork)?;
135135

136136
Ok(live_block_data)
137137
}
@@ -141,16 +141,16 @@ async fn fetch_block_from_peer(
141141
/// Fetch the rollback block, and try and insert it into the live-chain.
142142
/// If its a real rollback, it will purge the chain ahead of the block automatically.
143143
async fn process_rollback_actual(
144-
peer: &mut PeerClient, chain: Network, point: Point, tip: &Tip, fork_count: &mut u64,
144+
peer: &mut PeerClient, chain: Network, point: Point, tip: &Tip, fork: &mut Fork,
145145
) -> anyhow::Result<Point> {
146146
debug!("RollBackward: {:?} {:?}", point, tip);
147147

148148
// Check if the block is in the live chain, if it is, re-add it, which auto-purges the
149149
// rest of live chain tip. And increments the fork count.
150150
if let Some(mut block) = get_live_block(chain, &point, 0, true) {
151151
// Even though we are re-adding the known block, increase the fork count.
152-
block.set_fork(*fork_count);
153-
live_chain_add_block_to_tip(chain, block, fork_count, tip.0.clone().into())?;
152+
block.set_fork(*fork);
153+
live_chain_add_block_to_tip(chain, block, fork, tip.0.clone().into())?;
154154
return Ok(point);
155155
}
156156

@@ -165,7 +165,7 @@ async fn process_rollback_actual(
165165
let previous_point = if let Some(previous_block) = previous_block {
166166
let previous = previous_block.previous();
167167
debug!("Previous block: {:?}", previous);
168-
if previous == ORIGIN_POINT {
168+
if previous == Point::ORIGIN {
169169
latest_mithril_snapshot_id(chain).tip()
170170
} else {
171171
previous
@@ -175,9 +175,8 @@ async fn process_rollback_actual(
175175
latest_mithril_snapshot_id(chain).tip()
176176
};
177177
debug!("Previous point: {:?}", previous_point);
178-
let block =
179-
fetch_block_from_peer(peer, chain, point.clone(), previous_point, *fork_count).await?;
180-
live_chain_add_block_to_tip(chain, block, fork_count, tip.0.clone().into())?;
178+
let block = fetch_block_from_peer(peer, chain, point.clone(), previous_point, *fork).await?;
179+
live_chain_add_block_to_tip(chain, block, fork, tip.0.clone().into())?;
181180

182181
// Next block we receive is a rollback.
183182
Ok(point)
@@ -186,7 +185,7 @@ async fn process_rollback_actual(
186185
/// Process a rollback detected from the peer.
187186
async fn process_rollback(
188187
peer: &mut PeerClient, chain: Network, point: Point, tip: &Tip, previous_point: &Point,
189-
fork_count: &mut u64,
188+
fork: &mut Fork,
190189
) -> anyhow::Result<Point> {
191190
let rollback_slot = point.slot_or_default();
192191
let head_slot = previous_point.slot_or_default();
@@ -199,7 +198,7 @@ async fn process_rollback(
199198
};
200199

201200
// We actually do the work here...
202-
let response = process_rollback_actual(peer, chain, point, tip, fork_count).await?;
201+
let response = process_rollback_actual(peer, chain, point, tip, fork).await?;
203202

204203
// We never really know how many blocks are rolled back when advised by the peer, but we
205204
// can work out how many slots. This function wraps the real work, so we can properly
@@ -212,7 +211,7 @@ async fn process_rollback(
212211
/// Process a rollback detected from the peer.
213212
async fn process_next_block(
214213
peer: &mut PeerClient, chain: Network, header: HeaderContent, tip: &Tip,
215-
previous_point: &Point, fork_count: &mut u64,
214+
previous_point: &Point, fork: &mut Fork,
216215
) -> anyhow::Result<Point> {
217216
// Decode the Header of the block so we know what to fetch.
218217
let decoded_header = MultiEraHeader::decode(
@@ -222,7 +221,7 @@ async fn process_next_block(
222221
)
223222
.with_context(|| "Decoding Block Header")?;
224223

225-
let block_point = Point::new(decoded_header.slot(), decoded_header.hash().to_vec());
224+
let block_point = Point::new(decoded_header.slot().into(), decoded_header.hash().into());
226225

227226
debug!("RollForward: {block_point:?} {tip:?}");
228227

@@ -231,20 +230,20 @@ async fn process_next_block(
231230
chain,
232231
block_point.clone(),
233232
previous_point.clone(),
234-
*fork_count,
233+
*fork,
235234
)
236235
.await?;
237236

238237
let block_point = block.point();
239238

240239
// We can't store this block because we don't know the previous one so the chain
241240
// would break, so just use it for previous.
242-
if *previous_point == UNKNOWN_POINT {
241+
if *previous_point == Point::UNKNOWN {
243242
// Nothing else we can do with the first block when we don't know the previous
244243
// one. Just return it's point.
245244
debug!("Not storing the block, because we did not know the previous point.");
246245
} else {
247-
live_chain_add_block_to_tip(chain, block, fork_count, tip.0.clone().into())?;
246+
live_chain_add_block_to_tip(chain, block, fork, tip.0.clone().into())?;
248247
}
249248

250249
Ok(block_point)
@@ -255,10 +254,10 @@ async fn process_next_block(
255254
///
256255
/// We take ownership of the client because of that.
257256
async fn follow_chain(
258-
peer: &mut PeerClient, chain: Network, fork_count: &mut u64,
257+
peer: &mut PeerClient, chain: Network, fork: &mut Fork,
259258
) -> anyhow::Result<()> {
260259
let mut update_sender = get_chain_update_tx_queue(chain).await;
261-
let mut previous_point = UNKNOWN_POINT;
260+
let mut previous_point = Point::UNKNOWN;
262261

263262
loop {
264263
// debug!("Waiting for data from Cardano Peer Node:");
@@ -286,16 +285,15 @@ async fn follow_chain(
286285
// subtracting current block height and the tip block height.
287286
// IF the TIP is <= the current block height THEN we are at tip.
288287
previous_point =
289-
process_next_block(peer, chain, header, &tip, &previous_point, fork_count)
290-
.await?;
288+
process_next_block(peer, chain, header, &tip, &previous_point, fork).await?;
291289

292290
// This update is just for followers to know to look again at their live chains for
293291
// new data.
294292
notify_follower(chain, &update_sender, &chain_update::Kind::Block);
295293
},
296294
chainsync::NextResponse::RollBackward(point, tip) => {
297295
previous_point =
298-
process_rollback(peer, chain, point.into(), &tip, &previous_point, fork_count)
296+
process_rollback(peer, chain, point.into(), &tip, &previous_point, fork)
299297
.await?;
300298
// This update is just for followers to know to look again at their live chains for
301299
// new data.
@@ -367,8 +365,8 @@ async fn live_sync_backfill(
367365

368366
while let Some(block_data) = peer.blockfetch().recv_while_streaming().await? {
369367
// Backfilled blocks get placed in the oldest fork currently on the live-chain.
370-
let block =
371-
MultiEraBlock::new(cfg.chain, block_data, &previous_point, 1).with_context(|| {
368+
let block = MultiEraBlock::new(cfg.chain, block_data, &previous_point, 1.into())
369+
.with_context(|| {
372370
format!(
373371
"Failed to decode block data. previous: {previous_point:?}, range: {range_msg}"
374372
)
@@ -533,7 +531,7 @@ pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver<MithrilU
533531
// Live Fill data starts at fork 1.
534532
// Immutable data from a mithril snapshot is fork 0.
535533
// Live backfill is always Fork 1.
536-
let mut fork_count: u64 = 2;
534+
let mut fork: Fork = Fork::from_saturating(2);
537535

538536
loop {
539537
// We never have a connection if we end up around the loop, so make a new one.
@@ -551,7 +549,7 @@ pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver<MithrilU
551549
}
552550

553551
// Note: This can ONLY return with an error, otherwise it will sync indefinitely.
554-
if let Err(error) = follow_chain(&mut peer, cfg.chain, &mut fork_count).await {
552+
if let Err(error) = follow_chain(&mut peer, cfg.chain, &mut fork).await {
555553
error!(
556554
"Cardano Client {} failed to follow chain: {}: Reconnecting.",
557555
cfg.relay_address, error
@@ -560,7 +558,7 @@ pub(crate) async fn chain_sync(cfg: ChainSyncConfig, rx: mpsc::Receiver<MithrilU
560558
}
561559

562560
// If this returns, we are on a new fork (or assume we are)
563-
fork_count += 1;
561+
fork.incr();
564562
}
565563
}
566564

rust/cardano-chain-follower/src/chain_sync_config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
77
use std::sync::LazyLock;
88

9+
use cardano_blockchain_types::Network;
910
use dashmap::DashMap;
1011
use strum::IntoEnumIterator;
1112
use tokio::{sync::Mutex, task::JoinHandle};
@@ -15,7 +16,6 @@ use crate::{
1516
chain_sync::chain_sync,
1617
error::{Error, Result},
1718
mithril_snapshot_config::MithrilSnapshotConfig,
18-
network::Network,
1919
stats,
2020
};
2121

rust/cardano-chain-follower/src/chain_sync_live_chains.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{
66
time::Duration,
77
};
88

9+
use cardano_blockchain_types::{Fork, MultiEraBlock, Network, Point};
910
use crossbeam_skiplist::SkipMap;
1011
use rayon::prelude::*;
1112
use strum::IntoEnumIterator;
@@ -14,8 +15,7 @@ use tracing::{debug, error};
1415
use crate::{
1516
error::{Error, Result},
1617
mithril_snapshot_data::latest_mithril_snapshot_id,
17-
point::UNKNOWN_POINT,
18-
stats, MultiEraBlock, Network, Point, TIP_POINT,
18+
stats,
1919
};
2020

2121
/// Type we use to manage the Sync Task handle map.
@@ -40,7 +40,7 @@ static LIVE_CHAINS: LazyLock<SkipMap<Network, ProtectedLiveChainBlockList>> = La
4040
static PEER_TIP: LazyLock<SkipMap<Network, Point>> = LazyLock::new(|| {
4141
let map = SkipMap::new();
4242
for network in Network::iter() {
43-
map.insert(network, UNKNOWN_POINT);
43+
map.insert(network, Point::UNKNOWN);
4444
}
4545
map
4646
});
@@ -52,7 +52,7 @@ fn update_peer_tip(chain: Network, tip: Point) {
5252

5353
/// Set the last TIP received from the peer.
5454
pub(crate) fn get_peer_tip(chain: Network) -> Point {
55-
(*PEER_TIP.get_or_insert(chain, UNKNOWN_POINT).value()).clone()
55+
(*PEER_TIP.get_or_insert(chain, Point::UNKNOWN).value()).clone()
5656
}
5757

5858
/// Number of seconds to wait if we detect a `SyncReady` race condition.
@@ -122,7 +122,7 @@ impl ProtectedLiveChainBlockList {
122122
let Some(check_last_live_entry) = live_chain.back() else {
123123
// Its not an error if we can't get a latest block because the chain is empty,
124124
// so report that we don't know...
125-
return UNKNOWN_POINT;
125+
return Point::UNKNOWN;
126126
};
127127
let check_last_live_block = check_last_live_entry.value();
128128
check_last_live_block.point()
@@ -192,7 +192,7 @@ impl ProtectedLiveChainBlockList {
192192
/// would be lost due to rollback. Will REFUSE to add a block which does NOT have
193193
/// a proper "previous" point defined.
194194
fn add_block_to_tip(
195-
&self, chain: Network, block: MultiEraBlock, fork_count: &mut u64, tip: Point,
195+
&self, chain: Network, block: MultiEraBlock, fork: &mut Fork, tip: Point,
196196
) -> Result<()> {
197197
let live_chain = self.0.write().map_err(|_| Error::Internal)?;
198198

@@ -202,11 +202,11 @@ impl ProtectedLiveChainBlockList {
202202
let last_live_point = Self::get_last_live_point(&live_chain);
203203
if !previous_point.strict_eq(&last_live_point) {
204204
// Detected a rollback, so increase the fork count.
205-
*fork_count += 1;
205+
fork.incr();
206206
let mut rollback_size: u64 = 0;
207207

208208
// We are NOT contiguous, so check if we can become contiguous with a rollback.
209-
debug!("Detected non-contiguous block, rolling back. Fork: {fork_count}");
209+
debug!("Detected non-contiguous block, rolling back. Fork: {fork:?}");
210210

211211
// First check if the previous is >= the earliest block in the live chain.
212212
// This is because when we start syncing we could rollback earlier than our
@@ -265,7 +265,7 @@ impl ProtectedLiveChainBlockList {
265265
// Ensures we are properly connected to the Mithril Chain.
266266
// But don't check this if we are about to purge the entire chain.
267267
// We do this before we bother locking the chain for update.
268-
if *point != TIP_POINT {
268+
if *point != Point::TIP {
269269
let latest_mithril_tip = latest_mithril_snapshot_id(chain).tip();
270270
if !point.strict_eq(&latest_mithril_tip) {
271271
return Err(Error::LiveSync(format!(
@@ -278,7 +278,7 @@ impl ProtectedLiveChainBlockList {
278278

279279
// Special Case.
280280
// If the Purge Point == TIP_POINT, then we purge the entire chain.
281-
if *point == TIP_POINT {
281+
if *point == Point::TIP {
282282
live_chain.clear();
283283
} else {
284284
// If the block we want to purge upto must be in the chain.
@@ -345,7 +345,7 @@ impl ProtectedLiveChainBlockList {
345345

346346
// Loop until we exhaust probe slots, OR we would step past genesis.
347347
while slot_age < reference_slot {
348-
let ref_point = Point::fuzzy(reference_slot - slot_age);
348+
let ref_point = Point::fuzzy((reference_slot - slot_age).into());
349349
let Some(entry) = chain.lower_bound(Bound::Included(&ref_point)) else {
350350
break;
351351
};
@@ -363,15 +363,15 @@ impl ProtectedLiveChainBlockList {
363363
/// Given a known point on the live chain, and a fork count, find the best block we
364364
/// have.
365365
fn find_best_fork_block(
366-
&self, point: &Point, previous_point: &Point, fork: u64,
366+
&self, point: &Point, previous_point: &Point, fork: Fork,
367367
) -> Option<(MultiEraBlock, u64)> {
368368
let mut rollback_depth: u64 = 0;
369369
let Ok(chain) = self.0.read() else {
370370
return None;
371371
};
372372

373373
// Get the block <= the current slot.
374-
let ref_point = Point::fuzzy(point.slot_or_default());
374+
let ref_point = Point::fuzzy(point.slot_or_default().into());
375375
let mut entry = chain.upper_bound(Bound::Included(&ref_point))?;
376376

377377
let mut this_block = entry.value().clone();
@@ -400,7 +400,7 @@ impl ProtectedLiveChainBlockList {
400400
let live_chain = self.0.read().map_err(|_| Error::Internal).ok()?;
401401

402402
let head_point = Self::get_last_live_point(&live_chain);
403-
if head_point == UNKNOWN_POINT {
403+
if head_point == Point::UNKNOWN {
404404
return None;
405405
}
406406

@@ -457,7 +457,7 @@ pub(crate) fn get_live_block(
457457
///
458458
/// Note: It MAY change between calling this function and actually backfilling.
459459
/// This is expected and normal behavior.
460-
pub(crate) async fn get_fill_to_point(chain: Network) -> (Point, u64) {
460+
pub(crate) async fn get_fill_to_point(chain: Network) -> (Point, Fork) {
461461
let live_chain = get_live_chain(chain);
462462

463463
loop {
@@ -474,10 +474,10 @@ pub(crate) async fn get_fill_to_point(chain: Network) -> (Point, u64) {
474474
/// `rollback_count` should be set to 1 on the very first connection, after that,
475475
/// it is maintained by this function, and MUST not be modified elsewhere.
476476
pub(crate) fn live_chain_add_block_to_tip(
477-
chain: Network, block: MultiEraBlock, fork_count: &mut u64, tip: Point,
477+
chain: Network, block: MultiEraBlock, fork: &mut Fork, tip: Point,
478478
) -> Result<()> {
479479
let live_chain = get_live_chain(chain);
480-
live_chain.add_block_to_tip(chain, block, fork_count, tip)
480+
live_chain.add_block_to_tip(chain, block, fork, tip)
481481
}
482482

483483
/// Backfill the live chain with the block set provided.
@@ -509,7 +509,7 @@ pub(crate) fn get_intersect_points(chain: Network) -> Vec<pallas::network::minip
509509

510510
/// Find best block from a fork relative to a point.
511511
pub(crate) fn find_best_fork_block(
512-
chain: Network, point: &Point, previous_point: &Point, fork: u64,
512+
chain: Network, point: &Point, previous_point: &Point, fork: Fork,
513513
) -> Option<(MultiEraBlock, u64)> {
514514
let live_chain = get_live_chain(chain);
515515
live_chain.find_best_fork_block(point, previous_point, fork)

0 commit comments

Comments
 (0)