Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 59 additions & 15 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000;
// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;

pub const MAX_BUFFERED_MESSAGES: usize = 50000;
pub const DEFAULT_STREAM_BUFFER_CAPACITY: usize = 50000;

// Maximum amount of reorged blocks on Ethereum (after this amount of block confirmations, a block
// is considered final)
Expand Down Expand Up @@ -121,10 +121,11 @@ impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumb
}
}

#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct BlockRangeScanner {
pub max_block_range: u64,
pub past_blocks_storage_capacity: RingBufferCapacity,
pub buffer_capacity: usize,
}

impl Default for BlockRangeScanner {
Expand All @@ -139,6 +140,7 @@ impl BlockRangeScanner {
Self {
max_block_range: DEFAULT_MAX_BLOCK_RANGE,
past_blocks_storage_capacity: RingBufferCapacity::Limited(10),
buffer_capacity: DEFAULT_STREAM_BUFFER_CAPACITY,
}
}

Expand All @@ -157,6 +159,12 @@ impl BlockRangeScanner {
self
}

#[must_use]
pub fn buffer_capacity(mut self, buffer_capacity: usize) -> Self {
self.buffer_capacity = buffer_capacity;
self
}

/// Connects to an existing provider
///
/// # Errors
Expand All @@ -166,20 +174,27 @@ impl BlockRangeScanner {
self,
provider: impl IntoRobustProvider<N>,
) -> Result<ConnectedBlockRangeScanner<N>, ScannerError> {
if self.max_block_range == 0 {
return Err(ScannerError::InvalidMaxBlockRange);
}
if self.buffer_capacity == 0 {
return Err(ScannerError::InvalidBufferCapacity);
}
let provider = provider.into_robust_provider().await?;
Ok(ConnectedBlockRangeScanner {
provider,
max_block_range: self.max_block_range,
past_blocks_storage_capacity: self.past_blocks_storage_capacity,
buffer_capacity: self.buffer_capacity,
})
}
}

#[derive(Debug)]
pub struct ConnectedBlockRangeScanner<N: Network> {
provider: RobustProvider<N>,
max_block_range: u64,
past_blocks_storage_capacity: RingBufferCapacity,
buffer_capacity: usize,
}

impl<N: Network> ConnectedBlockRangeScanner<N> {
Expand All @@ -189,6 +204,12 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
&self.provider
}

/// Returns the stream buffer capacity.
#[must_use]
pub fn buffer_capacity(&self) -> usize {
self.buffer_capacity
}

/// Starts the subscription service and returns a client for sending commands.
///
/// # Errors
Expand All @@ -203,7 +224,7 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
tokio::spawn(async move {
service.run().await;
});
Ok(BlockRangeScannerClient::new(cmd_tx))
Ok(BlockRangeScannerClient::new(cmd_tx, self.buffer_capacity))
}
}

