Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
5f40c94
define block scanner types
0xNeshi Sep 11, 2025
2324ccd
setup basic interface
0xNeshi Sep 11, 2025
bb6de81
rename module to block_scanner
0xNeshi Sep 11, 2025
02de8d0
implement event scanner
0xNeshi Sep 11, 2025
4d3c4c2
enable websocket and ipc connections
0xNeshi Sep 11, 2025
a6d16aa
remove http, cleanup
LeoPatOZ Sep 11, 2025
c058ad9
Merge branch 'main' into channels
LeoPatOZ Sep 11, 2025
3961e75
event channeling
LeoPatOZ Sep 11, 2025
4efba82
Merge branch 'main' into event-channeling
LeoPatOZ Sep 12, 2025
335d4a7
make end height optional
0xNeshi Sep 12, 2025
eda1385
Merge branch 'main' into event-channeling
LeoPatOZ Sep 12, 2025
e36803a
merge
0xNeshi Sep 12, 2025
5d7f48c
update to use stream
LeoPatOZ Sep 12, 2025
1c67dbb
Merge branch 'main' into event-channeling
LeoPatOZ Sep 12, 2025
40c4cef
lint
LeoPatOZ Sep 12, 2025
4bca02c
assume errors when building block scanner
0xNeshi Sep 12, 2025
238d9c6
implement reorg logic
0xNeshi Sep 12, 2025
eb73b49
Merge branch 'main' into event-channeling
LeoPatOZ Sep 12, 2025
c54671c
remove comment about placeholder
LeoPatOZ Sep 12, 2025
597d87b
test migration
LeoPatOZ Sep 12, 2025
9f3d72d
add more int tests
LeoPatOZ Sep 12, 2025
19c3c7f
renameing and crate issues
LeoPatOZ Sep 12, 2025
3a51905
update order test
LeoPatOZ Sep 12, 2025
e48e119
udpate tests and add block ordering
LeoPatOZ Sep 12, 2025
e2f7828
clippy
LeoPatOZ Sep 12, 2025
0fb8d7b
format
LeoPatOZ Sep 12, 2025
cafdfb2
Merge branch 'main' into migrate-event-scanner-tests
LeoPatOZ Sep 17, 2025
4e6e580
merge main
0xNeshi Sep 17, 2025
8cd6f30
Block scanner refactor (#21)
0xNeshi Sep 17, 2025
499b683
Merge branch 'block-scanner' into migrate-event-scanner-tests
LeoPatOZ Sep 17, 2025
159c33f
fix test and bug
LeoPatOZ Sep 17, 2025
5e5e8f3
historic test
LeoPatOZ Sep 17, 2025
95109a2
historic to live test
LeoPatOZ Sep 17, 2025
eabf88b
remove legacy scanner
LeoPatOZ Sep 17, 2025
da70991
fix examples
LeoPatOZ Sep 17, 2025
4dfc202
Merge branch 'block-scanner' into migrate-event-scanner-tests
LeoPatOZ Sep 17, 2025
e90556a
fix doc test
LeoPatOZ Sep 17, 2025
1edc9b4
Merge branch 'block-scanner' into migrate-event-scanner-tests
LeoPatOZ Sep 17, 2025
0bc695d
merge
0xNeshi Sep 17, 2025
4a2e569
set timeout on processes_events_within_specified_historical_range test
0xNeshi Sep 17, 2025
7930ab2
set timeout on processes_events_within_specified_historical_range test
0xNeshi Sep 17, 2025
602d78e
make spawn_anvil accept f64
0xNeshi Sep 17, 2025
bbf641b
speed up block time in processes_events_within_specified_historical_r…
0xNeshi Sep 17, 2025
ee77985
fail_times -> max_fail_times
0xNeshi Sep 17, 2025
bd86c5c
fail_times -> max_fail_times
0xNeshi Sep 17, 2025
ed22861
change anvil block time to 0.1
0xNeshi Sep 17, 2025
1256f15
optimize high_event_volume_no_loss
0xNeshi Sep 17, 2025
f0baad8
make event scanner builder return consume self
0xNeshi Sep 17, 2025
b3b5421
make block scanner builder pattern consume self
0xNeshi Sep 17, 2025
39bcf9e
use arc clone convention
0xNeshi Sep 17, 2025
64b122d
use timeout approach to asserting
0xNeshi Sep 18, 2025
d117164
use last-attempt assertions with clear msgs on timeouts
0xNeshi Sep 18, 2025
203ddcd
fmt
0xNeshi Sep 18, 2025
552cb13
clippy
0xNeshi Sep 18, 2025
10f5a4a
assert event ordering in replays_historical_then_switches_to_live
0xNeshi Sep 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions examples/historical_scanning/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,10 @@ async fn main() -> anyhow::Result<()> {

let _ = counter_contract.increase().send().await?.get_receipt().await?;

let mut builder = EventScannerBuilder::new();

builder.with_event_filter(increase_filter);

let mut scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
let mut scanner = EventScannerBuilder::new()
.with_event_filter(increase_filter)
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
.await?;

sleep(Duration::from_secs(10)).await;
scanner.start(BlockNumberOrTag::Number(0), None).await.expect("failed to start scanner");
Expand Down
10 changes: 4 additions & 6 deletions examples/simple_counter/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,12 @@ async fn main() -> anyhow::Result<()> {
callback: Arc::new(CounterCallback),
};

let mut builder = EventScannerBuilder::new();

builder.with_event_filter(increase_filter);

let scanner = builder.connect_ws::<Ethereum>(anvil.ws_endpoint_url()).await?;
let mut scanner = EventScannerBuilder::new()
.with_event_filter(increase_filter)
.connect_ws::<Ethereum>(anvil.ws_endpoint_url())
.await?;

let task_1 = tokio::spawn(async move {
let mut scanner = scanner;
scanner.start(BlockNumberOrTag::Latest, None).await.expect("failed to start scanner");
});

Expand Down
12 changes: 6 additions & 6 deletions src/block_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,25 +212,25 @@ impl BlockScanner {
}

#[must_use]
pub fn with_blocks_read_per_epoch(&mut self, blocks_read_per_epoch: usize) -> &mut Self {
pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self {
self.blocks_read_per_epoch = blocks_read_per_epoch;
self
}

#[must_use]
pub fn with_reorg_rewind_depth(&mut self, reorg_rewind_depth: u64) -> &mut Self {
pub fn with_reorg_rewind_depth(mut self, reorg_rewind_depth: u64) -> Self {
self.reorg_rewind_depth = reorg_rewind_depth;
self
}

#[must_use]
pub fn with_retry_interval(&mut self, retry_interval: Duration) -> &mut Self {
pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
self.retry_interval = retry_interval;
self
}

#[must_use]
pub fn with_block_confirmations(&mut self, block_confirmations: u64) -> &mut Self {
pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self {
self.block_confirmations = block_confirmations;
self
}
Expand All @@ -241,7 +241,7 @@ impl BlockScanner {
///
/// Returns an error if the connection fails
pub async fn connect_ws<N: Network>(
&self,
self,
ws_url: Url,
) -> TransportResult<ConnectedBlockScanner<N>> {
let provider =
Expand All @@ -263,7 +263,7 @@ impl BlockScanner {
///
/// Returns an error if the connection fails
pub async fn connect_ipc<N: Network>(
&self,
self,
ipc_path: String,
) -> TransportResult<ConnectedBlockScanner<N>> {
let provider = RootProvider::<N>::new(ClientBuilder::default().ipc(ipc_path.into()).await?);
Expand Down
59 changes: 40 additions & 19 deletions src/event_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
use alloy::{
eips::BlockNumberOrTag,
network::Network,
primitives::Address,
providers::Provider,
rpc::types::{Filter, Log},
transports::http::reqwest::Url,
Expand Down Expand Up @@ -38,38 +39,45 @@ impl EventScannerBuilder {
}
}

pub fn with_event_filter(&mut self, filter: EventFilter) -> &mut Self {
#[must_use]
pub fn with_event_filter(mut self, filter: EventFilter) -> Self {
self.tracked_events.push(filter);
self
}

pub fn with_event_filters(&mut self, filters: Vec<EventFilter>) -> &mut Self {
#[must_use]
pub fn with_event_filters(mut self, filters: Vec<EventFilter>) -> Self {
self.tracked_events.extend(filters);
self
}

pub fn with_callback_strategy(&mut self, strategy: Arc<dyn CallbackStrategy>) -> &mut Self {
#[must_use]
pub fn with_callback_strategy(mut self, strategy: Arc<dyn CallbackStrategy>) -> Self {
self.callback_strategy = strategy;
self
}

pub fn with_blocks_read_per_epoch(&mut self, blocks_read_per_epoch: usize) -> &mut Self {
let _ = self.block_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch);
#[must_use]
pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self {
self.block_scanner = self.block_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch);
self
}

pub fn with_reorg_rewind_depth(&mut self, reorg_rewind_depth: u64) -> &mut Self {
let _ = self.block_scanner.with_reorg_rewind_depth(reorg_rewind_depth);
#[must_use]
pub fn with_reorg_rewind_depth(mut self, reorg_rewind_depth: u64) -> Self {
self.block_scanner = self.block_scanner.with_reorg_rewind_depth(reorg_rewind_depth);
self
}

pub fn with_retry_interval(&mut self, retry_interval: Duration) -> &mut Self {
let _ = self.block_scanner.with_retry_interval(retry_interval);
#[must_use]
pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self {
self.block_scanner = self.block_scanner.with_retry_interval(retry_interval);
self
}

pub fn with_block_confirmations(&mut self, block_confirmations: u64) -> &mut Self {
let _ = self.block_scanner.with_block_confirmations(block_confirmations);
#[must_use]
pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self {
self.block_scanner = self.block_scanner.with_block_confirmations(block_confirmations);
self
}

Expand Down Expand Up @@ -119,6 +127,12 @@ pub struct EventScanner<N: Network> {
callback_strategy: Arc<dyn CallbackStrategy>,
}

#[derive(Hash, Eq, PartialEq)]
struct EventIdentifier {
contract_address: Address,
event: String,
}

impl<N: Network> EventScanner<N> {
/// Starts the scanner
///
Expand All @@ -130,29 +144,31 @@ impl<N: Network> EventScanner<N> {
start_height: BlockNumberOrTag,
end_height: Option<BlockNumberOrTag>,
) -> anyhow::Result<()> {
let mut event_channels: HashMap<String, mpsc::Sender<Log>> = HashMap::new();
let mut event_channels: HashMap<EventIdentifier, mpsc::Sender<Log>> = HashMap::new();

for filter in &self.tracked_events {
let event_name = filter.event.clone();
let unique_event = EventIdentifier {
contract_address: filter.contract_address,
event: filter.event.clone(),
};

if event_channels.contains_key(&event_name) {
if event_channels.contains_key(&unique_event) {
continue;
}

// TODO: configurable buffer size / smaller buffer ?
let (sender, receiver) = mpsc::channel::<Log>(1024);

let event_name_clone = event_name.clone();
let callback = filter.callback.clone();
let strategy = self.callback_strategy.clone();
Self::spawn_event_callback_task_executors(
receiver,
callback,
strategy,
event_name_clone,
filter.event.clone(),
);

event_channels.insert(event_name, sender);
event_channels.insert(unique_event, sender);
}

let client = self.block_scanner.run()?;
Expand Down Expand Up @@ -199,7 +215,7 @@ impl<N: Network> EventScanner<N> {
&self,
from_block: u64,
to_block: u64,
event_channels: &HashMap<String, mpsc::Sender<Log>>,
event_channels: &HashMap<EventIdentifier, mpsc::Sender<Log>>,
) -> anyhow::Result<()> {
for event_filter in &self.tracked_events {
let filter = Filter::new()
Expand All @@ -222,7 +238,12 @@ impl<N: Network> EventScanner<N> {
"found logs for event in block range"
);

if let Some(sender) = event_channels.get(&event_filter.event) {
let event_identifier = EventIdentifier {
contract_address: event_filter.contract_address,
event: event_filter.event.clone(),
};

if let Some(sender) = event_channels.get(&event_identifier) {
for log in logs {
if let Err(e) = sender.send(log).await {
warn!(event = %event_filter.event, error = %e, "failed to enqueue log for processing");
Expand Down
105 changes: 105 additions & 0 deletions tests/common/mock_callbacks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
use std::{
sync::{
Arc,
atomic::{AtomicU64, AtomicUsize, Ordering},
},
time::Duration,
};

use alloy::{rpc::types::Log, sol_types::SolEvent};
use async_trait::async_trait;
use event_scanner::EventCallback;
use tokio::{sync::Mutex, time::sleep};

use crate::common::TestCounter;

pub struct BasicCounterCallback {
pub count: Arc<AtomicUsize>,
}

#[async_trait]
impl EventCallback for BasicCounterCallback {
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
self.count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}

pub struct SlowProcessorCallback {
pub delay_ms: u64,
pub processed: Arc<AtomicUsize>,
}

#[async_trait]
impl EventCallback for SlowProcessorCallback {
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
sleep(Duration::from_millis(self.delay_ms)).await;
self.processed.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}

/// A callback that fails `max_fail_times` attempts before succeeding once.
pub struct FlakyCallback {
pub attempts: Arc<AtomicUsize>,
pub successes: Arc<AtomicUsize>,
pub max_fail_times: usize,
}

#[async_trait]
impl EventCallback for FlakyCallback {
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
let attempt = self.attempts.fetch_add(1, Ordering::SeqCst) + 1;
if attempt <= self.max_fail_times {
anyhow::bail!("intentional failure on attempt {attempt}");
}
self.successes.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}

// A callback that always fails and records attempts.
pub struct AlwaysFailingCallback {
pub attempts: Arc<AtomicU64>,
}

#[async_trait]
impl EventCallback for AlwaysFailingCallback {
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
self.attempts.fetch_add(1, Ordering::SeqCst);
anyhow::bail!("always failing callback")
}
}

// Captures block numbers in the order they are processed.
pub struct BlockOrderingCallback {
pub blocks: Arc<Mutex<Vec<u64>>>,
}

#[async_trait]
impl EventCallback for BlockOrderingCallback {
async fn on_event(&self, log: &Log) -> anyhow::Result<()> {
let mut guard = self.blocks.lock().await;
if let Some(n) = log.block_number {
guard.push(n);
}
Ok(())
}
}

// Captures decoded CountIncreased `newCount` values to verify callback/event ordering.
pub struct EventOrderingCallback {
pub counts: Arc<Mutex<Vec<u64>>>,
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loving the event callback approach so far, seems super convenient for testing

#[async_trait]
impl EventCallback for EventOrderingCallback {
async fn on_event(&self, log: &Log) -> anyhow::Result<()> {
if let Some(&TestCounter::CountIncreased::SIGNATURE_HASH) = log.topic0() {
let TestCounter::CountIncreased { newCount } = log.log_decode()?.inner.data;
let mut guard = self.counts.lock().await;
guard.push(newCount.try_into().unwrap());
}
Ok(())
}
}
49 changes: 9 additions & 40 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
use std::{
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};

use alloy::{network::Ethereum, providers::ProviderBuilder, rpc::types::Log, sol};
use alloy::{network::Ethereum, providers::ProviderBuilder, sol};
use alloy_node_bindings::{Anvil, AnvilInstance};
use async_trait::async_trait;
use event_scanner::EventCallback;
use tokio::time::sleep;

pub mod mock_callbacks;

// Shared test contract used across integration tests
sol! {
Expand Down Expand Up @@ -39,36 +30,13 @@ sol! {
}
}

pub struct EventCounter {
pub count: Arc<AtomicUsize>,
}

#[async_trait]
impl EventCallback for EventCounter {
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
self.count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}

pub struct SlowProcessor {
pub delay_ms: u64,
pub processed: Arc<AtomicUsize>,
}

#[async_trait]
impl EventCallback for SlowProcessor {
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
sleep(Duration::from_millis(self.delay_ms)).await;
self.processed.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}

pub fn spawn_anvil(block_time_secs: u64) -> anyhow::Result<AnvilInstance> {
Ok(Anvil::new().block_time(block_time_secs).try_spawn()?)
#[allow(clippy::missing_errors_doc)]
pub fn spawn_anvil(block_time_secs: f64) -> anyhow::Result<AnvilInstance> {
Ok(Anvil::new().block_time_f64(block_time_secs).try_spawn()?)
}

#[allow(clippy::missing_errors_doc)]
#[allow(clippy::missing_panics_doc)]
pub async fn build_provider(
anvil: &AnvilInstance,
) -> anyhow::Result<impl alloy::providers::Provider<Ethereum> + Clone> {
Expand All @@ -77,6 +45,7 @@ pub async fn build_provider(
Ok(provider)
}

#[allow(clippy::missing_errors_doc)]
pub async fn deploy_counter<P>(provider: P) -> anyhow::Result<TestCounter::TestCounterInstance<P>>
where
P: alloy::providers::Provider<Ethereum> + Clone,
Expand Down
Loading