Skip to content

Commit c4e76dc

Browse files
LeoPatOZ0xNeshi
andauthored
Reorg live mode (#74)
Co-authored-by: Nenad <[email protected]>
1 parent c507623 commit c4e76dc

File tree

4 files changed

+335
-11
lines changed

4 files changed

+335
-11
lines changed

src/block_range_scanner.rs

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,9 @@ pub enum Error {
127127

128128
#[error("WebSocket connection failed after {0} attempts")]
129129
WebSocketConnectionFailed(usize),
130+
131+
#[error("Reorg Detected")]
132+
ReorgDetected,
130133
}
131134

132135
#[derive(Debug)]
@@ -404,8 +407,9 @@ impl<N: Network> Service<N> {
404407
return Err(Error::ServiceShutdown);
405408
};
406409

410+
let reorg_rewind = self.config.reorg_rewind_depth;
407411
tokio::spawn(async move {
408-
Self::stream_live_blocks(start, provider, sender).await;
412+
Self::stream_live_blocks(start, provider, sender, reorg_rewind).await;
409413
});
410414

411415
Ok(())
@@ -478,9 +482,12 @@ impl<N: Network> Service<N> {
478482
// Any block > cutoff will come from the live stream
479483
let cutoff = end_block.header().number();
480484

485+
let reorg_rewind = self.config.reorg_rewind_depth;
486+
481487
// This task runs independently, accumulating new blocks while wehistorical data is syncing
482488
let live_subscription_task = tokio::spawn(async move {
483-
Self::stream_live_blocks(cutoff + 1, provider, live_block_buffer_sender).await;
489+
Self::stream_live_blocks(cutoff + 1, provider, live_block_buffer_sender, reorg_rewind)
490+
.await;
484491
});
485492

486493
// Step 4: Perform historical synchronization
@@ -551,26 +558,38 @@ impl<N: Network> Service<N> {
551558
}
552559

553560
async fn stream_live_blocks<P: Provider<N>>(
554-
mut current: BlockNumber,
561+
mut expected_next_block: BlockNumber,
555562
provider: P,
556563
sender: mpsc::Sender<Result<RangeInclusive<BlockNumber>, Error>>,
564+
_reorg_rewind_depth: u64,
557565
) {
558566
match Self::get_block_subscription(&provider).await {
559567
Ok(ws_stream) => {
560568
info!("WebSocket connected for live blocks");
561569

562-
let cur = current;
570+
let cur = expected_next_block;
563571
let mut stream = ws_stream.into_stream().skip_while(|header| header.number() < cur);
564-
while let Some(header_resp) = stream.next().await {
565-
info!(block_number = header_resp.number(), "Received block header");
572+
while let Some(incoming_block) = stream.next().await {
573+
let incoming_block_num = incoming_block.number();
574+
info!(block_number = incoming_block_num, "Received block header");
575+
576+
if incoming_block_num < expected_next_block {
577+
warn!("Reorg detected: sending forked range");
578+
if sender.send(Err(Error::ReorgDetected)).await.is_err() {
579+
warn!("Downstream channel closed, stopping live blocks task");
580+
return;
581+
}
582+
583+
// resets cursor to incoming block num
584+
expected_next_block = incoming_block_num;
585+
}
566586

567-
if sender.send(Ok(current..=header_resp.number())).await.is_err() {
587+
if sender.send(Ok(expected_next_block..=incoming_block_num)).await.is_err() {
568588
warn!("Downstream channel closed, stopping live blocks task");
569589
return;
570590
}
571591

572-
// next block will be processed in the next batch
573-
current = header_resp.number() + 1;
592+
expected_next_block = incoming_block_num + 1;
574593
}
575594
}
576595
Err(e) => {

src/event_scanner.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use tokio::sync::{
2020
mpsc,
2121
};
2222
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
23-
use tracing::{error, info};
23+
use tracing::{error, info, warn};
2424

2525
pub struct EventScanner {
2626
block_range_scanner: BlockRangeScanner,
@@ -133,7 +133,9 @@ impl<N: Network> ConnectedEventScanner<N> {
133133
}
134134
}
135135
Err(e) => {
136-
error!(error = %e, "failed to get block range");
136+
warn!(error = %e, "failed to get block range");
137+
138+
// range_tx.send(Err(e)).await.unwrap();
137139
}
138140
}
139141
}

tests/live_mode/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pub mod basic;
22
pub mod optional_fields;
33
pub mod performance;
4+
pub mod reorg;

0 commit comments

Comments
 (0)