Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 179 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,22 +1,190 @@
# Event Scanner

> ⚠️ **WARNING: ACTIVE DEVELOPMENT** ⚠️
>
> This project is under active development and likely contains many bugs. Use at your own risk!
> APIs and functionality may change without notice.
>
> This project is under active development and likely contains bugs. APIs and behaviour may change without notice. Use at your own risk.

## About

This is an ethereum L1 event scanner is a Rust-based Ethereum blockchain event monitoring library built on top of the Alloy framework. It provides a flexible and efficient way to:
Event Scanner is a Rust library for monitoring EVM-based smart contract events. It is built on top of the [`alloy`](https://github.com/alloy-rs/alloy) ecosystem and focuses on in-memory scanning without a backing database. Applications provide event filters and callback implementations; the scanner takes care of subscribing to historical ranges, bridging into live mode, and delivering events with retry-aware execution strategies.

- Subscribe to and monitor specific smart contract events in real-time
- Process historical events from a specified starting block
- Handle event callbacks with configurable retry logic
- Support both WebSocket and IPC connections to Ethereum nodes
---

The scanner allows you to define custom event filters with associated callbacks, making it easy to build applications that react to on-chain events, specifically with rollups in mind. This is varies from traditional indexers in that all logic is handlded in memory and no database is used.
## Table of Contents

- [Features](#features)
- [Architecture Overview](#architecture-overview)
- [Quick Start](#quick-start)
- [Usage](#usage)
- [Building a Scanner](#building-a-scanner)
- [Defining Event Filters](#defining-event-filters)
- [Scanning Modes](#scanning-modes)
- [Working with Callbacks](#working-with-callbacks)
- [Examples](#examples)
- [Testing](#testing)

## Status
---

## Features

- **Historical replay** – scan block ranges.
- **Live subscriptions** – stay up to date with latest blocks via WebSocket or IPC transports.
- **Hybrid flow** – automatically transition from historical catch-up into streaming mode.
- **Composable filters** – register one or many contract + event signature pairs with their own callbacks.
- **Retry strategies** – built-in retryable callback backoff strategies
- **No database** – processing happens in-memory; persistence is left to the host application.

---

## Architecture Overview

The library exposes two primary layers:

- `EventScannerBuilder` / `EventScanner` – the main module the application will interact with.
- `BlockScanner` – lower-level component that streams block ranges, handles reorg, batching, and provider subscriptions. This is exposed to the user but has many edge cases which will be documented in the future. For now interact with this via the `EventScanner`
Copy link
Collaborator

@0xNeshi 0xNeshi Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not really scanning blocks, but block ranges. I'm thinking that maybe BlockRangeScanner is a better name?
That way this becomes a sort of a primitive that we can use to also potentially implement an actual BlockScanner in the future.
Wdyt?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree - we should change it both here and in the code. Happy to do it here unless you want to take over

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can do it


Callbacks implement the `EventCallback` trait. They are executed through a `CallbackStrategy` that performs retries when necessary before reporting failures.

---

## Quick Start

Add `event-scanner` to your `Cargo.toml`:

```toml
[dependencies]
event-scanner = "0.1.0-alpha.1"
```
Create a callback implementing `EventCallback` and register it with the builder:

```rust
use std::{sync::{Arc, atomic::{AtomicUsize, Ordering}}};
use alloy::{eips::BlockNumberOrTag, network::Ethereum, rpc::types::Log, sol_types::SolEvent};
use async_trait::async_trait;
use event_scanner::{event_scanner::EventScannerBuilder, EventCallback, EventFilter};

struct CounterCallback { processed: Arc<AtomicUsize> }

#[async_trait]
impl EventCallback for CounterCallback {
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
self.processed.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}

async fn run_scanner(ws_url: alloy::transports::http::reqwest::Url, contract: alloy::primitives::Address) -> anyhow::Result<()> {
let filter = EventFilter {
contract_address: contract,
event: MyContract::SomeEvent::SIGNATURE.to_owned(),
callback: Arc::new(CounterCallback { processed: Arc::new(AtomicUsize::new(0)) }),
};

let mut scanner = EventScannerBuilder::new()
.with_event_filter(filter)
.connect_ws::<Ethereum>(ws_url)
.await?;

scanner.start(BlockNumberOrTag::Latest, None).await?;
Ok(())
}
```

---

## Usage

### Building a Scanner

`EventScannerBuilder` supports:

- `with_event_filter(s)` – attach [filters](#defining-event-filters).
- `with_callback_strategy(strategy)` – override retry behaviour (`StateSyncAwareStrategy` by default).
- `with_blocks_read_per_epoch` - how many blocks are read at a time in a single batch (taken into consideration when fetching historical blocks)
- `with_reorg_rewind_depth` - how many blocks to rewind when a reorg is detected
- `with_retry_interval` - how often to retry failed callbacks
- `with_block_confirmations` - how many confirmations to wait for before considering a block final

Once configured, connect using either `connect_ws::<Ethereum>(ws_url)` or `connect_ipc::<Ethereum>(path)`. This will `build` the `EventScanner` and allow you to call run to start in various [modes](#scanning-Modes).


### Defining Event Filters

Create an `EventFilter` for each contract/event pair you want to track. The filter bundles the contract address, the event signature (from `SolEvent::SIGNATURE`), and an `Arc<dyn EventCallback + Send + Sync>`.

```rust
let filter = EventFilter {
contract_address: *counter_contract.address(),
event: Counter::CountIncreased::SIGNATURE.to_owned(),
callback: Arc::new(CounterCallback),
};
```

Register multiple filters by calling either `with_event_filter` repeatedly or `with_event_filters` once.


### Scanning Modes

- **Live mode** – `start(BlockNumberOrTag::Latest, None)` subscribes to new blocks only.
- **Historical mode** – `start(BlockNumberOrTag::Number(start, Some(BlockNumberOrTag::Number(end)))`, scanner fetches events from a historical block range.
- **Historical → Live** – `start(BlockNumberOrTag::Number(start, None)` replays from `start` to current head, then streams future blocks.

For now modes are deduced from the `start` and `end` parameters. In the future, we might add explicit commands to select the mode.

See the integration tests under `tests/live_mode`, `tests/historic_mode`, and `tests/historic_to_live` for concrete examples.

### Working with Callbacks

Implement `EventCallback`:

```rust
#[async_trait]
impl EventCallback for RollupCallback {
async fn on_event(&self, log: &Log) -> anyhow::Result<()> {
// decode event, send to EL etc.
Ok(())
}
}
```

Advanced users can write custom retry behaviour by implementing the `CallbackStrategy` trait. The default `StateSyncAwareStrategy` automatically detects state-sync errors and performs exponential backoff ([smart retry mechanism](https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L149) from the geth driver) before falling back to a fixed retry policy configured via `FixedRetryConfig`.

```rust
#[async_trait]
pub trait CallbackStrategy: Send + Sync {
async fn execute(
&self,
callback: &Arc<dyn EventCallback + Send + Sync>,
log: &Log,
) -> anyhow::Result<()>;
}
```

---

## Examples

- `examples/simple_counter` – minimal live-mode scanner
- `examples/historical_scanning` – demonstrates replaying from genesis (block 0) before continuing streaming latest blocks

Run an example with:

```bash
RUST_LOG=info cargo run -p simple_counter
# or
RUST_LOG=info cargo run -p historical_scanning
```

Both examples spin up a local `anvil` instance and deploy a demo counter contract before starting the scanner.

---

## Testing

Integration tests cover live, historical, and hybrid flows:
(We recommend using [nextest](https://crates.io/crates/cargo-nextest) to run the tests)

```bash
cargo nextest run
```

This library is in early alpha stage. Expect breaking changes and bugs.
11 changes: 11 additions & 0 deletions src/event_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl Default for EventScannerBuilder {

impl EventScannerBuilder {
#[must_use]
/// Creates a new builder with default block scanner and callback strategy.
pub fn new() -> Self {
Self {
block_scanner: BlockScanner::new(),
Expand All @@ -39,42 +40,49 @@ impl EventScannerBuilder {
}
}

/// Registers a single event filter for scanning.
#[must_use]
pub fn with_event_filter(mut self, filter: EventFilter) -> Self {
self.tracked_events.push(filter);
self
}

/// Registers a collection of event filters for scanning.
#[must_use]
pub fn with_event_filters(mut self, filters: Vec<EventFilter>) -> Self {
self.tracked_events.extend(filters);
self
}

/// Overrides the callback execution strategy used by the scanner.
#[must_use]
pub fn with_callback_strategy(mut self, strategy: Arc<dyn CallbackStrategy>) -> Self {
self.callback_strategy = strategy;
self
}

/// Configures how many blocks are read per epoch during a historical sync.
#[must_use]
pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self {
self.block_scanner = self.block_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch);
self
}

/// Sets the depth to rewind when a reorg is detected.
#[must_use]
pub fn with_reorg_rewind_depth(mut self, reorg_rewind_depth: u64) -> Self {
self.block_scanner = self.block_scanner.with_reorg_rewind_depth(reorg_rewind_depth);
self
}

/// Adjusts the retry interval when reconnecting to the provider.
#[must_use]
pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
self.block_scanner = self.block_scanner.with_retry_interval(retry_interval);
self
}

/// Configures how many confirmations are required before processing a block (used for reorgs).
#[must_use]
pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self {
self.block_scanner = self.block_scanner.with_block_confirmations(block_confirmations);
Expand Down Expand Up @@ -115,6 +123,7 @@ impl EventScannerBuilder {
})
}

/// Builds the default callback strategy used when none is provided.
fn get_default_callback_strategy() -> Arc<dyn CallbackStrategy> {
let state_sync_aware_strategy = StateSyncAwareStrategy::new();
Arc::new(state_sync_aware_strategy)
Expand Down Expand Up @@ -191,6 +200,7 @@ impl<N: Network> EventScanner<N> {
Ok(())
}

/// Spawns background tasks that drive callback execution for an event type.
fn spawn_event_callback_task_executors(
mut receiver: Receiver<Log>,
callback: Arc<dyn crate::callback::EventCallback + Send + Sync>,
Expand All @@ -211,6 +221,7 @@ impl<N: Network> EventScanner<N> {
});
}

/// Fetches logs for the supplied block range and forwards them to the callback channels.
async fn process_block_range(
&self,
from_block: u64,
Expand Down