Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
ad163eb
init test
LeoPatOZ Sep 18, 2025
3844e13
refactor reorg test
LeoPatOZ Sep 18, 2025
f824f74
add more basic block scanner test
LeoPatOZ Sep 18, 2025
42a0a40
update test
LeoPatOZ Sep 22, 2025
768bf8e
Merge branch 'main' into requirement-tests
LeoPatOZ Sep 23, 2025
8e41902
update naming
LeoPatOZ Sep 23, 2025
be28102
update
LeoPatOZ Sep 23, 2025
c7efa8d
format
LeoPatOZ Sep 23, 2025
d474f9d
Merge branch 'main' into requirement-tests
LeoPatOZ Sep 23, 2025
87c3916
update tests for range inclusive
LeoPatOZ Sep 23, 2025
9f23779
rename
LeoPatOZ Sep 23, 2025
da8a308
better logging
LeoPatOZ Sep 23, 2025
0fa7850
refactor to have seperatation in modes
LeoPatOZ Sep 23, 2025
2b7eafd
unify live and historical
LeoPatOZ Sep 23, 2025
f17042a
comments explaining
LeoPatOZ Sep 23, 2025
1f0e5ab
better cuttoff tests
LeoPatOZ Sep 23, 2025
e81d6c6
update comment
LeoPatOZ Sep 23, 2025
ac45a6f
fmt
LeoPatOZ Sep 23, 2025
54a8a4a
fmt
LeoPatOZ Sep 23, 2025
82931b9
Merge branch 'refactor-block-range-scanner' into reorg-live-mode
LeoPatOZ Sep 23, 2025
f62d79f
update to use saturating add
LeoPatOZ Sep 23, 2025
aec2a11
reorg detection start
LeoPatOZ Sep 23, 2025
5d488c0
Merge branch 'refactor-block-range-scanner' into reorg-live-mode
LeoPatOZ Sep 23, 2025
f8a8037
update comment about reorg
LeoPatOZ Sep 23, 2025
ce8e5c1
replace expects
LeoPatOZ Sep 23, 2025
9be45a6
format
LeoPatOZ Sep 23, 2025
3a547e0
Update src/block_range_scanner.rs
LeoPatOZ Sep 24, 2025
de9a07b
add check to historical
LeoPatOZ Sep 24, 2025
33c50fb
move EOF check to sync historical data function
LeoPatOZ Sep 24, 2025
2c4c281
fmt
LeoPatOZ Sep 24, 2025
aa98709
refactor to have commands
LeoPatOZ Sep 24, 2025
3649cba
helper function for the subsriber logic
LeoPatOZ Sep 24, 2025
cb419ea
Merge branch 'requirement-tests' into refactor-block-range-scanner
LeoPatOZ Sep 24, 2025
6ba0b24
comment
LeoPatOZ Sep 24, 2025
dd6cc1e
removed optional block hash and num
LeoPatOZ Sep 24, 2025
0324247
comment about unwrap
LeoPatOZ Sep 24, 2025
218dbc5
send error
LeoPatOZ Sep 24, 2025
22a4d25
update batch end fetching error message
0xNeshi Sep 24, 2025
9fc91e7
refactor + print warning if channel closed when sending error
0xNeshi Sep 24, 2025
2bb4690
updated test cases
LeoPatOZ Sep 24, 2025
07b24e1
Merge branch 'main' into refactor-block-range-scanner
LeoPatOZ Sep 24, 2025
8877d98
format
LeoPatOZ Sep 24, 2025
77a820a
Merge branch 'refactor-block-range-scanner' into reorg-live-mode
LeoPatOZ Sep 24, 2025
ced78ce
improve reorg logic
LeoPatOZ Sep 24, 2025
33f89e8
comment out for now
LeoPatOZ Sep 24, 2025
367d5b3
update reorg test
LeoPatOZ Sep 25, 2025
5d003c6
up
LeoPatOZ Sep 25, 2025
352c2bd
Merge branch 'main' into reorg-live-mode
LeoPatOZ Sep 25, 2025
94697c8
checkpoint commit - INCLUDES REORG LOGIC
LeoPatOZ Sep 25, 2025
e260c81
update scanner
LeoPatOZ Sep 25, 2025
f28fd7f
final checks
LeoPatOZ Sep 25, 2025
9e75d44
format
LeoPatOZ Sep 25, 2025
f4b7384
Update tests/live_mode/reorg.rs
LeoPatOZ Sep 26, 2025
a1a530d
Update tests/live_mode/reorg.rs
LeoPatOZ Sep 26, 2025
427cb19
Update tests/live_mode/reorg.rs
LeoPatOZ Sep 26, 2025
d496f6c
update test
LeoPatOZ Sep 26, 2025
2547f73
Merge branch 'main' into reorg-live-mode
LeoPatOZ Sep 26, 2025
0a9d525
remove callback
LeoPatOZ Sep 26, 2025
58ef2a3
update range
LeoPatOZ Sep 26, 2025
98dd7df
revert change to curr
LeoPatOZ Sep 26, 2025
f150bdf
remove print
LeoPatOZ Sep 26, 2025
f5bac5b
reorg test
LeoPatOZ Sep 26, 2025
b012367
add more tests
LeoPatOZ Sep 26, 2025
3a3e572
format
LeoPatOZ Sep 26, 2025
c9de225
ignored rewind depth for now
LeoPatOZ Sep 26, 2025
8af35e8
Merge branch 'main' into reorg-live-mode
LeoPatOZ Sep 26, 2025
b7a675e
remove unused import
LeoPatOZ Sep 26, 2025
27da4d8
cleaner reorg logic
LeoPatOZ Sep 27, 2025
4eba9ff
remove todo and move warning
LeoPatOZ Sep 29, 2025
bdfddcf
merge
LeoPatOZ Sep 29, 2025
d207c32
Merge branch 'main' into reorg-live-mode
LeoPatOZ Sep 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 28 additions & 9 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ pub enum Error {

#[error("WebSocket connection failed after {0} attempts")]
WebSocketConnectionFailed(usize),

#[error("Reorg Detected")]
ReorgDetected,
}

