Skip to content

Commit 7a3999e

Browse files
LeoPatOZ0xNeshi
andauthored
ref: Move BlockRangeScanner to its own module (#275)
Co-authored-by: Nenad <[email protected]>
1 parent 69f5e79 commit 7a3999e

File tree

13 files changed

+273
-237
lines changed

13 files changed

+273
-237
lines changed

src/block_range_scanner/builder.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use alloy::network::Network;
2+
3+
use crate::{
4+
ScannerError,
5+
block_range_scanner::{
6+
DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, RingBufferCapacity,
7+
scanner::BlockRangeScanner,
8+
},
9+
robust_provider::IntoRobustProvider,
10+
};
11+
12+
/// Builder/configuration for the block-range streaming service.
13+
#[derive(Clone, Debug)]
14+
pub struct BlockRangeScannerBuilder {
15+
/// Maximum number of blocks per streamed range.
16+
pub max_block_range: u64,
17+
/// How many past block hashes to keep in memory for reorg detection.
18+
///
19+
/// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled.
20+
pub past_blocks_storage_capacity: RingBufferCapacity,
21+
pub buffer_capacity: usize,
22+
}
23+
24+
impl Default for BlockRangeScannerBuilder {
25+
fn default() -> Self {
26+
Self::new()
27+
}
28+
}
29+
30+
impl BlockRangeScannerBuilder {
31+
/// Creates a scanner with default configuration.
32+
#[must_use]
33+
pub fn new() -> Self {
34+
Self {
35+
max_block_range: DEFAULT_MAX_BLOCK_RANGE,
36+
past_blocks_storage_capacity: RingBufferCapacity::Limited(10),
37+
buffer_capacity: DEFAULT_STREAM_BUFFER_CAPACITY,
38+
}
39+
}
40+
41+
/// Sets the maximum number of blocks per streamed range.
42+
///
43+
/// This controls batching for historical scans and for catch-up in live/sync scanners.
44+
///
45+
/// Must be greater than 0.
46+
#[must_use]
47+
pub fn max_block_range(mut self, max_block_range: u64) -> Self {
48+
self.max_block_range = max_block_range;
49+
self
50+
}
51+
52+
/// Sets how many past block hashes to keep in memory for reorg detection.
53+
///
54+
/// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled.
55+
#[must_use]
56+
pub fn past_blocks_storage_capacity(
57+
mut self,
58+
past_blocks_storage_capacity: RingBufferCapacity,
59+
) -> Self {
60+
self.past_blocks_storage_capacity = past_blocks_storage_capacity;
61+
self
62+
}
63+
64+
/// Sets the stream buffer capacity.
65+
///
66+
/// Controls the maximum number of messages that can be buffered in the stream
67+
/// before backpressure is applied.
68+
///
69+
/// # Arguments
70+
///
71+
/// * `buffer_capacity` - Maximum number of messages to buffer (must be greater than 0)
72+
#[must_use]
73+
pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self {
74+
self.buffer_capacity = buffer_capacity;
75+
self
76+
}
77+
78+
/// Connects to an existing provider
79+
///
80+
/// # Errors
81+
///
82+
/// Returns an error if the provider connection fails.
83+
pub async fn connect<N: Network>(
84+
self,
85+
provider: impl IntoRobustProvider<N>,
86+
) -> Result<BlockRangeScanner<N>, ScannerError> {
87+
if self.max_block_range == 0 {
88+
return Err(ScannerError::InvalidMaxBlockRange);
89+
}
90+
if self.buffer_capacity == 0 {
91+
return Err(ScannerError::InvalidBufferCapacity);
92+
}
93+
let provider = provider.into_robust_provider().await?;
94+
Ok(BlockRangeScanner {
95+
provider,
96+
max_block_range: self.max_block_range,
97+
past_blocks_storage_capacity: self.past_blocks_storage_capacity,
98+
buffer_capacity: self.buffer_capacity,
99+
})
100+
}
101+
}

src/block_range_scanner/common.rs

Lines changed: 38 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1+
use std::ops::RangeInclusive;
2+
13
use tokio::sync::mpsc;
24
use tokio_stream::StreamExt;
35

46
use crate::{
5-
ScannerError,
6-
block_range_scanner::{BlockScannerResult, RangeIterator, reorg_handler::ReorgHandler},
7+
ScannerError, ScannerMessage,
8+
block_range_scanner::{range_iterator::RangeIterator, reorg_handler::ReorgHandler},
79
robust_provider::{RobustProvider, RobustSubscription, subscription},
8-
types::{Notification, TryStream},
10+
types::{IntoScannerResult, Notification, ScannerResult, TryStream},
911
};
1012
use alloy::{
1113
consensus::BlockHeader,
@@ -14,6 +16,39 @@ use alloy::{
1416
primitives::BlockNumber,
1517
};
1618

19+
/// Default maximum number of blocks per streamed range.
20+
pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
21+
22+
/// Default confirmation depth used by scanners that accept a `block_confirmations` setting.
23+
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
24+
25+
/// Default per-stream buffer size used by scanners.
26+
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;
27+
28+
/// The result type yielded by block-range streams.
29+
pub type BlockScannerResult = ScannerResult<RangeInclusive<BlockNumber>>;
30+
31+
/// Convenience alias for a streamed block-range message.
32+
pub type Message = ScannerMessage<RangeInclusive<BlockNumber>>;
33+
34+
impl From<RangeInclusive<BlockNumber>> for Message {
35+
fn from(range: RangeInclusive<BlockNumber>) -> Self {
36+
Message::Data(range)
37+
}
38+
}
39+
40+
impl PartialEq<RangeInclusive<BlockNumber>> for Message {
41+
fn eq(&self, other: &RangeInclusive<BlockNumber>) -> bool {
42+
if let Message::Data(range) = self { range.eq(other) } else { false }
43+
}
44+
}
45+
46+
impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumber> {
47+
fn into_scanner_message_result(self) -> BlockScannerResult {
48+
Ok(Message::Data(self))
49+
}
50+
}
51+
1752
#[allow(clippy::too_many_arguments)]
1853
pub(crate) async fn stream_live_blocks<N: Network>(
1954
stream_start: BlockNumber,

src/block_range_scanner/mod.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
mod builder;
2+
mod common;
3+
mod range_iterator;
4+
mod reorg_handler;
5+
mod ring_buffer;
6+
mod scanner;
7+
mod sync_handler;
8+
9+
pub use builder::BlockRangeScannerBuilder;
10+
pub use common::BlockScannerResult;
11+
pub use ring_buffer::RingBufferCapacity;
12+
pub use scanner::BlockRangeScanner;
13+
14+
pub use common::{
15+
DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
16+
};

0 commit comments

Comments
 (0)