Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 44 additions & 7 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ impl IntoScannerResult<RangeInclusive<BlockNumber>> for RangeInclusive<BlockNumb
pub struct BlockRangeScanner {
pub max_block_range: u64,
pub past_blocks_storage_capacity: RingBufferCapacity,
pub max_stream_capacity: usize,
}

impl Default for BlockRangeScanner {
Expand All @@ -139,6 +140,7 @@ impl BlockRangeScanner {
Self {
max_block_range: DEFAULT_MAX_BLOCK_RANGE,
past_blocks_storage_capacity: RingBufferCapacity::Limited(10),
max_stream_capacity: MAX_BUFFERED_MESSAGES,
}
}

Expand All @@ -157,6 +159,21 @@ impl BlockRangeScanner {
self
}

/// Sets the maximum capacity for internal message channels.
///
/// This controls the buffer size for channels used to stream block ranges.
/// Higher values allow more messages to be buffered, which can help with
/// throughput at the cost of memory usage.
///
/// # Arguments
///
/// * `max_stream_capacity` - Maximum number of messages to buffer (must be greater than 0)
#[must_use]
pub fn max_stream_capacity(mut self, max_stream_capacity: usize) -> Self {
self.max_stream_capacity = max_stream_capacity;
self
}

/// Connects to an existing provider
///
/// # Errors
Expand All @@ -171,6 +188,7 @@ impl BlockRangeScanner {
provider,
max_block_range: self.max_block_range,
past_blocks_storage_capacity: self.past_blocks_storage_capacity,
max_stream_capacity: self.max_stream_capacity,
})
}
}
Expand All @@ -179,6 +197,7 @@ pub struct ConnectedBlockRangeScanner<N: Network> {
provider: RobustProvider<N>,
max_block_range: u64,
past_blocks_storage_capacity: RingBufferCapacity,
max_stream_capacity: usize,
}

impl<N: Network> ConnectedBlockRangeScanner<N> {
Expand All @@ -188,6 +207,12 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
&self.provider
}

/// Returns the maximum stream capacity.
#[must_use]
pub fn max_stream_capacity(&self) -> usize {
self.max_stream_capacity
}

/// Starts the subscription service and returns a client for sending commands.
///
/// # Errors
Expand All @@ -202,7 +227,7 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
tokio::spawn(async move {
service.run().await;
});
Ok(BlockRangeScannerClient::new(cmd_tx))
Ok(BlockRangeScannerClient::new(cmd_tx, self.max_stream_capacity))
}
}