Expand Down Expand Up @@ -585,6 +606,7 @@ impl<N: Network> Service<N> {

pub struct BlockRangeScannerClient {
command_sender: mpsc::Sender<Command>,
buffer_capacity: usize,
}

impl BlockRangeScannerClient {
Expand All @@ -593,9 +615,10 @@ impl BlockRangeScannerClient {
/// # Arguments
///
/// * `command_sender` - The sender for sending commands to the subscription service.
/// * `buffer_capacity` - The capacity for buffering messages in the stream.
#[must_use]
pub fn new(command_sender: mpsc::Sender<Command>) -> Self {
Self { command_sender }
pub fn new(command_sender: mpsc::Sender<Command>, buffer_capacity: usize) -> Self {
Self { command_sender, buffer_capacity }
}

/// Streams live blocks starting from the latest block.
Expand All @@ -611,7 +634,7 @@ impl BlockRangeScannerClient {
&self,
block_confirmations: u64,
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
let (response_tx, response_rx) = oneshot::channel();

let command = Command::StreamLive {
Expand Down Expand Up @@ -642,7 +665,7 @@ impl BlockRangeScannerClient {
start_id: impl Into<BlockId>,
end_id: impl Into<BlockId>,
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
let (response_tx, response_rx) = oneshot::channel();

let command = Command::StreamHistorical {
Expand Down Expand Up @@ -674,7 +697,7 @@ impl BlockRangeScannerClient {
start_id: impl Into<BlockId>,
block_confirmations: u64,
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
let (response_tx, response_rx) = oneshot::channel();

let command = Command::StreamFrom {
Expand Down Expand Up @@ -733,7 +756,7 @@ impl BlockRangeScannerClient {
start_id: impl Into<BlockId>,
end_id: impl Into<BlockId>,
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity);
let (response_tx, response_rx) = oneshot::channel();

let command = Command::Rewind {
Expand All @@ -754,23 +777,28 @@ impl BlockRangeScannerClient {
#[cfg(test)]
mod tests {
use super::*;
use alloy::eips::{BlockId, BlockNumberOrTag};
use alloy::{
eips::{BlockId, BlockNumberOrTag},
network::Ethereum,
providers::{RootProvider, mock::Asserter},
rpc::client::RpcClient,
};
use tokio::sync::mpsc;

#[test]
fn block_range_scanner_defaults_match_constants() {
let scanner = BlockRangeScanner::new();

assert_eq!(scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
assert_eq!(scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
}

#[test]
fn builder_methods_update_configuration() {
let max_block_range = 42;
let scanner = BlockRangeScanner::new().max_block_range(42).buffer_capacity(33);

let scanner = BlockRangeScanner::new().max_block_range(max_block_range);

assert_eq!(scanner.max_block_range, max_block_range);
assert_eq!(scanner.max_block_range, 42);
assert_eq!(scanner.buffer_capacity, 33);
}

#[tokio::test]
Expand All @@ -784,4 +812,20 @@ mod tests {
Some(Err(ScannerError::BlockNotFound(BlockId::Number(BlockNumberOrTag::Number(4)))))
));
}

#[tokio::test]
async fn returns_error_with_zero_buffer_capacity() {
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
let result = BlockRangeScanner::new().buffer_capacity(0).connect(provider).await;

assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
}

#[tokio::test]
async fn returns_error_with_zero_max_block_range() {
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
let result = BlockRangeScanner::new().max_block_range(0).connect(provider).await;

assert!(matches!(result, Err(ScannerError::InvalidMaxBlockRange)));
}
}
2 changes: 1 addition & 1 deletion src/block_range_scanner/reorg_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{

use super::ring_buffer::RingBuffer;

#[derive(Clone, Debug)]
#[derive(Clone)]
pub(crate) struct ReorgHandler<N: Network = Ethereum> {
provider: RobustProvider<N>,
buffer: RingBuffer<BlockHash>,
Expand Down
2 changes: 1 addition & 1 deletion src/block_range_scanner/ring_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ macro_rules! impl_from_unsigned {

impl_from_unsigned!(RingBufferCapacity; u8, u16, u32, usize);

#[derive(Clone, Debug)]
#[derive(Clone)]
pub(crate) struct RingBuffer<T> {
inner: VecDeque<T>,
capacity: RingBufferCapacity,
Expand Down
2 changes: 0 additions & 2 deletions src/block_range_scanner/sync_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ use crate::{
};

/// Represents the initial state when starting a sync operation
#[derive(Debug)]
enum SyncState {
/// Start block is already at or beyond the confirmed tip - go straight to live
AlreadyLive { start_block: BlockNumber },
/// Start block is behind - need to catch up first, then go live
NeedsCatchup { start_block: BlockNumber, confirmed_tip: BlockNumber },
}

#[derive(Debug)]
pub(crate) struct SyncHandler<N: Network> {
provider: RobustProvider<N>,
max_block_range: u64,
Expand Down
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ pub enum ScannerError {
#[error("Max block range must be greater than 0")]
InvalidMaxBlockRange,

#[error("Stream buffer capacity must be greater than 0")]
InvalidBufferCapacity,

#[error("Max concurrent fetches must be greater than 0")]
InvalidMaxConcurrentFetches,

Expand Down
2 changes: 1 addition & 1 deletion src/event_scanner/listener.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::event_scanner::{EventScannerResult, filter::EventFilter};
use tokio::sync::mpsc::Sender;

#[derive(Clone, Debug)]
#[derive(Clone)]
pub(crate) struct EventListener {
pub filter: EventFilter,
pub sender: Sender<EventScannerResult>,
Expand Down
5 changes: 3 additions & 2 deletions src/event_scanner/scanner/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ops::RangeInclusive;

use crate::{
Message, Notification, ScannerError, ScannerMessage,
block_range_scanner::{BlockScannerResult, MAX_BUFFERED_MESSAGES},
block_range_scanner::BlockScannerResult,
event_scanner::{filter::EventFilter, listener::EventListener},
robust_provider::{RobustProvider, provider::Error as RobustProviderError},
types::TryStream,
Expand Down Expand Up @@ -56,8 +56,9 @@ pub(crate) async fn handle_stream<N: Network, S: Stream<Item = BlockScannerResul
listeners: &[EventListener],
mode: ConsumerMode,
max_concurrent_fetches: usize,
buffer_capacity: usize,
) {
let (range_tx, _) = broadcast::channel::<BlockScannerResult>(MAX_BUFFERED_MESSAGES);
let (range_tx, _) = broadcast::channel::<BlockScannerResult>(buffer_capacity);

let consumers = match mode {
ConsumerMode::Stream => spawn_log_consumers_in_stream_mode(
Expand Down
24 changes: 22 additions & 2 deletions src/event_scanner/scanner/historic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl<N: Network> EventScanner<Historic, N> {
let max_concurrent_fetches = self.config.max_concurrent_fetches;
let provider = self.block_range_scanner.provider().clone();
let listeners = self.listeners.clone();
let buffer_capacity = self.buffer_capacity();

tokio::spawn(async move {
handle_stream(
Expand All @@ -152,6 +153,7 @@ impl<N: Network> EventScanner<Historic, N> {
&listeners,
ConsumerMode::Stream,
max_concurrent_fetches,
buffer_capacity,
)
.await;
});
Expand All @@ -162,7 +164,10 @@ impl<N: Network> EventScanner<Historic, N> {

#[cfg(test)]
mod tests {
use crate::event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES;
use crate::{
DEFAULT_STREAM_BUFFER_CAPACITY, block_range_scanner::DEFAULT_MAX_BLOCK_RANGE,
event_scanner::scanner::DEFAULT_MAX_CONCURRENT_FETCHES,
};

use super::*;
use alloy::{
Expand All @@ -180,12 +185,14 @@ mod tests {
.to_block(200)
.max_block_range(50)
.max_concurrent_fetches(10)
.buffer_capacity(33)
.from_block(100);

assert_eq!(builder.config.from_block, BlockNumberOrTag::Number(100).into());
assert_eq!(builder.config.to_block, BlockNumberOrTag::Number(200).into());
assert_eq!(builder.config.max_concurrent_fetches, 10);
assert_eq!(builder.block_range_scanner.max_block_range, 50);
assert_eq!(builder.block_range_scanner.buffer_capacity, 33);
}

#[test]
Expand All @@ -195,6 +202,8 @@ mod tests {
assert_eq!(builder.config.from_block, BlockNumberOrTag::Earliest.into());
assert_eq!(builder.config.to_block, BlockNumberOrTag::Latest.into());
assert_eq!(builder.config.max_concurrent_fetches, DEFAULT_MAX_CONCURRENT_FETCHES);
assert_eq!(builder.block_range_scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
assert_eq!(builder.block_range_scanner.buffer_capacity, DEFAULT_STREAM_BUFFER_CAPACITY);
}

#[test]
Expand All @@ -208,12 +217,15 @@ mod tests {
.to_block(100)
.to_block(200)
.max_concurrent_fetches(10)
.max_concurrent_fetches(20);
.max_concurrent_fetches(20)
.buffer_capacity(20)
.buffer_capacity(40);

assert_eq!(builder.block_range_scanner.max_block_range, 105);
assert_eq!(builder.config.from_block, BlockNumberOrTag::Number(2).into());
assert_eq!(builder.config.to_block, BlockNumberOrTag::Number(200).into());
assert_eq!(builder.config.max_concurrent_fetches, 20);
assert_eq!(builder.block_range_scanner.buffer_capacity, 40);
}

#[tokio::test]
Expand Down Expand Up @@ -293,6 +305,14 @@ mod tests {
}
}

#[tokio::test]
async fn returns_error_with_zero_buffer_capacity() {
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
let result = EventScannerBuilder::historic().buffer_capacity(0).connect(provider).await;

assert!(matches!(result, Err(ScannerError::InvalidBufferCapacity)));
}

#[tokio::test]
async fn returns_error_with_zero_max_concurrent_fetches() {
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
Expand Down
Loading
Loading