Skip to content

Commit 2fd5318

Browse files
authored
refactor: Introduced the ReorgHandler trait (#233)
<!-- Append the issue number --> Resolves #226 This PR refactors the `ReorgHandler` struct into a trait-based design to enable better testability. ### Changes - **Added `ReorgHandler` trait** - Defines the interface for chain reorganization detection with a single `check` method - **Renamed `ReorgHandler` struct to `DefaultReorgHandler`** - The concrete implementation using RPC providers now implements the new trait - **Updated function signatures** - All functions that accept a reorg handler now use generic bounds (`R: ReorgHandler<N>`) instead of the concrete type
1 parent 3ffacc6 commit 2fd5318

File tree

7 files changed

+63
-34
lines changed

7 files changed

+63
-34
lines changed

src/block_range_scanner/common.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,14 @@ impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumb
5454
feature = "tracing",
5555
tracing::instrument(level = "trace", skip(subscription, sender, provider, reorg_handler))
5656
)]
57-
pub(crate) async fn stream_live_blocks<N: Network>(
57+
pub(crate) async fn stream_live_blocks<N: Network, R: ReorgHandler<N>>(
5858
stream_start: BlockNumber,
5959
subscription: RobustSubscription<N>,
6060
sender: &mpsc::Sender<BlockScannerResult>,
6161
provider: &RobustProvider<N>,
6262
block_confirmations: u64,
6363
max_block_range: u64,
64-
reorg_handler: &mut ReorgHandler<N>,
64+
reorg_handler: &mut R,
6565
notify_after_first_block: bool,
6666
) {
6767
// Phase 1: Wait for first relevant block
@@ -169,14 +169,14 @@ fn skip_to_first_relevant_block<N: Network>(
169169

170170
/// Initializes the streaming state after receiving the first block.
171171
/// Returns None if the channel is closed.
172-
async fn initialize_live_streaming_state<N: Network>(
172+
async fn initialize_live_streaming_state<N: Network, R: ReorgHandler<N>>(
173173
first_block: N::HeaderResponse,
174174
stream_start: BlockNumber,
175175
block_confirmations: u64,
176176
max_block_range: u64,
177177
sender: &mpsc::Sender<BlockScannerResult>,
178178
provider: &RobustProvider<N>,
179-
reorg_handler: &mut ReorgHandler<N>,
179+
reorg_handler: &mut R,
180180
) -> Option<LiveStreamingState<N>> {
181181
let confirmed = first_block.number().saturating_sub(block_confirmations);
182182

@@ -206,6 +206,7 @@ async fn initialize_live_streaming_state<N: Network>(
206206
async fn stream_blocks_continuously<
207207
N: Network,
208208
S: tokio_stream::Stream<Item = Result<N::HeaderResponse, subscription::Error>> + Unpin,
209+
R: ReorgHandler<N>,
209210
>(
210211
stream: &mut S,
211212
state: &mut LiveStreamingState<N>,
@@ -214,7 +215,7 @@ async fn stream_blocks_continuously<
214215
max_block_range: u64,
215216
sender: &mpsc::Sender<BlockScannerResult>,
216217
provider: &RobustProvider<N>,
217-
reorg_handler: &mut ReorgHandler<N>,
218+
reorg_handler: &mut R,
218219
) {
219220
while let Some(incoming_block) = stream.next().await {
220221
let incoming_block = match incoming_block {
@@ -337,14 +338,14 @@ async fn handle_reorg_detected<N: Network>(
337338

338339
/// Streams the next batch of blocks up to `batch_end_num`.
339340
/// Returns `ChannelState::Closed` if the channel is closed, `ChannelState::Open` otherwise.
340-
async fn stream_next_batch<N: Network>(
341+
async fn stream_next_batch<N: Network, R: ReorgHandler<N>>(
341342
batch_end_num: BlockNumber,
342343
state: &mut LiveStreamingState<N>,
343344
stream_start: BlockNumber,
344345
max_block_range: u64,
345346
sender: &mpsc::Sender<BlockScannerResult>,
346347
provider: &RobustProvider<N>,
347-
reorg_handler: &mut ReorgHandler<N>,
348+
reorg_handler: &mut R,
348349
) -> ChannelState {
349350
if batch_end_num < state.batch_start {
350351
// No new confirmed blocks to stream yet
@@ -389,13 +390,13 @@ struct LiveStreamingState<N: Network> {
389390
feature = "tracing",
390391
tracing::instrument(level = "trace", skip(sender, provider, reorg_handler))
391392
)]
392-
pub(crate) async fn stream_historical_range<N: Network>(
393+
pub(crate) async fn stream_historical_range<N: Network, R: ReorgHandler<N>>(
393394
start: BlockNumber,
394395
end: BlockNumber,
395396
max_block_range: u64,
396397
sender: &mpsc::Sender<BlockScannerResult>,
397398
provider: &RobustProvider<N>,
398-
reorg_handler: &mut ReorgHandler<N>,
399+
reorg_handler: &mut R,
399400
) -> Option<()> {
400401
// NOTE: Edge case - If the chain is too young to expose finalized blocks (height < finalized
401402
// depth) just use zero.
@@ -464,14 +465,14 @@ pub(crate) async fn stream_historical_range<N: Network>(
464465
feature = "tracing",
465466
tracing::instrument(level = "trace", skip(sender, provider, reorg_handler))
466467
)]
467-
pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
468+
pub(crate) async fn stream_range_with_reorg_handling<N: Network, R: ReorgHandler<N>>(
468469
min_common_ancestor: BlockNumber,
469470
next_start_block: BlockNumber,
470471
end: BlockNumber,
471472
max_block_range: u64,
472473
sender: &mpsc::Sender<BlockScannerResult>,
473474
provider: &RobustProvider<N>,
474-
reorg_handler: &mut ReorgHandler<N>,
475+
reorg_handler: &mut R,
475476
) -> Option<N::BlockResponse> {
476477
let mut last_batch_end: Option<N::BlockResponse> = None;
477478
let mut iter = RangeIterator::forward(next_start_block, end, max_block_range);

src/block_range_scanner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ mod sync_handler;
99

1010
pub use builder::BlockRangeScannerBuilder;
1111
pub use common::BlockScannerResult;
12+
pub use reorg_handler::ReorgHandler;
1213
pub use ring_buffer::RingBufferCapacity;
1314
pub use scanner::BlockRangeScanner;
1415

src/block_range_scanner/reorg_handler.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,34 @@ use robust_provider::RobustProvider;
99
use super::ring_buffer::RingBuffer;
1010
use crate::{ScannerError, block_range_scanner::ring_buffer::RingBufferCapacity};
1111

12+
/// Trait for handling chain reorganizations.
13+
#[allow(async_fn_in_trait)]
14+
pub trait ReorgHandler<N: Network> {
15+
/// Checks if a block was reorged and returns the common ancestor if found.
16+
///
17+
/// # Arguments
18+
///
19+
/// * `block` - The block to check for reorg.
20+
///
21+
/// # Returns
22+
///
23+
/// * `Ok(Some(common_ancestor))` - If a reorg was detected, returns the common ancestor block.
24+
/// * `Ok(None)` - If no reorg was detected, returns `None`.
25+
/// * `Err(e)` - If an error occurred while checking for reorg.
26+
async fn check(
27+
&mut self,
28+
block: &N::BlockResponse,
29+
) -> Result<Option<N::BlockResponse>, ScannerError>;
30+
}
31+
32+
/// Default implementation of [`ReorgHandler`] that uses an RPC provider.
1233
#[derive(Clone, Debug)]
13-
pub(crate) struct ReorgHandler<N: Network = Ethereum> {
34+
pub(crate) struct DefaultReorgHandler<N: Network = Ethereum> {
1435
provider: RobustProvider<N>,
1536
buffer: RingBuffer<BlockHash>,
1637
}
1738

18-
impl<N: Network> ReorgHandler<N> {
19-
pub fn new(provider: RobustProvider<N>, capacity: RingBufferCapacity) -> Self {
20-
Self { provider, buffer: RingBuffer::new(capacity) }
21-
}
22-
39+
impl<N: Network> ReorgHandler<N> for DefaultReorgHandler<N> {
2340
/// Checks if a block was reorged and returns the common ancestor if found.
2441
///
2542
/// # Arguments
@@ -57,7 +74,7 @@ impl<N: Network> ReorgHandler<N> {
5774
feature = "tracing",
5875
tracing::instrument(level = "trace", fields(block.hash = %block.header().hash(), block.number = block.header().number()))
5976
)]
60-
pub async fn check(
77+
async fn check(
6178
&mut self,
6279
block: &N::BlockResponse,
6380
) -> Result<Option<N::BlockResponse>, ScannerError> {
@@ -114,6 +131,12 @@ impl<N: Network> ReorgHandler<N> {
114131

115132
Ok(Some(finalized))
116133
}
134+
}
135+
136+
impl<N: Network> DefaultReorgHandler<N> {
137+
pub fn new(provider: RobustProvider<N>, capacity: RingBufferCapacity) -> Self {
138+
Self { provider, buffer: RingBuffer::new(capacity) }
139+
}
117140

118141
async fn reorg_detected(&self, block: &N::HeaderResponse) -> Result<bool, ScannerError> {
119142
match self.provider.get_block_by_hash(block.hash()).await {

src/block_range_scanner/rewind_handler.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ use robust_provider::RobustProvider;
1212
use crate::{
1313
Notification, ScannerError,
1414
block_range_scanner::{
15-
common::BlockScannerResult, range_iterator::RangeIterator, reorg_handler::ReorgHandler,
15+
common::BlockScannerResult,
16+
range_iterator::RangeIterator,
17+
reorg_handler::{DefaultReorgHandler, ReorgHandler},
1618
ring_buffer::RingBufferCapacity,
1719
},
1820
types::{ChannelState, TryStream},
@@ -24,7 +26,7 @@ pub(crate) struct RewindHandler<N: Network> {
2426
start_id: BlockId,
2527
end_id: BlockId,
2628
sender: mpsc::Sender<BlockScannerResult>,
27-
reorg_handler: ReorgHandler<N>,
29+
reorg_handler: DefaultReorgHandler<N>,
2830
}
2931

3032
impl<N: Network> RewindHandler<N> {
@@ -36,7 +38,8 @@ impl<N: Network> RewindHandler<N> {
3638
past_blocks_storage_capacity: RingBufferCapacity,
3739
sender: mpsc::Sender<BlockScannerResult>,
3840
) -> Self {
39-
let reorg_handler = ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
41+
let reorg_handler =
42+
DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
4043
Self { provider, max_block_range, start_id, end_id, sender, reorg_handler }
4144
}
4245

@@ -89,13 +92,13 @@ impl<N: Network> RewindHandler<N> {
8992
feature = "tracing",
9093
tracing::instrument(level = "trace", skip(sender, provider, reorg_handler))
9194
)]
92-
async fn handle_stream_rewind(
95+
async fn handle_stream_rewind<R: ReorgHandler<N>>(
9396
from: N::BlockResponse,
9497
to: N::BlockResponse,
9598
max_block_range: u64,
9699
sender: &mpsc::Sender<BlockScannerResult>,
97100
provider: &RobustProvider<N>,
98-
reorg_handler: &mut ReorgHandler<N>,
101+
reorg_handler: &mut R,
99102
) {
100103
// for checking whether reorg occurred
101104
let mut tip = from;

src/block_range_scanner/scanner.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ use crate::{
8787
block_range_scanner::{
8888
RingBufferCapacity,
8989
common::{self, BlockScannerResult},
90-
reorg_handler::ReorgHandler,
90+
reorg_handler::DefaultReorgHandler,
9191
rewind_handler::RewindHandler,
9292
sync_handler::SyncHandler,
9393
},
@@ -178,7 +178,7 @@ impl<N: Network> BlockRangeScanner<N> {
178178

179179
tokio::spawn(async move {
180180
let mut reorg_handler =
181-
ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
181+
DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
182182

183183
common::stream_live_blocks(
184184
start_block,
@@ -246,7 +246,7 @@ impl<N: Network> BlockRangeScanner<N> {
246246

247247
tokio::spawn(async move {
248248
let mut reorg_handler =
249-
ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
249+
DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
250250

251251
_ = common::stream_historical_range(
252252
start_block_num,

src/block_range_scanner/sync_handler.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
Notification, ScannerError,
77
block_range_scanner::{
88
common::{self, BlockScannerResult},
9-
reorg_handler::ReorgHandler,
9+
reorg_handler::{DefaultReorgHandler, ReorgHandler},
1010
ring_buffer::RingBufferCapacity,
1111
},
1212
types::TryStream,
@@ -18,7 +18,7 @@ pub(crate) struct SyncHandler<N: Network> {
1818
start_id: BlockId,
1919
block_confirmations: u64,
2020
sender: mpsc::Sender<BlockScannerResult>,
21-
reorg_handler: ReorgHandler<N>,
21+
reorg_handler: DefaultReorgHandler<N>,
2222
}
2323

2424
impl<N: Network> SyncHandler<N> {
@@ -30,7 +30,8 @@ impl<N: Network> SyncHandler<N> {
3030
past_blocks_storage_capacity: RingBufferCapacity,
3131
sender: mpsc::Sender<BlockScannerResult>,
3232
) -> Self {
33-
let reorg_handler = ReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
33+
let reorg_handler =
34+
DefaultReorgHandler::new(provider.clone(), past_blocks_storage_capacity);
3435
Self { provider, max_block_range, start_id, block_confirmations, sender, reorg_handler }
3536
}
3637

@@ -147,14 +148,14 @@ impl<N: Network> SyncHandler<N> {
147148

148149
/// Catches up on historical blocks until we reach the chain tip
149150
/// Returns the block number where live streaming should begin
150-
async fn catchup_historical_blocks(
151+
async fn catchup_historical_blocks<R: ReorgHandler<N>>(
151152
mut start_block: BlockNumber,
152153
mut confirmed_tip: BlockNumber,
153154
block_confirmations: u64,
154155
max_block_range: u64,
155156
sender: &mpsc::Sender<BlockScannerResult>,
156157
provider: &RobustProvider<N>,
157-
reorg_handler: &mut ReorgHandler<N>,
158+
reorg_handler: &mut R,
158159
) -> Result<Option<BlockNumber>, ScannerError> {
159160
while start_block < confirmed_tip {
160161
if common::stream_historical_range(
@@ -181,13 +182,13 @@ impl<N: Network> SyncHandler<N> {
181182
}
182183

183184
/// Subscribes to live blocks and begins streaming
184-
async fn transition_to_live(
185+
async fn transition_to_live<R: ReorgHandler<N>>(
185186
start_block: BlockNumber,
186187
block_confirmations: u64,
187188
max_block_range: u64,
188189
sender: &mpsc::Sender<BlockScannerResult>,
189190
provider: &RobustProvider<N>,
190-
reorg_handler: &mut ReorgHandler<N>,
191+
reorg_handler: &mut R,
191192
) {
192193
let subscription = match provider.subscribe_blocks().await {
193194
Ok(sub) => sub,

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ mod types;
6363

6464
pub use block_range_scanner::{
6565
BlockRangeScanner, BlockRangeScannerBuilder, BlockScannerResult, DEFAULT_BLOCK_CONFIRMATIONS,
66-
DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, RingBufferCapacity,
66+
DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, ReorgHandler, RingBufferCapacity,
6767
RingBufferCapacity as PastBlocksStorageCapacity,
6868
};
6969

0 commit comments

Comments
 (0)