Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions src/block_range_scanner/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use alloy::network::Network;

use crate::{
ScannerError,
block_range_scanner::{
DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY, RingBufferCapacity,
scanner::BlockRangeScanner,
},
robust_provider::IntoRobustProvider,
};

/// Builder/configuration for the block-range streaming service.
#[derive(Clone, Debug)]
pub struct BlockRangeScannerBuilder {
/// Maximum number of blocks per streamed range.
pub max_block_range: u64,
/// How many past block hashes to keep in memory for reorg detection.
///
/// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled.
pub past_blocks_storage_capacity: RingBufferCapacity,
pub buffer_capacity: usize,
}

impl Default for BlockRangeScannerBuilder {
fn default() -> Self {
Self::new()
}
}

impl BlockRangeScannerBuilder {
/// Creates a scanner with default configuration.
#[must_use]
pub fn new() -> Self {
Self {
max_block_range: DEFAULT_MAX_BLOCK_RANGE,
past_blocks_storage_capacity: RingBufferCapacity::Limited(10),
buffer_capacity: DEFAULT_STREAM_BUFFER_CAPACITY,
}
}

/// Sets the maximum number of blocks per streamed range.
///
/// This controls batching for historical scans and for catch-up in live/sync scanners.
///
/// Must be greater than 0.
#[must_use]
pub fn max_block_range(mut self, max_block_range: u64) -> Self {
self.max_block_range = max_block_range;
self
}

/// Sets how many past block hashes to keep in memory for reorg detection.
///
/// If set to `RingBufferCapacity::Limited(0)`, reorg detection is disabled.
#[must_use]
pub fn past_blocks_storage_capacity(
mut self,
past_blocks_storage_capacity: RingBufferCapacity,
) -> Self {
self.past_blocks_storage_capacity = past_blocks_storage_capacity;
self
}

/// Sets the stream buffer capacity.
///
/// Controls the maximum number of messages that can be buffered in the stream
/// before backpressure is applied.
///
/// # Arguments
///
/// * `buffer_capacity` - Maximum number of messages to buffer (must be greater than 0)
#[must_use]
pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self {
self.buffer_capacity = buffer_capacity;
self
}

/// Connects to an existing provider
///
/// # Errors
///
/// Returns an error if the provider connection fails.
pub async fn connect<N: Network>(
self,
provider: impl IntoRobustProvider<N>,
) -> Result<BlockRangeScanner<N>, ScannerError> {
if self.max_block_range == 0 {
return Err(ScannerError::InvalidMaxBlockRange);
}
if self.buffer_capacity == 0 {
return Err(ScannerError::InvalidBufferCapacity);
}
let provider = provider.into_robust_provider().await?;
Ok(BlockRangeScanner {
provider,
max_block_range: self.max_block_range,
past_blocks_storage_capacity: self.past_blocks_storage_capacity,
buffer_capacity: self.buffer_capacity,
})
}
}
41 changes: 38 additions & 3 deletions src/block_range_scanner/common.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::ops::RangeInclusive;

use tokio::sync::mpsc;
use tokio_stream::StreamExt;

use crate::{
ScannerError,
block_range_scanner::{BlockScannerResult, RangeIterator, reorg_handler::ReorgHandler},
ScannerError, ScannerMessage,
block_range_scanner::{range_iterator::RangeIterator, reorg_handler::ReorgHandler},
robust_provider::{RobustProvider, RobustSubscription, subscription},
types::{Notification, TryStream},
types::{IntoScannerResult, Notification, ScannerResult, TryStream},
};
use alloy::{
consensus::BlockHeader,
Expand All @@ -14,6 +16,39 @@ use alloy::{
primitives::BlockNumber,
};

/// Default maximum number of blocks per streamed range.
pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;

/// Default confirmation depth used by scanners that accept a `block_confirmations` setting.
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;

/// Default per-stream buffer size used by scanners.
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;

/// The result type yielded by block-range streams.
pub type BlockScannerResult = ScannerResult<RangeInclusive<BlockNumber>>;

/// Convenience alias for a streamed block-range message.
pub type Message = ScannerMessage<RangeInclusive<BlockNumber>>;

impl From<RangeInclusive<BlockNumber>> for Message {
fn from(range: RangeInclusive<BlockNumber>) -> Self {
Message::Data(range)
}
}

impl PartialEq<RangeInclusive<BlockNumber>> for Message {
fn eq(&self, other: &RangeInclusive<BlockNumber>) -> bool {
if let Message::Data(range) = self { range.eq(other) } else { false }
}
}

impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumber> {
fn into_scanner_message_result(self) -> BlockScannerResult {
Ok(Message::Data(self))
}
}

#[allow(clippy::too_many_arguments)]
pub(crate) async fn stream_live_blocks<N: Network>(
stream_start: BlockNumber,
Expand Down
16 changes: 16 additions & 0 deletions src/block_range_scanner/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
mod builder;
mod common;
mod range_iterator;
mod reorg_handler;
mod ring_buffer;
mod scanner;
mod sync_handler;

pub use builder::BlockRangeScannerBuilder;
pub use common::BlockScannerResult;
pub use ring_buffer::RingBufferCapacity;
pub use scanner::BlockRangeScanner;

pub use common::{
DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, DEFAULT_STREAM_BUFFER_CAPACITY,
};
Loading