diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 3c970626..75437cc9 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -2,7 +2,7 @@ //! //! ```rust,no_run //! use alloy::{eips::BlockNumberOrTag, network::Ethereum, primitives::BlockNumber}; -//! use std::ops::Range; +//! use std::ops::RangeInclusive; //! use tokio_stream::{StreamExt, wrappers::ReceiverStream}; //! //! use alloy::transports::http::reqwest::Url; @@ -28,13 +28,12 @@ //! // Create client to send subscribe command to block scanner //! let client: BlockRangeScannerClient = block_range_scanner.run()?; //! -//! let mut receiver: ReceiverStream, BlockRangeScannerError>> = -//! client -//! .subscribe( -//! BlockNumberOrTag::Latest, -//! None, // just subscribe to new blocks -//! ) -//! .await?; +//! let mut receiver = client +//! .subscribe( +//! BlockNumberOrTag::Latest, +//! None, // just subscribe to new blocks +//! ) +//! .await?; //! //! while let Some(result) = receiver.next().await { //! match result { @@ -68,7 +67,7 @@ //! } //! ``` -use std::ops::Range; +use std::ops::RangeInclusive; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::ReceiverStream; @@ -138,7 +137,7 @@ pub enum Error { #[derive(Debug)] pub enum Command { Subscribe { - sender: mpsc::Sender, Error>>, + sender: mpsc::Sender, Error>>, start_height: BlockNumberOrTag, end_height: Option, response: oneshot::Sender>, @@ -297,7 +296,7 @@ impl ConnectedBlockRangeScanner { struct Service { config: Config, provider: RootProvider, - subscriber: Option, Error>>>, + subscriber: Option, Error>>>, current: Option, websocket_connected: bool, processed_count: u64, @@ -372,7 +371,7 @@ impl Service { async fn handle_subscribe( &mut self, - sender: mpsc::Sender, Error>>, + sender: mpsc::Sender, Error>>, start_height: BlockNumberOrTag, end_height: Option, ) -> Result<(), Error> { @@ -425,7 +424,7 @@ impl Service { let cutoff = sync_end_block.header().number(); let ws_task = tokio::spawn(async move { if end_height.is_none() { - Self::websocket_buffer_task(cutoff, provider, buffer_tx).await; + Self::websocket_buffer_task(cutoff + 1, provider, buffer_tx).await; } }); @@ -478,7 +477,7 @@ impl Service { let batch_end_block = self.provider.get_block_by_number(batch_to.into()).await?.expect("should be valid"); - self.send_to_subscriber(Ok(self.current.as_ref().unwrap().number..batch_to)).await; + self.send_to_subscriber(Ok(self.current.as_ref().unwrap().number..=batch_to)).await; self.current = Some(BlockHashAndNumber::from_header::(batch_end_block.header())); @@ -536,7 +535,7 @@ impl Service { async fn websocket_buffer_task>( mut current: BlockNumber, provider: P, - buffer_sender: mpsc::Sender>, + buffer_sender: mpsc::Sender>, ) { match Self::get_block_subscription(&provider).await { Ok(mut ws_stream) => { @@ -548,9 +547,8 @@ impl Service { continue; } - // we add 1 to include the latest block - #[allow(clippy::range_plus_one)] - if let Err(e) = buffer_sender.send(current..header_resp.number() + 1).await { + // RangeInclusive already includes the end block + if let Err(e) = buffer_sender.send(current..=header_resp.number()).await { error!("Buffer channel closed, stopping buffer task: {e}"); return; @@ -567,8 +565,8 @@ impl Service { } async fn process_buffered_messages( - mut buffer_rx: mpsc::Receiver>, - sender: mpsc::Sender, Error>>, + mut buffer_rx: mpsc::Receiver>, + sender: mpsc::Sender, Error>>, cutoff: BlockNumber, ) { let mut processed = 0; @@ -576,7 +574,7 @@ impl Service { // Process all buffered messages while let Some(range) = buffer_rx.recv().await { - let (start, end) = (range.start, range.end); + let (start, end) = (*range.start(), *range.end()); if start >= cutoff { if sender.send(Ok(range)).await.is_err() { warn!("Subscriber channel closed, cleaning up"); @@ -587,7 +585,7 @@ impl Service { discarded += cutoff - start; let start = cutoff; - if sender.send(Ok(start..end)).await.is_err() { + if sender.send(Ok(start..=end)).await.is_err() { warn!("Subscriber channel closed, cleaning up"); return; } @@ -609,7 +607,7 @@ impl Service { Ok(ws_stream) } - async fn send_to_subscriber(&mut self, result: Result, Error>) { + async fn send_to_subscriber(&mut self, result: Result, Error>) { if let Some(ref sender) = self.subscriber { if sender.send(result).await.is_err() { self.subscriber = None; @@ -658,7 +656,7 @@ impl BlockRangeScannerClient { /// # Arguments /// /// * `start_height` - The block number to start from. - /// * `end_height` - The block number to end at. + /// * `end_height` - The block number to end at (inclusive). /// /// # Errors /// @@ -667,7 +665,7 @@ impl BlockRangeScannerClient { &self, start_height: BlockNumberOrTag, end_height: Option, - ) -> Result, Error>>, Error> { + ) -> Result, Error>>, Error> { let (blocks_sender, blocks_receiver) = mpsc::channel(MAX_BUFFERED_MESSAGES); let (response_tx, response_rx) = oneshot::channel(); @@ -764,14 +762,14 @@ mod tests { while let Some(result) = receiver.next().await { match result { Ok(range) => { - println!("Received block range: {} - {}", range.start, range.end); + println!("Received block range: [{range:?}]"); if block_range_start == 0 { - block_range_start = range.start; + block_range_start = *range.start(); } - assert_eq!(block_range_start, range.start); - assert!(range.end >= range.start); - block_range_start = range.end; + assert_eq!(block_range_start, *range.start()); + assert!(*range.end() >= *range.start()); + block_range_start = *range.end() + 1; } Err(e) => { panic!("Received error from subscription: {e}"); diff --git a/src/event_scanner.rs b/src/event_scanner.rs index c38a1f26..c3dd1f6d 100644 --- a/src/event_scanner.rs +++ b/src/event_scanner.rs @@ -181,8 +181,8 @@ impl EventScanner { while let Some(range) = stream.next().await { match range { Ok(range) => { - let from_block = range.start; - let to_block = range.end; + let from_block = *range.start(); + let to_block = *range.end(); info!(from_block, to_block, "processing block range"); self.process_block_range(from_block, to_block, &event_channels).await?; } @@ -216,7 +216,8 @@ impl EventScanner { }); } - /// Fetches logs for the supplied block range and forwards them to the callback channels. + /// Fetches logs for the supplied inclusive block range [`from_block..=to_block`] and forwards + /// them to the callback channels. async fn process_block_range( &self, from_block: u64,