Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ async fn run_scanner(ws_url: alloy::transports::http::reqwest::Url, contract: al
- `with_callback_strategy(strategy)` – override retry behaviour (`StateSyncAwareStrategy` by default).
- `with_blocks_read_per_epoch` - how many blocks are read at a time in a single batch (taken into consideration when fetching historical blocks)
- `with_reorg_rewind_depth` - how many blocks to rewind when a reorg is detected
- `with_retry_interval` - how often to retry failed callbacks
- `with_block_confirmations` - how many confirmations to wait for before considering a block final

Once configured, connect using either `connect_ws::<Ethereum>(ws_url)` or `connect_ipc::<Ethereum>(path)`. This will `build` the `EventScanner` and allow you to call run to start in various [modes](#scanning-Modes).
Expand Down
50 changes: 10 additions & 40 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
//! let block_range_scanner = BlockRangeScanner::new()
//! .with_blocks_read_per_epoch(1000)
//! .with_reorg_rewind_depth(5)
//! .with_retry_interval(Duration::from_secs(12))
//! .with_block_confirmations(5)
//! .connect_ws::<Ethereum>(Url::parse("ws://localhost:8546").unwrap())
//! .await?;
Expand Down Expand Up @@ -69,7 +68,7 @@
//! }
//! ```

use std::{ops::Range, time::Duration};
use std::ops::Range;

use tokio::sync::{mpsc, oneshot};
use tokio_stream::wrappers::ReceiverStream;
Expand All @@ -93,13 +92,11 @@ use tracing::{debug, error, info, warn};

// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
const DEFAULT_BLOCKS_READ_PER_EPOCH: usize = 1000;
const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(12);
const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0;
// const BACK_OFF_MAX_RETRIES: u64 = 5;

const MAX_BUFFERED_MESSAGES: usize = 50000;

// TODO: determine check exact default value
const DEFAULT_REORG_REWIND_DEPTH: u64 = 0;

// // State sync aware retry settings
Expand Down Expand Up @@ -182,16 +179,16 @@ impl BlockHashAndNumber {
struct Config {
blocks_read_per_epoch: usize,
reorg_rewind_depth: u64,
#[allow(dead_code, reason = "TODO: will be used in smart retry mechanism")]
retry_interval: Duration,
#[allow(dead_code, reason = "TODO: will be used in reorg mechanism")]
#[allow(
dead_code,
reason = "Will be used in reorg mechanism: https://github.com/OpenZeppelin/Event-Scanner/issues/5"
)]
block_confirmations: u64,
}

pub struct BlockRangeScanner {
blocks_read_per_epoch: usize,
reorg_rewind_depth: u64,
retry_interval: Duration,
block_confirmations: u64,
}

Expand All @@ -207,7 +204,6 @@ impl BlockRangeScanner {
Self {
blocks_read_per_epoch: DEFAULT_BLOCKS_READ_PER_EPOCH,
reorg_rewind_depth: DEFAULT_REORG_REWIND_DEPTH,
retry_interval: DEFAULT_RETRY_INTERVAL,
block_confirmations: DEFAULT_BLOCK_CONFIRMATIONS,
}
}
Expand All @@ -224,12 +220,6 @@ impl BlockRangeScanner {
self
}

#[must_use]
pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
self.retry_interval = retry_interval;
self
}

#[must_use]
pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self {
self.block_confirmations = block_confirmations;
Expand All @@ -252,7 +242,6 @@ impl BlockRangeScanner {
config: Config {
blocks_read_per_epoch: self.blocks_read_per_epoch,
reorg_rewind_depth: self.reorg_rewind_depth,
retry_interval: self.retry_interval,
block_confirmations: self.block_confirmations,
},
})
Expand All @@ -273,7 +262,6 @@ impl BlockRangeScanner {
config: Config {
blocks_read_per_epoch: self.blocks_read_per_epoch,
reorg_rewind_depth: self.reorg_rewind_depth,
retry_interval: self.retry_interval,
block_confirmations: self.block_confirmations,
},
})
Expand All @@ -292,7 +280,6 @@ impl<N: Network> ConnectedBlockRangeScanner<N> {
&self.provider
}

// TODO: use wrapper errors
/// Starts the subscription service and returns a client for sending commands.
///
/// # Errors
Expand Down Expand Up @@ -393,8 +380,6 @@ impl<N: Network> Service<N> {
return Err(Error::MultipleSubscribers);
}

