diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 821d747f..5920be28 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -10,7 +10,7 @@ //! //! # Output stream //! -//! Streams returned by [`BlockRangeScannerClient`] yield [`BlockScannerResult`] items: +//! Streams returned by [`ConnectedBlockRangeScanner`] yield [`BlockScannerResult`] items: //! //! - `Ok(ScannerMessage::Data(range))` for a block range to process. //! - `Ok(ScannerMessage::Notification(_))` for scanner notifications. @@ -33,8 +33,7 @@ //! use event_scanner::{ //! ScannerError, ScannerMessage, //! block_range_scanner::{ -//! BlockRangeScanner, BlockRangeScannerClient, DEFAULT_BLOCK_CONFIRMATIONS, -//! DEFAULT_MAX_BLOCK_RANGE, +//! BlockRangeScanner, DEFAULT_BLOCK_CONFIRMATIONS, DEFAULT_MAX_BLOCK_RANGE, //! }, //! robust_provider::RobustProviderBuilder, //! }; @@ -51,11 +50,9 @@ //! let robust_provider = RobustProviderBuilder::new(provider).build().await?; //! let block_range_scanner = BlockRangeScanner::new().connect(robust_provider).await?; //! -//! // Create client to send subscribe command to block scanner -//! let client: BlockRangeScannerClient = block_range_scanner.run()?; -//! -//! let mut stream = -//! client.stream_from(BlockNumberOrTag::Number(5), DEFAULT_BLOCK_CONFIRMATIONS).await?; +//! let mut stream = block_range_scanner +//! .stream_from(BlockNumberOrTag::Number(5), DEFAULT_BLOCK_CONFIRMATIONS) +//! .await?; //! //! while let Some(message) = stream.next().await { //! match message { @@ -68,7 +65,7 @@ //! Err(e) => { //! error!("Received error from subscription: {e}"); //! match e { -//! ScannerError::ServiceShutdown => break, +//! ScannerError::Lagged(_) => break, //! _ => { //! error!("Non-fatal error, continuing: {e}"); //! } @@ -84,10 +81,7 @@ //! ``` use std::{cmp::Ordering, ops::RangeInclusive}; -use tokio::{ - sync::{mpsc, oneshot}, - try_join, -}; +use tokio::{sync::mpsc, try_join}; use tokio_stream::wrappers::ReceiverStream; use crate::{ @@ -240,9 +234,6 @@ impl BlockRangeScanner { } /// A [`BlockRangeScanner`] connected to a provider. -/// -/// Use [`ConnectedBlockRangeScanner::run`] to start the background service and obtain a -/// [`BlockRangeScannerClient`]. #[derive(Debug)] pub struct ConnectedBlockRangeScanner { provider: RobustProvider, @@ -264,139 +255,22 @@ impl ConnectedBlockRangeScanner { self.buffer_capacity } - /// Starts the subscription service and returns a client for sending commands. + /// Streams live blocks starting from the latest block. + /// + /// # Arguments + /// + /// * `block_confirmations` - Number of confirmations to apply once in live mode. /// /// # Errors /// - /// Returns an error if the subscription service fails to start. - pub fn run(&self) -> Result { - let (service, cmd_tx) = Service::new( - self.provider.clone(), - self.max_block_range, - self.past_blocks_storage_capacity, - ); - tokio::spawn(async move { - service.run().await; - }); - Ok(BlockRangeScannerClient::new(cmd_tx, self.buffer_capacity)) - } -} - -/// Commands accepted by the internal block-range service. -#[derive(Debug)] -enum Command { - /// Start a live stream. - StreamLive { - sender: mpsc::Sender, - block_confirmations: u64, - response: oneshot::Sender>, - }, - /// Start a historical range stream. - StreamHistorical { - sender: mpsc::Sender, - start_id: BlockId, - end_id: BlockId, - response: oneshot::Sender>, - }, - /// Start a stream that catches up from `start_id` and then transitions to live streaming. - StreamFrom { - sender: mpsc::Sender, - start_id: BlockId, - block_confirmations: u64, - response: oneshot::Sender>, - }, - /// Start a reverse stream (newer to older), in batches. - Rewind { - sender: mpsc::Sender, - start_id: BlockId, - end_id: BlockId, - response: oneshot::Sender>, - }, -} - -struct Service { - provider: RobustProvider, - max_block_range: u64, - past_blocks_storage_capacity: RingBufferCapacity, - error_count: u64, - command_receiver: mpsc::Receiver, - shutdown: bool, -} - -impl Service { - /// Creates a new background service instance and its command channel. - pub fn new( - provider: RobustProvider, - max_block_range: u64, - past_blocks_storage_capacity: RingBufferCapacity, - ) -> (Self, mpsc::Sender) { - let (cmd_tx, cmd_rx) = mpsc::channel(100); - - let service = Self { - provider, - max_block_range, - past_blocks_storage_capacity, - error_count: 0, - command_receiver: cmd_rx, - shutdown: false, - }; - - (service, cmd_tx) - } - - pub async fn run(mut self) { - info!("Starting subscription service"); - - while !self.shutdown { - tokio::select! { - cmd = self.command_receiver.recv() => { - if let Some(command) = cmd { - if let Err(e) = self.handle_command(command).await { - error!(error = %e, "Command handling error"); - self.error_count += 1; - } - } else { - warn!("Command channel closed, shutting down"); - break; - } - } - } - } - - info!("Subscription service stopped"); - } - - async fn handle_command(&mut self, command: Command) -> Result<(), ScannerError> { - match command { - Command::StreamLive { sender, block_confirmations, response } => { - info!("Starting live stream"); - let result = self.handle_live(block_confirmations, sender).await; - let _ = response.send(result); - } - Command::StreamHistorical { sender, start_id, end_id, response } => { - info!(start_id = ?start_id, end_id = ?end_id, "Starting historical stream"); - let result = self.handle_historical(start_id, end_id, sender).await; - let _ = response.send(result); - } - Command::StreamFrom { sender, start_id, block_confirmations, response } => { - info!(start_id = ?start_id, "Starting streaming from"); - let result = self.handle_sync(start_id, block_confirmations, sender).await; - let _ = response.send(result); - } - Command::Rewind { sender, start_id, end_id, response } => { - info!(start_id = ?start_id, end_id = ?end_id, "Starting rewind"); - let result = self.handle_rewind(start_id, end_id, sender).await; - let _ = response.send(result); - } - } - Ok(()) - } - - async fn handle_live( + /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. + /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. + pub async fn stream_live( &mut self, block_confirmations: u64, - sender: mpsc::Sender, - ) -> Result<(), ScannerError> { + ) -> Result, ScannerError> { + info!("Starting live stream"); + let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity); let max_block_range = self.max_block_range; let past_blocks_storage_capacity = self.past_blocks_storage_capacity; let latest = self.provider.get_block_number().await?; @@ -418,7 +292,7 @@ impl Service { common::stream_live_blocks( range_start, subscription, - &sender, + &blocks_sender, &provider, block_confirmations, max_block_range, @@ -428,15 +302,31 @@ impl Service { .await; }); - Ok(()) + Ok(ReceiverStream::new(blocks_receiver)) } - async fn handle_historical( + /// Streams a batch of historical blocks from `start_id` to `end_id`. + /// + /// # Arguments + /// + /// * `start_id` - The starting block id + /// * `end_id` - The ending block id + /// + /// # Errors + /// + /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. + /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. + /// * [`ScannerError::BlockNotFound`] - if `start_id` or `end_id` cannot be resolved. + pub async fn stream_historical( &mut self, - start_id: BlockId, - end_id: BlockId, - sender: mpsc::Sender, - ) -> Result<(), ScannerError> { + start_id: impl Into, + end_id: impl Into, + ) -> Result, ScannerError> { + let start_id = start_id.into(); + let end_id = end_id.into(); + info!(start_id = ?start_id, end_id = ?end_id, "Starting historical stream"); + let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity); + let max_block_range = self.max_block_range; let past_blocks_storage_capacity = self.past_blocks_storage_capacity; let provider = self.provider.clone(); @@ -466,39 +356,99 @@ impl Service { start_block_num, end_block_num, max_block_range, - &sender, + &blocks_sender, &provider, &mut reorg_handler, ) .await; }); - Ok(()) + Ok(ReceiverStream::new(blocks_receiver)) } - async fn handle_sync( + /// Streams blocks starting from `start_id` and transitions to live mode. + /// + /// # Arguments + /// + /// * `start_id` - The starting block id. + /// * `block_confirmations` - Number of confirmations to apply once in live mode. + /// + /// # Errors + /// + /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. + /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. + /// * [`ScannerError::BlockNotFound`] - if `start_id` cannot be resolved. + pub async fn stream_from( &self, - start_id: BlockId, + start_id: impl Into, block_confirmations: u64, - sender: mpsc::Sender, - ) -> Result<(), ScannerError> { + ) -> Result, ScannerError> { + let start_id = start_id.into(); + info!(start_id = ?start_id, "Starting streaming from"); + + let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity); let sync_handler = SyncHandler::new( self.provider.clone(), self.max_block_range, start_id, block_confirmations, self.past_blocks_storage_capacity, - sender, + blocks_sender, ); - sync_handler.run().await + + sync_handler.run().await?; + + Ok(ReceiverStream::new(blocks_receiver)) } - async fn handle_rewind( + /// Streams blocks in reverse order from `start_id` to `end_id`. + /// + /// The `start_id` block is assumed to be greater than or equal to the `end_id` block. + /// Blocks are streamed in batches, where each batch is ordered from lower to higher + /// block numbers (chronological order within each batch), but batches themselves + /// progress from newer to older blocks. + /// + /// # Arguments + /// + /// * `start_id` - The starting block id (higher block number). + /// * `end_id` - The ending block id (lower block number). + /// + /// # Reorg Handling + /// + /// Reorg checks are only performed when the specified block range tip is above the + /// current finalized block height. When a reorg is detected: + /// + /// 1. A [`Notification::ReorgDetected`] is emitted with the common ancestor block + /// 2. The scanner fetches the new tip block at the same height + /// 3. Reorged blocks are re-streamed in chronological order (from `common_ancestor + 1` up to + /// the new tip) + /// 4. The reverse scan continues from where it left off + /// + /// If the range tip is at or below the finalized block, no reorg checks are + /// performed since finalized blocks cannot be reorganized. + /// + /// # Note + /// + /// The reason reorged blocks are streamed in chronological order is to make it easier to handle + /// reorgs in [`EventScannerBuilder::latest`][latest mode] mode, i.e. to prepend reorged blocks + /// to the result collection, which must maintain chronological order. + /// + /// [latest mode]: crate::EventScannerBuilder::latest + /// + /// # Errors + /// + /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. + /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. + /// * [`ScannerError::BlockNotFound`] - if `start_id` or `end_id` cannot be resolved. + pub async fn stream_rewind( &mut self, - start_id: BlockId, - end_id: BlockId, - sender: mpsc::Sender, - ) -> Result<(), ScannerError> { + start_id: impl Into, + end_id: impl Into, + ) -> Result, ScannerError> { + let start_id = start_id.into(); + let end_id = end_id.into(); + info!(start_id = ?start_id, end_id = ?end_id, "Starting rewind"); + let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity); let max_block_range = self.max_block_range; let past_blocks_storage_capacity = self.past_blocks_storage_capacity; let provider = self.provider.clone(); @@ -516,15 +466,22 @@ impl Service { let mut reorg_handler = ReorgHandler::new(provider.clone(), past_blocks_storage_capacity); - Self::stream_rewind(from, to, max_block_range, &sender, &provider, &mut reorg_handler) - .await; + Self::handle_stream_rewind( + from, + to, + max_block_range, + &blocks_sender, + &provider, + &mut reorg_handler, + ) + .await; }); - Ok(()) + Ok(ReceiverStream::new(blocks_receiver)) } /// Streams blocks in reverse order from `from` to `to`. - async fn stream_rewind( + async fn handle_stream_rewind( from: N::BlockResponse, to: N::BlockResponse, max_block_range: u64, @@ -638,179 +595,6 @@ impl Service { } } -/// Client for requesting block-range streams from the background service. -/// -/// Each method returns a new stream whose items are [`BlockScannerResult`] values. -pub struct BlockRangeScannerClient { - command_sender: mpsc::Sender, - buffer_capacity: usize, -} - -impl BlockRangeScannerClient { - /// Creates a new subscription client. - /// - /// # Arguments - /// - /// * `command_sender` - The sender for sending commands to the subscription service. - /// * `buffer_capacity` - The capacity for buffering messages in the stream. - #[must_use] - fn new(command_sender: mpsc::Sender, buffer_capacity: usize) -> Self { - Self { command_sender, buffer_capacity } - } - - /// Streams live blocks starting from the latest block. - /// - /// # Arguments - /// - /// * `block_confirmations` - Number of confirmations to apply once in live mode. - /// - /// # Errors - /// - /// * `ScannerError::ServiceShutdown` - if the service is already shutting down. - pub async fn stream_live( - &self, - block_confirmations: u64, - ) -> Result, ScannerError> { - let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity); - let (response_tx, response_rx) = oneshot::channel(); - - let command = Command::StreamLive { - sender: blocks_sender, - block_confirmations, - response: response_tx, - }; - - self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?; - - response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??; - - Ok(ReceiverStream::new(blocks_receiver)) - } - - /// Streams a batch of historical blocks from `start_id` to `end_id`. - /// - /// # Arguments - /// - /// * `start_id` - The starting block id - /// * `end_id` - The ending block id - /// - /// # Errors - /// - /// * `ScannerError::ServiceShutdown` - if the service is already shutting down. - pub async fn stream_historical( - &self, - start_id: impl Into, - end_id: impl Into, - ) -> Result, ScannerError> { - let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity); - let (response_tx, response_rx) = oneshot::channel(); - - let command = Command::StreamHistorical { - sender: blocks_sender, - start_id: start_id.into(), - end_id: end_id.into(), - response: response_tx, - }; - - self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?; - - response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??; - - Ok(ReceiverStream::new(blocks_receiver)) - } - - /// Streams blocks starting from `start_id` and transitions to live mode. - /// - /// # Arguments - /// - /// * `start_id` - The starting block id. - /// * `block_confirmations` - Number of confirmations to apply once in live mode. - /// - /// # Errors - /// - /// * `ScannerError::ServiceShutdown` - if the service is already shutting down. - pub async fn stream_from( - &self, - start_id: impl Into, - block_confirmations: u64, - ) -> Result, ScannerError> { - let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity); - let (response_tx, response_rx) = oneshot::channel(); - - let command = Command::StreamFrom { - sender: blocks_sender, - start_id: start_id.into(), - block_confirmations, - response: response_tx, - }; - - self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?; - - response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??; - - Ok(ReceiverStream::new(blocks_receiver)) - } - - /// Streams blocks in reverse order from `start_id` to `end_id`. - /// - /// The `start_id` block is assumed to be greater than or equal to the `end_id` block. - /// Blocks are streamed in batches, where each batch is ordered from lower to higher - /// block numbers (chronological order within each batch), but batches themselves - /// progress from newer to older blocks. - /// - /// # Arguments - /// - /// * `start_id` - The starting block id (higher block number). - /// * `end_id` - The ending block id (lower block number). - /// - /// # Reorg Handling - /// - /// Reorg checks are only performed when the specified block range tip is above the - /// current finalized block height. When a reorg is detected: - /// - /// 1. A [`Notification::ReorgDetected`] is emitted with the common ancestor block - /// 2. The scanner fetches the new tip block at the same height - /// 3. Reorged blocks are re-streamed in chronological order (from `common_ancestor + 1` up to - /// the new tip) - /// 4. The reverse scan continues from where it left off - /// - /// If the range tip is at or below the finalized block, no reorg checks are - /// performed since finalized blocks cannot be reorganized. - /// - /// # Note - /// - /// The reason reorged blocks are streamed in chronological order is to make it easier to handle - /// reorgs in [`EventScannerBuilder::latest`][latest mode] mode, i.e. to prepend reorged blocks - /// to the result collection, which must maintain chronological order. - /// - /// # Errors - /// - /// * `ScannerError::ServiceShutdown` - if the service is already shutting down. - /// - /// [latest mode]: crate::EventScannerBuilder::latest - pub async fn rewind( - &self, - start_id: impl Into, - end_id: impl Into, - ) -> Result, ScannerError> { - let (blocks_sender, blocks_receiver) = mpsc::channel(self.buffer_capacity); - let (response_tx, response_rx) = oneshot::channel(); - - let command = Command::Rewind { - sender: blocks_sender, - start_id: start_id.into(), - end_id: end_id.into(), - response: response_tx, - }; - - self.command_sender.send(command).await.map_err(|_| ScannerError::ServiceShutdown)?; - - response_rx.await.map_err(|_| ScannerError::ServiceShutdown)??; - - Ok(ReceiverStream::new(blocks_receiver)) - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/src/error.rs b/src/error.rs index 26d524c0..4fcf4688 100644 --- a/src/error.rs +++ b/src/error.rs @@ -20,10 +20,6 @@ pub enum ScannerError { #[error("RPC error: {0}")] RpcError(Arc>), - /// The internal service has shut down and can no longer process commands. - #[error("Service is shutting down")] - ServiceShutdown, - /// A requested block (by number, hash or tag) could not be retrieved. #[error("Block not found, Block Id: {0}")] BlockNotFound(BlockId), diff --git a/src/event_scanner/scanner/historic.rs b/src/event_scanner/scanner/historic.rs index 8430f8c9..621da264 100644 --- a/src/event_scanner/scanner/historic.rs +++ b/src/event_scanner/scanner/historic.rs @@ -130,13 +130,14 @@ impl EventScanner { /// /// # Errors /// - /// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started. /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. /// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved. - pub async fn start(self) -> Result<(), ScannerError> { - let client = self.block_range_scanner.run()?; - let stream = client.stream_historical(self.config.from_block, self.config.to_block).await?; + pub async fn start(mut self) -> Result<(), ScannerError> { + let stream = self + .block_range_scanner + .stream_historical(self.config.from_block, self.config.to_block) + .await?; let max_concurrent_fetches = self.config.max_concurrent_fetches; let provider = self.block_range_scanner.provider().clone(); diff --git a/src/event_scanner/scanner/latest.rs b/src/event_scanner/scanner/latest.rs index 304a1b1b..c946f515 100644 --- a/src/event_scanner/scanner/latest.rs +++ b/src/event_scanner/scanner/latest.rs @@ -141,13 +141,14 @@ impl EventScanner { /// /// # Errors /// - /// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started. /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. /// * [`ScannerError::BlockNotFound`] - if `from_block` or `to_block` cannot be resolved. - pub async fn start(self) -> Result<(), ScannerError> { - let client = self.block_range_scanner.run()?; - let stream = client.rewind(self.config.from_block, self.config.to_block).await?; + pub async fn start(mut self) -> Result<(), ScannerError> { + let stream = self + .block_range_scanner + .stream_rewind(self.config.from_block, self.config.to_block) + .await?; let max_concurrent_fetches = self.config.max_concurrent_fetches; let provider = self.block_range_scanner.provider().clone(); diff --git a/src/event_scanner/scanner/live.rs b/src/event_scanner/scanner/live.rs index ac3a274e..0a6e5bfb 100644 --- a/src/event_scanner/scanner/live.rs +++ b/src/event_scanner/scanner/live.rs @@ -65,13 +65,10 @@ impl EventScanner { /// /// # Errors /// - /// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started. /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. - pub async fn start(self) -> Result<(), ScannerError> { - let client = self.block_range_scanner.run()?; - let stream = client.stream_live(self.config.block_confirmations).await?; - + pub async fn start(mut self) -> Result<(), ScannerError> { + let stream = self.block_range_scanner.stream_live(self.config.block_confirmations).await?; let max_concurrent_fetches = self.config.max_concurrent_fetches; let provider = self.block_range_scanner.provider().clone(); let listeners = self.listeners.clone(); diff --git a/src/event_scanner/scanner/sync/from_block.rs b/src/event_scanner/scanner/sync/from_block.rs index a91f2bc0..e5560246 100644 --- a/src/event_scanner/scanner/sync/from_block.rs +++ b/src/event_scanner/scanner/sync/from_block.rs @@ -72,14 +72,14 @@ impl EventScanner { /// /// # Errors /// - /// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started. /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. /// * [`ScannerError::BlockNotFound`] - if `from_block` cannot be resolved. pub async fn start(self) -> Result<(), ScannerError> { - let client = self.block_range_scanner.run()?; - let stream = - client.stream_from(self.config.from_block, self.config.block_confirmations).await?; + let stream = self + .block_range_scanner + .stream_from(self.config.from_block, self.config.block_confirmations) + .await?; let max_concurrent_fetches = self.config.max_concurrent_fetches; let provider = self.block_range_scanner.provider().clone(); diff --git a/src/event_scanner/scanner/sync/from_latest.rs b/src/event_scanner/scanner/sync/from_latest.rs index 4a4141b0..f883677f 100644 --- a/src/event_scanner/scanner/sync/from_latest.rs +++ b/src/event_scanner/scanner/sync/from_latest.rs @@ -71,11 +71,10 @@ impl EventScanner { /// /// # Errors /// - /// * [`ScannerError::ServiceShutdown`] - if the internal block-range service cannot be started. /// * [`ScannerError::Timeout`] - if an RPC call required for startup times out. /// * [`ScannerError::RpcError`] - if an RPC call required for startup fails. #[allow(clippy::missing_panics_doc)] - pub async fn start(self) -> Result<(), ScannerError> { + pub async fn start(mut self) -> Result<(), ScannerError> { let count = self.config.count; let provider = self.block_range_scanner.provider().clone(); let listeners = self.listeners.clone(); @@ -84,8 +83,6 @@ impl EventScanner { info!(count = count, "Starting scanner, mode: fetch latest events and switch to live"); - let client = self.block_range_scanner.run()?; - // Fetch the latest block number. // This is used to determine the starting point for the rewind stream and the live // stream. We do this before starting the streams to avoid a race condition @@ -93,7 +90,10 @@ impl EventScanner { let latest_block = provider.get_block_number().await?; // Setup rewind and live streams to run in parallel. - let rewind_stream = client.rewind(latest_block, BlockNumberOrTag::Earliest).await?; + let rewind_stream = self + .block_range_scanner + .stream_rewind(latest_block, BlockNumberOrTag::Earliest) + .await?; // Start streaming... tokio::spawn(async move { @@ -114,17 +114,20 @@ impl EventScanner { // We actually rely on the sync mode for the live stream, as more blocks could have been // minted while the scanner was collecting the latest `count` events. // Note: Sync mode will notify the client when it switches to live streaming. - let sync_stream = - match client.stream_from(latest_block + 1, self.config.block_confirmations).await { - Ok(stream) => stream, - Err(e) => { - error!(error = %e, "Error during sync mode setup"); - for listener in listeners { - _ = listener.sender.try_stream(e.clone()).await; - } - return; + let sync_stream = match self + .block_range_scanner + .stream_from(latest_block + 1, self.config.block_confirmations) + .await + { + Ok(stream) => stream, + Err(e) => { + error!(error = %e, "Error during sync mode setup"); + for listener in listeners { + _ = listener.sender.try_stream(e.clone()).await; } - }; + return; + } + }; // Start the live (sync) stream. handle_stream( diff --git a/tests/block_range_scanner.rs b/tests/block_range_scanner.rs index 8075f643..2952f6a6 100644 --- a/tests/block_range_scanner.rs +++ b/tests/block_range_scanner.rs @@ -16,7 +16,7 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh // --- Zero block confirmations -> stream immediately --- - let client = BlockRangeScanner::new().connect(provider.clone()).await?.run()?; + let mut client = BlockRangeScanner::new().connect(provider.clone()).await?; let mut stream = client.stream_live(0).await?; @@ -51,7 +51,7 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh async fn live_with_block_confirmations_always_emits_genesis_block() -> anyhow::Result<()> { let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; - let client = BlockRangeScanner::new().connect(provider.clone()).await?.run()?; + let mut client = BlockRangeScanner::new().connect(provider.clone()).await?; let mut stream = client.stream_live(3).await?; @@ -78,7 +78,7 @@ async fn stream_from_starts_at_latest_once_it_has_enough_confirmations() -> anyh let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; - let client = BlockRangeScanner::new().connect(provider.clone()).await?.run()?; + let client = BlockRangeScanner::new().connect(provider.clone()).await?; provider.anvil_mine(Some(20), None).await?; @@ -106,7 +106,7 @@ async fn continuous_blocks_if_reorg_less_than_block_confirmation() -> anyhow::Re let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; - let client = BlockRangeScanner::new().connect(provider.clone()).await?.run()?; + let mut client = BlockRangeScanner::new().connect(provider.clone()).await?; let mut stream = client.stream_live(5).await?; @@ -138,7 +138,7 @@ async fn shallow_block_confirmation_does_not_mitigate_reorg() -> anyhow::Result< let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; - let client = BlockRangeScanner::new().connect(provider.clone()).await?.run()?; + let mut client = BlockRangeScanner::new().connect(provider.clone()).await?; let mut stream = client.stream_live(3).await?; @@ -182,8 +182,7 @@ async fn historical_emits_correction_range_when_reorg_below_end() -> anyhow::Res let end_num = 110; - let client = - BlockRangeScanner::new().max_block_range(30).connect(provider.clone()).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(30).connect(provider.clone()).await?; let mut stream = client .stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Number(end_num)) @@ -214,8 +213,7 @@ async fn historical_emits_correction_range_when_end_num_reorgs() -> anyhow::Resu provider.anvil_mine(Some(120), None).await?; - let client = - BlockRangeScanner::new().max_block_range(30).connect(provider.clone()).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(30).connect(provider.clone()).await?; let mut stream = client.stream_historical(BlockNumberOrTag::Number(0), BlockNumberOrTag::Latest).await?; @@ -247,8 +245,7 @@ async fn historical_reorg_occurring_immediately_after_finalized_processing_is_ig provider.anvil_mine(Some(11), None).await?; - let client = - BlockRangeScanner::new().max_block_range(10).connect(provider.clone()).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(10).connect(provider.clone()).await?; let mut stream = client.stream_historical(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?; @@ -272,8 +269,7 @@ async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> { provider.anvil_mine(Some(100), None).await?; - let client = - BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?; // ranges where each batch is of max blocks per epoch size let mut stream = client.stream_historical(0, 19).await?; @@ -300,7 +296,7 @@ async fn historic_mode_respects_blocks_read_per_epoch() -> anyhow::Result<()> { assert_closed!(stream); // range where blocks per epoch is larger than the number of blocks on chain - let client = BlockRangeScanner::new().max_block_range(200).connect(provider).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(200).connect(provider).await?; let mut stream = client.stream_historical(0, 20).await?; assert_next!(stream, 0..=20); @@ -321,7 +317,7 @@ async fn historic_mode_normalises_start_and_end_block() -> anyhow::Result<()> { provider.anvil_mine(Some(11), None).await?; - let client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?; let mut stream = client.stream_historical(10, 0).await?; assert_next!(stream, 0..=0); @@ -345,9 +341,9 @@ async fn rewind_single_batch_when_epoch_larger_than_range() -> anyhow::Result<() provider.anvil_mine(Some(150), None).await?; - let client = BlockRangeScanner::new().max_block_range(100).connect(provider).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(100).connect(provider).await?; - let mut stream = client.rewind(100, 150).await?; + let mut stream = client.stream_rewind(100, 150).await?; // Range length is 51, epoch is 100 -> single batch [100..=150] assert_next!(stream, 100..=150); @@ -363,9 +359,9 @@ async fn rewind_exact_multiple_of_epoch_creates_full_batches_in_reverse() -> any provider.anvil_mine(Some(15), None).await?; - let client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?; - let mut stream = client.rewind(0, 14).await?; + let mut stream = client.stream_rewind(0, 14).await?; // 0..=14 with epoch 5 -> [10..=14, 5..=9, 0..=4] assert_next!(stream, 10..=14); @@ -383,9 +379,9 @@ async fn rewind_with_remainder_trims_first_batch_to_stream_start() -> anyhow::Re provider.anvil_mine(Some(15), None).await?; - let client = BlockRangeScanner::new().max_block_range(4).connect(provider).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(4).connect(provider).await?; - let mut stream = client.rewind(3, 12).await?; + let mut stream = client.stream_rewind(3, 12).await?; // 3..=12 with epoch 4 -> ends: 12,8,4 -> batches: [9..=12, 5..=8, 3..=4] assert_next!(stream, 9..=12); @@ -403,9 +399,9 @@ async fn rewind_single_block_range() -> anyhow::Result<()> { provider.anvil_mine(Some(15), None).await?; - let client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?; - let mut stream = client.rewind(7, 7).await?; + let mut stream = client.stream_rewind(7, 7).await?; assert_next!(stream, 7..=7); assert_closed!(stream); @@ -420,9 +416,9 @@ async fn rewind_epoch_of_one_sends_each_block_in_reverse_order() -> anyhow::Resu provider.anvil_mine(Some(15), None).await?; - let client = BlockRangeScanner::new().max_block_range(1).connect(provider).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(1).connect(provider).await?; - let mut stream = client.rewind(5, 8).await?; + let mut stream = client.stream_rewind(5, 8).await?; // 5..=8 with epoch 1 -> [8..=8, 7..=7, 6..=6, 5..=5] assert_next!(stream, 8..=8); @@ -442,9 +438,10 @@ async fn command_rewind_defaults_latest_to_earliest_batches_correctly() -> anyho // Mine 20 blocks, so the total number of blocks is 21 (including 0th block) provider.anvil_mine(Some(20), None).await?; - let client = BlockRangeScanner::new().max_block_range(7).connect(provider).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(7).connect(provider).await?; - let mut stream = client.rewind(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?; + let mut stream = + client.stream_rewind(BlockNumberOrTag::Earliest, BlockNumberOrTag::Latest).await?; assert_next!(stream, 14..=20); assert_next!(stream, 7..=13); @@ -462,16 +459,16 @@ async fn command_rewind_handles_start_and_end_in_any_order() -> anyhow::Result<( // Ensure blocks at 3 and 15 exist provider.anvil_mine(Some(16), None).await?; - let client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?; - let mut stream = client.rewind(15, 3).await?; + let mut stream = client.stream_rewind(15, 3).await?; assert_next!(stream, 11..=15); assert_next!(stream, 6..=10); assert_next!(stream, 3..=5); assert_closed!(stream); - let mut stream = client.rewind(3, 15).await?; + let mut stream = client.stream_rewind(3, 15).await?; assert_next!(stream, 11..=15); assert_next!(stream, 6..=10); @@ -487,9 +484,9 @@ async fn command_rewind_propagates_block_not_found_error() -> anyhow::Result<()> // Do not mine up to 999 so start won't exist let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?; - let client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider).await?; - let stream = client.rewind(0, 999).await; + let stream = client.stream_rewind(0, 999).await; assert!(matches!( stream, @@ -500,17 +497,16 @@ async fn command_rewind_propagates_block_not_found_error() -> anyhow::Result<()> } #[tokio::test] -#[ignore = "rewind reorg tests require ack-channels to reliably halt processing: https://github.com/OpenZeppelin/Event-Scanner/issues/218"] +#[ignore = "stream_rewind reorg tests require ack-channels to reliably halt processing: https://github.com/OpenZeppelin/Event-Scanner/issues/218"] async fn rewind_reorg_emits_notification_and_rescans_affected_range() -> anyhow::Result<()> { let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; provider.anvil_mine(Some(20), None).await?; - let client = - BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?; - let mut stream = client.rewind(5, 20).await?; + let mut stream = client.stream_rewind(5, 20).await?; assert_next!(stream, 16..=20); assert_next!(stream, 11..=15); @@ -530,17 +526,16 @@ async fn rewind_reorg_emits_notification_and_rescans_affected_range() -> anyhow: } #[tokio::test] -#[ignore = "rewind reorg tests require ack-channels to reliably halt processing: https://github.com/OpenZeppelin/Event-Scanner/issues/218"] +#[ignore = "stream_rewind reorg tests require ack-channels to reliably halt processing: https://github.com/OpenZeppelin/Event-Scanner/issues/218"] async fn deep_rewind_reorg_streams_affected_range_in_chronologi() -> anyhow::Result<()> { let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; provider.anvil_mine(Some(20), None).await?; - let client = - BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?; - let mut stream = client.rewind(5, 20).await?; + let mut stream = client.stream_rewind(5, 20).await?; assert_next!(stream, 16..=20); @@ -561,7 +556,7 @@ async fn deep_rewind_reorg_streams_affected_range_in_chronologi() -> anyhow::Res } #[tokio::test] -#[ignore = "rewind reorg tests require ack-channels to reliably halt processing: https://github.com/OpenZeppelin/Event-Scanner/issues/218"] +#[ignore = "stream_rewind reorg tests require ack-channels to reliably halt processing: https://github.com/OpenZeppelin/Event-Scanner/issues/218"] async fn rewind_skips_reorg_check_when_tip_below_finalized() -> anyhow::Result<()> { let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?; @@ -570,11 +565,10 @@ async fn rewind_skips_reorg_check_when_tip_below_finalized() -> anyhow::Result<( let finalized = provider.get_block_by_number(BlockNumberOrTag::Finalized).await?.unwrap(); - let client = - BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?; // Rewind with tip < finalized - let mut stream = client.rewind(0, finalized.header.number - 1).await?; + let mut stream = client.stream_rewind(0, finalized.header.number - 1).await?; assert_next!(stream, 31..=35); assert_next!(stream, 26..=30); @@ -595,7 +589,7 @@ async fn rewind_skips_reorg_check_when_tip_below_finalized() -> anyhow::Result<( } #[tokio::test] -#[ignore = "rewind reorg tests require ack-channels to reliably halt processing: https://github.com/OpenZeppelin/Event-Scanner/issues/218"] +#[ignore = "stream_rewind reorg tests require ack-channels to reliably halt processing: https://github.com/OpenZeppelin/Event-Scanner/issues/218"] async fn rewind_skips_reorg_when_tip_is_at_finalized() -> anyhow::Result<()> { let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.endpoint().as_str()).await?; @@ -603,11 +597,10 @@ async fn rewind_skips_reorg_when_tip_is_at_finalized() -> anyhow::Result<()> { provider.anvil_mine(Some(100), None).await?; let finalized = provider.get_block_by_number(BlockNumberOrTag::Finalized).await?.unwrap(); - let client = - BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?.run()?; + let mut client = BlockRangeScanner::new().max_block_range(5).connect(provider.clone()).await?; // Rewind with tip == finalized - let mut stream = client.rewind(0, finalized.header.number).await?; + let mut stream = client.stream_rewind(0, finalized.header.number).await?; assert_next!(stream, 32..=36); assert_next!(stream, 27..=31);