Expand Down Expand Up @@ -535,6 +560,7 @@ impl<N: Network> Service<N> {

pub struct BlockRangeScannerClient {
command_sender: mpsc::Sender<Command>,
max_stream_capacity: usize,
}

impl BlockRangeScannerClient {
Expand All @@ -543,9 +569,10 @@ impl BlockRangeScannerClient {
/// # Arguments
///
/// * `command_sender` - The sender for sending commands to the subscription service.
/// * `max_stream_capacity` - Maximum capacity for message channels.
#[must_use]
pub fn new(command_sender: mpsc::Sender<Command>) -> Self {
Self { command_sender }
pub fn new(command_sender: mpsc::Sender<Command>, max_stream_capacity: usize) -> Self {
Self { command_sender, max_stream_capacity }
}

/// Streams live blocks starting from the latest block.
Expand All @@ -561,7 +588,7 @@ impl BlockRangeScannerClient {
&self,
block_confirmations: u64,
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
let (blocks_sender, blocks_receiver) = mpsc::channel(self.max_stream_capacity);
let (response_tx, response_rx) = oneshot::channel();

let command = Command::StreamLive {
Expand Down Expand Up @@ -592,7 +619,7 @@ impl BlockRangeScannerClient {
start_id: impl Into<BlockId>,
end_id: impl Into<BlockId>,
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
let (blocks_sender, blocks_receiver) = mpsc::channel(self.max_stream_capacity);
let (response_tx, response_rx) = oneshot::channel();

let command = Command::StreamHistorical {
Expand Down Expand Up @@ -624,7 +651,7 @@ impl BlockRangeScannerClient {
start_id: impl Into<BlockId>,
block_confirmations: u64,
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
let (blocks_sender, blocks_receiver) = mpsc::channel(self.max_stream_capacity);
let (response_tx, response_rx) = oneshot::channel();

let command = Command::StreamFrom {
Expand Down Expand Up @@ -656,7 +683,7 @@ impl BlockRangeScannerClient {
start_id: impl Into<BlockId>,
end_id: impl Into<BlockId>,
) -> Result<ReceiverStream<BlockScannerResult>, ScannerError> {
let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES);
let (blocks_sender, blocks_receiver) = mpsc::channel(self.max_stream_capacity);
let (response_tx, response_rx) = oneshot::channel();

let command = Command::Rewind {
Expand Down Expand Up @@ -685,6 +712,7 @@ mod tests {
let scanner = BlockRangeScanner::new();

assert_eq!(scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE);
assert_eq!(scanner.max_stream_capacity, MAX_BUFFERED_MESSAGES);
}

#[test]
Expand All @@ -696,6 +724,15 @@ mod tests {
assert_eq!(scanner.max_block_range, max_block_range);
}

#[test]
fn max_stream_capacity_builder_updates_configuration() {
let capacity = 1000;

let scanner = BlockRangeScanner::new().max_stream_capacity(capacity);

assert_eq!(scanner.max_stream_capacity, capacity);
}

#[tokio::test]
async fn try_send_forwards_errors_to_subscribers() {
let (tx, mut rx) = mpsc::channel::<BlockScannerResult>(1);
Expand Down
5 changes: 3 additions & 2 deletions src/event_scanner/scanner/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ops::RangeInclusive;

use crate::{
Notification, ScannerMessage,
block_range_scanner::{BlockScannerResult, MAX_BUFFERED_MESSAGES},
block_range_scanner::BlockScannerResult,
event_scanner::{filter::EventFilter, listener::EventListener},
robust_provider::{RobustProvider, provider::Error as RobustProviderError},
types::TryStream,
Expand Down Expand Up @@ -51,8 +51,9 @@ pub async fn handle_stream<N: Network, S: Stream<Item = BlockScannerResult> + Un
provider: &RobustProvider<N>,
listeners: &[EventListener],
mode: ConsumerMode,
max_stream_capacity: usize,
) {
let (range_tx, _) = broadcast::channel::<BlockScannerResult>(MAX_BUFFERED_MESSAGES);
let (range_tx, _) = broadcast::channel::<BlockScannerResult>(max_stream_capacity);

let consumers = match mode {
ConsumerMode::Stream => spawn_log_consumers_in_stream_mode(provider, listeners, &range_tx),
Expand Down
4 changes: 3 additions & 1 deletion src/event_scanner/scanner/historic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,16 @@ impl<N: Network> EventScanner<Historic, N> {
///
/// [subscribe]: EventScanner::subscribe
pub async fn start(self) -> Result<(), ScannerError> {
let max_stream_capacity = self.block_range_scanner.max_stream_capacity();
let client = self.block_range_scanner.run()?;
let stream = client.stream_historical(self.config.from_block, self.config.to_block).await?;

let provider = self.block_range_scanner.provider().clone();
let listeners = self.listeners.clone();

tokio::spawn(async move {
handle_stream(stream, &provider, &listeners, ConsumerMode::Stream).await;
handle_stream(stream, &provider, &listeners, ConsumerMode::Stream, max_stream_capacity)
.await;
});

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions src/event_scanner/scanner/latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ impl<N: Network> EventScanner<LatestEvents, N> {
///
/// [subscribe]: EventScanner::subscribe
pub async fn start(self) -> Result<(), ScannerError> {
let max_stream_capacity = self.block_range_scanner.max_stream_capacity();
let client = self.block_range_scanner.run()?;
let stream = client.rewind(self.config.from_block, self.config.to_block).await?;

Expand All @@ -104,6 +105,7 @@ impl<N: Network> EventScanner<LatestEvents, N> {
&provider,
&listeners,
ConsumerMode::CollectLatest { count: self.config.count },
max_stream_capacity,
)
.await;
});
Expand Down
4 changes: 3 additions & 1 deletion src/event_scanner/scanner/live.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,16 @@ impl<N: Network> EventScanner<Live, N> {
///
/// [subscribe]: EventScanner::subscribe
pub async fn start(self) -> Result<(), ScannerError> {
let max_stream_capacity = self.block_range_scanner.max_stream_capacity();
let client = self.block_range_scanner.run()?;
let stream = client.stream_live(self.config.block_confirmations).await?;

let provider = self.block_range_scanner.provider().clone();
let listeners = self.listeners.clone();

tokio::spawn(async move {
handle_stream(stream, &provider, &listeners, ConsumerMode::Stream).await;
handle_stream(stream, &provider, &listeners, ConsumerMode::Stream, max_stream_capacity)
.await;
});

Ok(())
Expand Down
40 changes: 38 additions & 2 deletions src/event_scanner/scanner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::{
EventFilter, ScannerError,
block_range_scanner::{
BlockRangeScanner, ConnectedBlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS,
MAX_BUFFERED_MESSAGES, RingBufferCapacity,
RingBufferCapacity,
},
event_scanner::{EventScannerResult, listener::EventListener},
robust_provider::IntoRobustProvider,
Expand Down Expand Up @@ -418,6 +418,21 @@ impl<M> EventScannerBuilder<M> {
self
}

/// Sets the maximum capacity for internal message channels.
///
/// This controls the buffer size for channels used to stream block ranges and events.
/// Higher values allow more messages to be buffered, which can help with throughput
/// at the cost of memory usage.
///
/// # Arguments
///
/// * `max_stream_capacity` - Maximum number of messages to buffer (must be greater than 0)
#[must_use]
pub fn max_stream_capacity(mut self, max_stream_capacity: usize) -> Self {
self.block_range_scanner.max_stream_capacity = max_stream_capacity;
self
}

/// Builds the scanner by connecting to an existing provider.
///
/// This is a shared method used internally by scanner-specific `connect()` methods.
Expand All @@ -436,7 +451,8 @@ impl<M> EventScannerBuilder<M> {
impl<M, N: Network> EventScanner<M, N> {
#[must_use]
pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream<EventScannerResult> {
let (sender, receiver) = mpsc::channel::<EventScannerResult>(MAX_BUFFERED_MESSAGES);
let capacity = self.block_range_scanner.max_stream_capacity();
let (sender, receiver) = mpsc::channel::<EventScannerResult>(capacity);
self.listeners.push(EventListener { filter, sender });
ReceiverStream::new(receiver)
}
Expand Down Expand Up @@ -504,6 +520,8 @@ mod tests {

#[tokio::test]
async fn test_historic_event_stream_channel_capacity() -> anyhow::Result<()> {
use crate::block_range_scanner::MAX_BUFFERED_MESSAGES;

let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
let mut scanner = EventScannerBuilder::historic().build(provider).await?;

Expand All @@ -515,6 +533,24 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_custom_max_stream_capacity() -> anyhow::Result<()> {
let custom_capacity = 1000;

let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
let mut scanner = EventScannerBuilder::historic()
.max_stream_capacity(custom_capacity)
.build(provider)
.await?;

let _ = scanner.subscribe(EventFilter::new());

let sender = &scanner.listeners[0].sender;
assert_eq!(sender.capacity(), custom_capacity);

Ok(())
}

#[tokio::test]
async fn test_latest_returns_error_with_zero_count() {
use alloy::{
Expand Down
4 changes: 3 additions & 1 deletion src/event_scanner/scanner/sync/from_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl<N: Network> EventScanner<SyncFromBlock, N> {
///
/// [subscribe]: EventScanner::subscribe
pub async fn start(self) -> Result<(), ScannerError> {
let max_stream_capacity = self.block_range_scanner.max_stream_capacity();
let client = self.block_range_scanner.run()?;
let stream =
client.stream_from(self.config.from_block, self.config.block_confirmations).await?;
Expand All @@ -62,7 +63,8 @@ impl<N: Network> EventScanner<SyncFromBlock, N> {
let listeners = self.listeners.clone();

tokio::spawn(async move {
handle_stream(stream, &provider, &listeners, ConsumerMode::Stream).await;
handle_stream(stream, &provider, &listeners, ConsumerMode::Stream, max_stream_capacity)
.await;
});

Ok(())
Expand Down
11 changes: 10 additions & 1 deletion src/event_scanner/scanner/sync/from_latest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
#[allow(clippy::missing_panics_doc)]
pub async fn start(self) -> Result<(), ScannerError> {
let count = self.config.count;
let max_stream_capacity = self.block_range_scanner.max_stream_capacity();
let provider = self.block_range_scanner.provider().clone();
let listeners = self.listeners.clone();

Expand Down Expand Up @@ -85,6 +86,7 @@ impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
&provider,
&listeners,
ConsumerMode::CollectLatest { count },
max_stream_capacity,
)
.await;

Expand All @@ -104,7 +106,14 @@ impl<N: Network> EventScanner<SyncFromLatestEvents, N> {
};

// Start the live (sync) stream.
handle_stream(sync_stream, &provider, &listeners, ConsumerMode::Stream).await;
handle_stream(
sync_stream,
&provider,
&listeners,
ConsumerMode::Stream,
max_stream_capacity,
)
.await;
});

Ok(())
Expand Down