#[derive(Debug)]
Expand Down Expand Up @@ -404,8 +407,9 @@ impl<N: Network> Service<N> {
return Err(Error::ServiceShutdown);
};

let reorg_rewind = self.config.reorg_rewind_depth;
tokio::spawn(async move {
Self::stream_live_blocks(start, provider, sender).await;
Self::stream_live_blocks(start, provider, sender, reorg_rewind).await;
});

Ok(())
Expand Down Expand Up @@ -478,9 +482,12 @@ impl<N: Network> Service<N> {
// Any block > cutoff will come from the live stream
let cutoff = end_block.header().number();

let reorg_rewind = self.config.reorg_rewind_depth;

// This task runs independently, accumulating new blocks while wehistorical data is syncing
let live_subscription_task = tokio::spawn(async move {
Self::stream_live_blocks(cutoff + 1, provider, live_block_buffer_sender).await;
Self::stream_live_blocks(cutoff + 1, provider, live_block_buffer_sender, reorg_rewind)
.await;
});

// Step 4: Perform historical synchronization
Expand Down Expand Up @@ -551,26 +558,38 @@ impl<N: Network> Service<N> {
}

async fn stream_live_blocks<P: Provider<N>>(
mut current: BlockNumber,
mut expected_next_block: BlockNumber,
provider: P,
sender: mpsc::Sender<Result<RangeInclusive<BlockNumber>, Error>>,
_reorg_rewind_depth: u64,
) {
match Self::get_block_subscription(&provider).await {
Ok(ws_stream) => {
info!("WebSocket connected for live blocks");

let cur = current;
let cur = expected_next_block;
let mut stream = ws_stream.into_stream().skip_while(|header| header.number() < cur);
while let Some(header_resp) = stream.next().await {
info!(block_number = header_resp.number(), "Received block header");
while let Some(incoming_block) = stream.next().await {
let incoming_block_num = incoming_block.number();
info!(block_number = incoming_block_num, "Received block header");

if incoming_block_num < expected_next_block {
warn!("Reorg detected: sending forked range");
if sender.send(Err(Error::ReorgDetected)).await.is_err() {
warn!("Downstream channel closed, stopping live blocks task");
return;
}

// resets cursor to incoming block num
expected_next_block = incoming_block_num;
}

if sender.send(Ok(current..=header_resp.number())).await.is_err() {
if sender.send(Ok(expected_next_block..=incoming_block_num)).await.is_err() {
warn!("Downstream channel closed, stopping live blocks task");
return;
}

// next block will be processed in the next batch
current = header_resp.number() + 1;
expected_next_block = incoming_block_num + 1;
}
}
Err(e) => {
Expand Down
6 changes: 4 additions & 2 deletions src/event_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio::sync::{
mpsc,
};
use tokio_stream::{StreamExt, wrappers::ReceiverStream};
use tracing::{error, info};
use tracing::{error, info, warn};

pub struct EventScanner {
block_range_scanner: BlockRangeScanner,
Expand Down Expand Up @@ -133,7 +133,9 @@ impl<N: Network> ConnectedEventScanner<N> {
}
}
Err(e) => {
error!(error = %e, "failed to get block range");
warn!(error = %e, "failed to get block range");

// range_tx.send(Err(e)).await.unwrap();
}
}
}
Expand Down
1 change: 1 addition & 0 deletions tests/live_mode/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod basic;
pub mod optional_fields;
pub mod performance;
pub mod reorg;
Loading