// TODO: update local state relate to reorg and validate data

info!("Starting subscription from point: {start_height:?}");
self.subscriber = Some(sender);

Expand All @@ -415,11 +400,8 @@ impl<N: Network> Service<N> {
let (start_block, sync_end_block) = if let Some(end_height) = end_height {
let start_block =
self.provider.get_block_by_number(start_height).await?.expect("already checked");
let end_block = self
.provider
.get_block(end_height.into())
.await?
.expect("TODO: check if really valid");
let end_block =
self.provider.get_block(end_height.into()).await?.expect("should be valid");
(start_block, end_block)
} else {
let start_block =
Expand All @@ -428,7 +410,7 @@ impl<N: Network> Service<N> {
.provider
.get_block(BlockId::Number(BlockNumberOrTag::Latest))
.await?
.expect("TODO: check if really valid");
.expect("should be valid");
(start_block, end_block)
};

Expand All @@ -447,7 +429,6 @@ impl<N: Network> Service<N> {
}
});

// TODO: invoke with smart retry mechanism with backoff
if let Err(e) = self.sync_historical_data(start_block, sync_end_block).await {
warn!("aborting ws_task");
ws_task.abort();
Expand Down Expand Up @@ -494,11 +475,8 @@ impl<N: Network> Service<N> {
self.current.as_ref().unwrap().number + self.config.blocks_read_per_epoch as u64
};

let batch_end_block = self
.provider
.get_block_by_number(batch_to.into())
.await?
.expect("TODO: check if really valid");
let batch_end_block =
self.provider.get_block_by_number(batch_to.into()).await?.expect("should be valid");

self.send_to_subscriber(Ok(self.current.as_ref().unwrap().number..batch_to)).await;

Expand Down Expand Up @@ -560,18 +538,12 @@ impl<N: Network> Service<N> {
provider: P,
buffer_sender: mpsc::Sender<Range<BlockNumber>>,
) {
// TODO: consider passing errors to the caller for handling

// TODO: use smart retry mechanism
match Self::get_block_subscription(&provider).await {
Ok(mut ws_stream) => {
info!("WebSocket connected for buffering");

// TODO: if latest != ws_stream.next(), then return latest.number and empty the
// ws_stream backlog
while let Ok(header_resp) = ws_stream.recv().await {
info!("Received block header: {}", header_resp.number());
// TODO: handle reorgs
if current == header_resp.number() {
continue;
}
Expand Down Expand Up @@ -612,7 +584,6 @@ impl<N: Network> Service<N> {
}
processed += end - start;
} else if end > cutoff {
// TODO: verify the math
discarded += cutoff - start;

let start = cutoff;
Expand Down Expand Up @@ -778,7 +749,6 @@ mod tests {
let client = BlockRangeScanner::new()
.with_blocks_read_per_epoch(3)
.with_reorg_rewind_depth(5)
.with_retry_interval(Duration::from_secs(1))
.with_block_confirmations(1)
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
.await?
Expand Down
10 changes: 1 addition & 9 deletions src/event_scanner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use std::{collections::HashMap, sync::Arc};

use crate::{
block_range_scanner::{self, BlockRangeScanner, ConnectedBlockRangeScanner},
Expand Down Expand Up @@ -77,13 +77,6 @@ impl EventScannerBuilder {
self
}

/// Adjusts the retry interval when reconnecting to the provider.
#[must_use]
pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
self.block_range_scanner = self.block_range_scanner.with_retry_interval(retry_interval);
self
}

/// Configures how many confirmations are required before processing a block (used for reorgs).
#[must_use]
pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self {
Expand Down Expand Up @@ -168,7 +161,6 @@ impl<N: Network> EventScanner<N> {
continue;
}

// TODO: configurable buffer size / smaller buffer ?
let (sender, receiver) = mpsc::channel::<Log>(1024);

let callback = filter.callback.clone();
Expand Down
1 change: 0 additions & 1 deletion src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use crate::callback::EventCallback;
pub struct EventFilter {
pub contract_address: Address,
/// Human-readable event signature, e.g. "Transfer(address,address,uint256)".
/// TODO: Maybe change this to selector i.e. B256
pub event: String,
pub callback: Arc<dyn EventCallback + Send + Sync>,
}
Loading