diff --git a/README.md b/README.md index 4d797611..7396e153 100644 --- a/README.md +++ b/README.md @@ -102,7 +102,6 @@ async fn run_scanner(ws_url: alloy::transports::http::reqwest::Url, contract: al - `with_callback_strategy(strategy)` – override retry behaviour (`StateSyncAwareStrategy` by default). - `with_blocks_read_per_epoch` - how many blocks are read at a time in a single batch (taken into consideration when fetching historical blocks) - `with_reorg_rewind_depth` - how many blocks to rewind when a reorg is detected -- `with_retry_interval` - how often to retry failed callbacks - `with_block_confirmations` - how many confirmations to wait for before considering a block final Once configured, connect using either `connect_ws::(ws_url)` or `connect_ipc::(path)`. This will `build` the `EventScanner` and allow you to call run to start in various [modes](#scanning-Modes). diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index e3ac8980..3c970626 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -21,7 +21,6 @@ //! let block_range_scanner = BlockRangeScanner::new() //! .with_blocks_read_per_epoch(1000) //! .with_reorg_rewind_depth(5) -//! .with_retry_interval(Duration::from_secs(12)) //! .with_block_confirmations(5) //! .connect_ws::(Url::parse("ws://localhost:8546").unwrap()) //! .await?; @@ -69,7 +68,7 @@ //! } //! ``` -use std::{ops::Range, time::Duration}; +use std::ops::Range; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::ReceiverStream; @@ -93,13 +92,11 @@ use tracing::{debug, error, info, warn}; // copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19 const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000; -const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(12); const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0; // const BACK_OFF_MAX_RETRIES: u64 = 5; const MAX_BUFFERED_MESSAGES: usize = 50000; -// TODO: determine check exact default value const DEFAULT_REORG_REWIND_DEPTH: u64 = 0; // // State sync aware retry settings @@ -182,16 +179,16 @@ impl BlockHashAndNumber { struct Config { blocks_read_per_epoch: usize, reorg_rewind_depth: u64, - #[allow(dead_code, reason = "TODO: will be used in smart retry mechanism")] - retry_interval: Duration, - #[allow(dead_code, reason = "TODO: will be used in reorg mechanism")] + #[allow( + dead_code, + reason = "Will be used in reorg mechanism: https://github.com/OpenZeppelin/Event-Scanner/issues/5" + )] block_confirmations: u64, } pub struct BlockRangeScanner { blocks_read_per_epoch: usize, reorg_rewind_depth: u64, - retry_interval: Duration, block_confirmations: u64, } @@ -207,7 +204,6 @@ impl BlockRangeScanner { Self { blocks_read_per_epoch: DEFAULT_BLOCKS_READ_PER_EPOCH, reorg_rewind_depth: DEFAULT_REORG_REWIND_DEPTH, - retry_interval: DEFAULT_RETRY_INTERVAL, block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS, } } @@ -224,12 +220,6 @@ impl BlockRangeScanner { self } - #[must_use] - pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self { - self.retry_interval = retry_interval; - self - } - #[must_use] pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self { self.block_confirmations = block_confirmations; @@ -252,7 +242,6 @@ impl BlockRangeScanner { config: Config { blocks_read_per_epoch: self.blocks_read_per_epoch, reorg_rewind_depth: self.reorg_rewind_depth, - retry_interval: self.retry_interval, block_confirmations: self.block_confirmations, }, }) @@ -273,7 +262,6 @@ impl BlockRangeScanner { config: Config { blocks_read_per_epoch: self.blocks_read_per_epoch, reorg_rewind_depth: self.reorg_rewind_depth, - retry_interval: self.retry_interval, block_confirmations: self.block_confirmations, }, }) @@ -292,7 +280,6 @@ impl ConnectedBlockRangeScanner { &self.provider } - // TODO: use wrapper errors /// Starts the subscription service and returns a client for sending commands. /// /// # Errors @@ -393,8 +380,6 @@ impl Service { return Err(Error::MultipleSubscribers); } - // TODO: update local state relate to reorg and validate data - info!("Starting subscription from point: {start_height:?}"); self.subscriber = Some(sender); @@ -415,11 +400,8 @@ impl Service { let (start_block, sync_end_block) = if let Some(end_height) = end_height { let start_block = self.provider.get_block_by_number(start_height).await?.expect("already checked"); - let end_block = self - .provider - .get_block(end_height.into()) - .await? - .expect("TODO: check if really valid"); + let end_block = + self.provider.get_block(end_height.into()).await?.expect("should be valid"); (start_block, end_block) } else { let start_block = @@ -428,7 +410,7 @@ impl Service { .provider .get_block(BlockId::Number(BlockNumberOrTag::Latest)) .await? - .expect("TODO: check if really valid"); + .expect("should be valid"); (start_block, end_block) }; @@ -447,7 +429,6 @@ impl Service { } }); - // TODO: invoke with smart retry mechanism with backoff if let Err(e) = self.sync_historical_data(start_block, sync_end_block).await { warn!("aborting ws_task"); ws_task.abort(); @@ -494,11 +475,8 @@ impl Service { self.current.as_ref().unwrap().number + self.config.blocks_read_per_epoch as u64 }; - let batch_end_block = self - .provider - .get_block_by_number(batch_to.into()) - .await? - .expect("TODO: check if really valid"); + let batch_end_block = + self.provider.get_block_by_number(batch_to.into()).await?.expect("should be valid"); self.send_to_subscriber(Ok(self.current.as_ref().unwrap().number..batch_to)).await; @@ -560,18 +538,12 @@ impl Service { provider: P, buffer_sender: mpsc::Sender>, ) { - // TODO: consider passing errors to the caller for handling - - // TODO: use smart retry mechanism match Self::get_block_subscription(&provider).await { Ok(mut ws_stream) => { info!("WebSocket connected for buffering"); - // TODO: if latest != ws_stream.next(), then return latest.number and empty the - // ws_stream backlog while let Ok(header_resp) = ws_stream.recv().await { info!("Received block header: {}", header_resp.number()); - // TODO: handle reorgs if current == header_resp.number() { continue; } @@ -612,7 +584,6 @@ impl Service { } processed += end - start; } else if end > cutoff { - // TODO: verify the math discarded += cutoff - start; let start = cutoff; @@ -778,7 +749,6 @@ mod tests { let client = BlockRangeScanner::new() .with_blocks_read_per_epoch(3) .with_reorg_rewind_depth(5) - .with_retry_interval(Duration::from_secs(1)) .with_block_confirmations(1) .connect_ws::(anvil.ws_endpoint_url()) .await? diff --git a/src/event_scanner.rs b/src/event_scanner.rs index 3dca86ec..c38a1f26 100644 --- a/src/event_scanner.rs +++ b/src/event_scanner.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; +use std::{collections::HashMap, sync::Arc}; use crate::{ block_range_scanner::{self, BlockRangeScanner, ConnectedBlockRangeScanner}, @@ -77,13 +77,6 @@ impl EventScannerBuilder { self } - /// Adjusts the retry interval when reconnecting to the provider. - #[must_use] - pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self { - self.block_range_scanner = self.block_range_scanner.with_retry_interval(retry_interval); - self - } - /// Configures how many confirmations are required before processing a block (used for reorgs). #[must_use] pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self { @@ -168,7 +161,6 @@ impl EventScanner { continue; } - // TODO: configurable buffer size / smaller buffer ? let (sender, receiver) = mpsc::channel::(1024); let callback = filter.callback.clone(); diff --git a/src/types.rs b/src/types.rs index d0aaeb30..53d961e1 100644 --- a/src/types.rs +++ b/src/types.rs @@ -8,7 +8,6 @@ use crate::callback::EventCallback; pub struct EventFilter { pub contract_address: Address, /// Human-readable event signature, e.g. "Transfer(address,address,uint256)". - /// TODO: Maybe change this to selector i.e. B256 pub event: String, pub callback: Arc, }