diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 1c0cfb9b..e54ea133 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -125,6 +125,7 @@ impl IntoScannerResult> for RangeInclusive Self { + self.max_stream_capacity = max_stream_capacity; + self + } + /// Connects to an existing provider /// /// # Errors @@ -171,6 +188,7 @@ impl BlockRangeScanner { provider, max_block_range: self.max_block_range, past_blocks_storage_capacity: self.past_blocks_storage_capacity, + max_stream_capacity: self.max_stream_capacity, }) } } @@ -179,6 +197,7 @@ pub struct ConnectedBlockRangeScanner { provider: RobustProvider, max_block_range: u64, past_blocks_storage_capacity: RingBufferCapacity, + max_stream_capacity: usize, } impl ConnectedBlockRangeScanner { @@ -188,6 +207,12 @@ impl ConnectedBlockRangeScanner { &self.provider } + /// Returns the maximum stream capacity. + #[must_use] + pub fn max_stream_capacity(&self) -> usize { + self.max_stream_capacity + } + /// Starts the subscription service and returns a client for sending commands. /// /// # Errors @@ -202,7 +227,7 @@ impl ConnectedBlockRangeScanner { tokio::spawn(async move { service.run().await; }); - Ok(BlockRangeScannerClient::new(cmd_tx)) + Ok(BlockRangeScannerClient::new(cmd_tx, self.max_stream_capacity)) } } @@ -535,6 +560,7 @@ impl Service { pub struct BlockRangeScannerClient { command_sender: mpsc::Sender, + max_stream_capacity: usize, } impl BlockRangeScannerClient { @@ -543,9 +569,10 @@ impl BlockRangeScannerClient { /// # Arguments /// /// * `command_sender` - The sender for sending commands to the subscription service. + /// * `max_stream_capacity` - Maximum capacity for message channels. #[must_use] - pub fn new(command_sender: mpsc::Sender) -> Self { - Self { command_sender } + pub fn new(command_sender: mpsc::Sender, max_stream_capacity: usize) -> Self { + Self { command_sender, max_stream_capacity } } /// Streams live blocks starting from the latest block. @@ -561,7 +588,7 @@ impl BlockRangeScannerClient { &self, block_confirmations: u64, ) -> Result, ScannerError> { - let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); + let (blocks_sender, blocks_receiver) = mpsc::channel(self.max_stream_capacity); let (response_tx, response_rx) = oneshot::channel(); let command = Command::StreamLive { @@ -592,7 +619,7 @@ impl BlockRangeScannerClient { start_id: impl Into, end_id: impl Into, ) -> Result, ScannerError> { - let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); + let (blocks_sender, blocks_receiver) = mpsc::channel(self.max_stream_capacity); let (response_tx, response_rx) = oneshot::channel(); let command = Command::StreamHistorical { @@ -624,7 +651,7 @@ impl BlockRangeScannerClient { start_id: impl Into, block_confirmations: u64, ) -> Result, ScannerError> { - let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); + let (blocks_sender, blocks_receiver) = mpsc::channel(self.max_stream_capacity); let (response_tx, response_rx) = oneshot::channel(); let command = Command::StreamFrom { @@ -656,7 +683,7 @@ impl BlockRangeScannerClient { start_id: impl Into, end_id: impl Into, ) -> Result, ScannerError> { - let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); + let (blocks_sender, blocks_receiver) = mpsc::channel(self.max_stream_capacity); let (response_tx, response_rx) = oneshot::channel(); let command = Command::Rewind { @@ -685,6 +712,7 @@ mod tests { let scanner = BlockRangeScanner::new(); assert_eq!(scanner.max_block_range, DEFAULT_MAX_BLOCK_RANGE); + assert_eq!(scanner.max_stream_capacity, MAX_BUFFERED_MESSAGES); } #[test] @@ -696,6 +724,15 @@ mod tests { assert_eq!(scanner.max_block_range, max_block_range); } + #[test] + fn max_stream_capacity_builder_updates_configuration() { + let capacity = 1000; + + let scanner = BlockRangeScanner::new().max_stream_capacity(capacity); + + assert_eq!(scanner.max_stream_capacity, capacity); + } + #[tokio::test] async fn try_send_forwards_errors_to_subscribers() { let (tx, mut rx) = mpsc::channel::(1); diff --git a/src/event_scanner/scanner/common.rs b/src/event_scanner/scanner/common.rs index 90892357..68388984 100644 --- a/src/event_scanner/scanner/common.rs +++ b/src/event_scanner/scanner/common.rs @@ -2,7 +2,7 @@ use std::ops::RangeInclusive; use crate::{ Notification, ScannerMessage, - block_range_scanner::{BlockScannerResult, MAX_BUFFERED_MESSAGES}, + block_range_scanner::BlockScannerResult, event_scanner::{filter::EventFilter, listener::EventListener}, robust_provider::{RobustProvider, provider::Error as RobustProviderError}, types::TryStream, @@ -51,8 +51,9 @@ pub async fn handle_stream + Un provider: &RobustProvider, listeners: &[EventListener], mode: ConsumerMode, + max_stream_capacity: usize, ) { - let (range_tx, _) = broadcast::channel::(MAX_BUFFERED_MESSAGES); + let (range_tx, _) = broadcast::channel::(max_stream_capacity); let consumers = match mode { ConsumerMode::Stream => spawn_log_consumers_in_stream_mode(provider, listeners, &range_tx), diff --git a/src/event_scanner/scanner/historic.rs b/src/event_scanner/scanner/historic.rs index 95db8b14..e489d493 100644 --- a/src/event_scanner/scanner/historic.rs +++ b/src/event_scanner/scanner/historic.rs @@ -85,6 +85,7 @@ impl EventScanner { /// /// [subscribe]: EventScanner::subscribe pub async fn start(self) -> Result<(), ScannerError> { + let max_stream_capacity = self.block_range_scanner.max_stream_capacity(); let client = self.block_range_scanner.run()?; let stream = client.stream_historical(self.config.from_block, self.config.to_block).await?; @@ -92,7 +93,8 @@ impl EventScanner { let listeners = self.listeners.clone(); tokio::spawn(async move { - handle_stream(stream, &provider, &listeners, ConsumerMode::Stream).await; + handle_stream(stream, &provider, &listeners, ConsumerMode::Stream, max_stream_capacity) + .await; }); Ok(()) diff --git a/src/event_scanner/scanner/latest.rs b/src/event_scanner/scanner/latest.rs index b32e7f6d..c249929c 100644 --- a/src/event_scanner/scanner/latest.rs +++ b/src/event_scanner/scanner/latest.rs @@ -92,6 +92,7 @@ impl EventScanner { /// /// [subscribe]: EventScanner::subscribe pub async fn start(self) -> Result<(), ScannerError> { + let max_stream_capacity = self.block_range_scanner.max_stream_capacity(); let client = self.block_range_scanner.run()?; let stream = client.rewind(self.config.from_block, self.config.to_block).await?; @@ -104,6 +105,7 @@ impl EventScanner { &provider, &listeners, ConsumerMode::CollectLatest { count: self.config.count }, + max_stream_capacity, ) .await; }); diff --git a/src/event_scanner/scanner/live.rs b/src/event_scanner/scanner/live.rs index 3df67aea..bb27c8ec 100644 --- a/src/event_scanner/scanner/live.rs +++ b/src/event_scanner/scanner/live.rs @@ -44,6 +44,7 @@ impl EventScanner { /// /// [subscribe]: EventScanner::subscribe pub async fn start(self) -> Result<(), ScannerError> { + let max_stream_capacity = self.block_range_scanner.max_stream_capacity(); let client = self.block_range_scanner.run()?; let stream = client.stream_live(self.config.block_confirmations).await?; @@ -51,7 +52,8 @@ impl EventScanner { let listeners = self.listeners.clone(); tokio::spawn(async move { - handle_stream(stream, &provider, &listeners, ConsumerMode::Stream).await; + handle_stream(stream, &provider, &listeners, ConsumerMode::Stream, max_stream_capacity) + .await; }); Ok(()) diff --git a/src/event_scanner/scanner/mod.rs b/src/event_scanner/scanner/mod.rs index 84213faf..d7c1dd57 100644 --- a/src/event_scanner/scanner/mod.rs +++ b/src/event_scanner/scanner/mod.rs @@ -9,7 +9,7 @@ use crate::{ EventFilter, ScannerError, block_range_scanner::{ BlockRangeScanner, ConnectedBlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS, - MAX_BUFFERED_MESSAGES, RingBufferCapacity, + RingBufferCapacity, }, event_scanner::{EventScannerResult, listener::EventListener}, robust_provider::IntoRobustProvider, @@ -418,6 +418,21 @@ impl EventScannerBuilder { self } + /// Sets the maximum capacity for internal message channels. + /// + /// This controls the buffer size for channels used to stream block ranges and events. + /// Higher values allow more messages to be buffered, which can help with throughput + /// at the cost of memory usage. + /// + /// # Arguments + /// + /// * `max_stream_capacity` - Maximum number of messages to buffer (must be greater than 0) + #[must_use] + pub fn max_stream_capacity(mut self, max_stream_capacity: usize) -> Self { + self.block_range_scanner.max_stream_capacity = max_stream_capacity; + self + } + /// Builds the scanner by connecting to an existing provider. /// /// This is a shared method used internally by scanner-specific `connect()` methods. @@ -436,7 +451,8 @@ impl EventScannerBuilder { impl EventScanner { #[must_use] pub fn subscribe(&mut self, filter: EventFilter) -> ReceiverStream { - let (sender, receiver) = mpsc::channel::(MAX_BUFFERED_MESSAGES); + let capacity = self.block_range_scanner.max_stream_capacity(); + let (sender, receiver) = mpsc::channel::(capacity); self.listeners.push(EventListener { filter, sender }); ReceiverStream::new(receiver) } @@ -504,6 +520,8 @@ mod tests { #[tokio::test] async fn test_historic_event_stream_channel_capacity() -> anyhow::Result<()> { + use crate::block_range_scanner::MAX_BUFFERED_MESSAGES; + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); let mut scanner = EventScannerBuilder::historic().build(provider).await?; @@ -515,6 +533,24 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_custom_max_stream_capacity() -> anyhow::Result<()> { + let custom_capacity = 1000; + + let provider = RootProvider::::new(RpcClient::mocked(Asserter::new())); + let mut scanner = EventScannerBuilder::historic() + .max_stream_capacity(custom_capacity) + .build(provider) + .await?; + + let _ = scanner.subscribe(EventFilter::new()); + + let sender = &scanner.listeners[0].sender; + assert_eq!(sender.capacity(), custom_capacity); + + Ok(()) + } + #[tokio::test] async fn test_latest_returns_error_with_zero_count() { use alloy::{ diff --git a/src/event_scanner/scanner/sync/from_block.rs b/src/event_scanner/scanner/sync/from_block.rs index 277959af..11d662b0 100644 --- a/src/event_scanner/scanner/sync/from_block.rs +++ b/src/event_scanner/scanner/sync/from_block.rs @@ -54,6 +54,7 @@ impl EventScanner { /// /// [subscribe]: EventScanner::subscribe pub async fn start(self) -> Result<(), ScannerError> { + let max_stream_capacity = self.block_range_scanner.max_stream_capacity(); let client = self.block_range_scanner.run()?; let stream = client.stream_from(self.config.from_block, self.config.block_confirmations).await?; @@ -62,7 +63,8 @@ impl EventScanner { let listeners = self.listeners.clone(); tokio::spawn(async move { - handle_stream(stream, &provider, &listeners, ConsumerMode::Stream).await; + handle_stream(stream, &provider, &listeners, ConsumerMode::Stream, max_stream_capacity) + .await; }); Ok(()) diff --git a/src/event_scanner/scanner/sync/from_latest.rs b/src/event_scanner/scanner/sync/from_latest.rs index db237684..888dca53 100644 --- a/src/event_scanner/scanner/sync/from_latest.rs +++ b/src/event_scanner/scanner/sync/from_latest.rs @@ -58,6 +58,7 @@ impl EventScanner { #[allow(clippy::missing_panics_doc)] pub async fn start(self) -> Result<(), ScannerError> { let count = self.config.count; + let max_stream_capacity = self.block_range_scanner.max_stream_capacity(); let provider = self.block_range_scanner.provider().clone(); let listeners = self.listeners.clone(); @@ -85,6 +86,7 @@ impl EventScanner { &provider, &listeners, ConsumerMode::CollectLatest { count }, + max_stream_capacity, ) .await; @@ -104,7 +106,14 @@ impl EventScanner { }; // Start the live (sync) stream. - handle_stream(sync_stream, &provider, &listeners, ConsumerMode::Stream).await; + handle_stream( + sync_stream, + &provider, + &listeners, + ConsumerMode::Stream, + max_stream_capacity, + ) + .await; }); Ok(())