diff --git a/examples/historical_scanning/main.rs b/examples/historical_scanning/main.rs index 630fe1dc..56f3e3a2 100644 --- a/examples/historical_scanning/main.rs +++ b/examples/historical_scanning/main.rs @@ -67,11 +67,10 @@ async fn main() -> anyhow::Result<()> { let _ = counter_contract.increase().send().await?.get_receipt().await?; - let mut builder = EventScannerBuilder::new(); - - builder.with_event_filter(increase_filter); - - let mut scanner = builder.connect_ws::(anvil.ws_endpoint_url()).await?; + let mut scanner = EventScannerBuilder::new() + .with_event_filter(increase_filter) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; sleep(Duration::from_secs(10)).await; scanner.start(BlockNumberOrTag::Number(0), None).await.expect("failed to start scanner"); diff --git a/examples/simple_counter/main.rs b/examples/simple_counter/main.rs index b0472234..b39ed49f 100644 --- a/examples/simple_counter/main.rs +++ b/examples/simple_counter/main.rs @@ -65,14 +65,12 @@ async fn main() -> anyhow::Result<()> { callback: Arc::new(CounterCallback), }; - let mut builder = EventScannerBuilder::new(); - - builder.with_event_filter(increase_filter); - - let scanner = builder.connect_ws::(anvil.ws_endpoint_url()).await?; + let mut scanner = EventScannerBuilder::new() + .with_event_filter(increase_filter) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; let task_1 = tokio::spawn(async move { - let mut scanner = scanner; scanner.start(BlockNumberOrTag::Latest, None).await.expect("failed to start scanner"); }); diff --git a/src/block_scanner.rs b/src/block_scanner.rs index 77e606d7..5e015636 100644 --- a/src/block_scanner.rs +++ b/src/block_scanner.rs @@ -212,25 +212,25 @@ impl BlockScanner { } #[must_use] - pub fn with_blocks_read_per_epoch(&mut self, blocks_read_per_epoch: usize) -> &mut Self { + pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self { self.blocks_read_per_epoch = blocks_read_per_epoch; self } #[must_use] - pub fn with_reorg_rewind_depth(&mut self, reorg_rewind_depth: u64) -> &mut Self { + pub fn with_reorg_rewind_depth(mut self, reorg_rewind_depth: u64) -> Self { self.reorg_rewind_depth = reorg_rewind_depth; self } #[must_use] - pub fn with_retry_interval(&mut self, retry_interval: Duration) -> &mut Self { + pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self { self.retry_interval = retry_interval; self } #[must_use] - pub fn with_block_confirmations(&mut self, block_confirmations: u64) -> &mut Self { + pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self { self.block_confirmations = block_confirmations; self } @@ -241,7 +241,7 @@ impl BlockScanner { /// /// Returns an error if the connection fails pub async fn connect_ws( - &self, + self, ws_url: Url, ) -> TransportResult> { let provider = @@ -263,7 +263,7 @@ impl BlockScanner { /// /// Returns an error if the connection fails pub async fn connect_ipc( - &self, + self, ipc_path: String, ) -> TransportResult> { let provider = RootProvider::::new(ClientBuilder::default().ipc(ipc_path.into()).await?); diff --git a/src/event_scanner.rs b/src/event_scanner.rs index 1f9cc164..bdacb735 100644 --- a/src/event_scanner.rs +++ b/src/event_scanner.rs @@ -8,6 +8,7 @@ use crate::{ use alloy::{ eips::BlockNumberOrTag, network::Network, + primitives::Address, providers::Provider, rpc::types::{Filter, Log}, transports::http::reqwest::Url, @@ -38,38 +39,45 @@ impl EventScannerBuilder { } } - pub fn with_event_filter(&mut self, filter: EventFilter) -> &mut Self { + #[must_use] + 
pub fn with_event_filter(mut self, filter: EventFilter) -> Self { self.tracked_events.push(filter); self } - pub fn with_event_filters(&mut self, filters: Vec) -> &mut Self { + #[must_use] + pub fn with_event_filters(mut self, filters: Vec) -> Self { self.tracked_events.extend(filters); self } - pub fn with_callback_strategy(&mut self, strategy: Arc) -> &mut Self { + #[must_use] + pub fn with_callback_strategy(mut self, strategy: Arc) -> Self { self.callback_strategy = strategy; self } - pub fn with_blocks_read_per_epoch(&mut self, blocks_read_per_epoch: usize) -> &mut Self { - let _ = self.block_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch); + #[must_use] + pub fn with_blocks_read_per_epoch(mut self, blocks_read_per_epoch: usize) -> Self { + self.block_scanner = self.block_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch); self } - pub fn with_reorg_rewind_depth(&mut self, reorg_rewind_depth: u64) -> &mut Self { - let _ = self.block_scanner.with_reorg_rewind_depth(reorg_rewind_depth); + #[must_use] + pub fn with_reorg_rewind_depth(mut self, reorg_rewind_depth: u64) -> Self { + self.block_scanner = self.block_scanner.with_reorg_rewind_depth(reorg_rewind_depth); self } - pub fn with_retry_interval(&mut self, retry_interval: Duration) -> &mut Self { - let _ = self.block_scanner.with_retry_interval(retry_interval); + #[must_use] + pub fn with_retry_interval(mut self, retry_interval: Duration) -> Self { + self.block_scanner = self.block_scanner.with_retry_interval(retry_interval); self } - pub fn with_block_confirmations(&mut self, block_confirmations: u64) -> &mut Self { - let _ = self.block_scanner.with_block_confirmations(block_confirmations); + #[must_use] + pub fn with_block_confirmations(mut self, block_confirmations: u64) -> Self { + self.block_scanner = self.block_scanner.with_block_confirmations(block_confirmations); self } @@ -119,6 +127,12 @@ pub struct EventScanner { callback_strategy: Arc, } +#[derive(Hash, Eq, PartialEq)] +struct EventIdentifier { + contract_address: Address, + event: String, +} + impl EventScanner { /// Starts the scanner /// @@ -130,29 +144,31 @@ impl EventScanner { start_height: BlockNumberOrTag, end_height: Option, ) -> anyhow::Result<()> { - let mut event_channels: HashMap> = HashMap::new(); + let mut event_channels: HashMap> = HashMap::new(); for filter in &self.tracked_events { - let event_name = filter.event.clone(); + let unique_event = EventIdentifier { + contract_address: filter.contract_address, + event: filter.event.clone(), + }; - if event_channels.contains_key(&event_name) { + if event_channels.contains_key(&unique_event) { continue; } // TODO: configurable buffer size / smaller buffer ? 
let (sender, receiver) = mpsc::channel::(1024); - let event_name_clone = event_name.clone(); let callback = filter.callback.clone(); let strategy = self.callback_strategy.clone(); Self::spawn_event_callback_task_executors( receiver, callback, strategy, - event_name_clone, + filter.event.clone(), ); - event_channels.insert(event_name, sender); + event_channels.insert(unique_event, sender); } let client = self.block_scanner.run()?; @@ -199,7 +215,7 @@ impl EventScanner { &self, from_block: u64, to_block: u64, - event_channels: &HashMap>, + event_channels: &HashMap>, ) -> anyhow::Result<()> { for event_filter in &self.tracked_events { let filter = Filter::new() @@ -222,7 +238,12 @@ impl EventScanner { "found logs for event in block range" ); - if let Some(sender) = event_channels.get(&event_filter.event) { + let event_identifier = EventIdentifier { + contract_address: event_filter.contract_address, + event: event_filter.event.clone(), + }; + + if let Some(sender) = event_channels.get(&event_identifier) { for log in logs { if let Err(e) = sender.send(log).await { warn!(event = %event_filter.event, error = %e, "failed to enqueue log for processing"); diff --git a/tests/common/mock_callbacks.rs b/tests/common/mock_callbacks.rs new file mode 100644 index 00000000..ad446711 --- /dev/null +++ b/tests/common/mock_callbacks.rs @@ -0,0 +1,105 @@ +use std::{ + sync::{ + Arc, + atomic::{AtomicU64, AtomicUsize, Ordering}, + }, + time::Duration, +}; + +use alloy::{rpc::types::Log, sol_types::SolEvent}; +use async_trait::async_trait; +use event_scanner::EventCallback; +use tokio::{sync::Mutex, time::sleep}; + +use crate::common::TestCounter; + +pub struct BasicCounterCallback { + pub count: Arc, +} + +#[async_trait] +impl EventCallback for BasicCounterCallback { + async fn on_event(&self, _log: &Log) -> anyhow::Result<()> { + self.count.fetch_add(1, Ordering::SeqCst); + Ok(()) + } +} + +pub struct SlowProcessorCallback { + pub delay_ms: u64, + pub processed: Arc, +} + +#[async_trait] +impl EventCallback for SlowProcessorCallback { + async fn on_event(&self, _log: &Log) -> anyhow::Result<()> { + sleep(Duration::from_millis(self.delay_ms)).await; + self.processed.fetch_add(1, Ordering::SeqCst); + Ok(()) + } +} + +/// A callback that fails `max_fail_times` attempts before succeeding once. +pub struct FlakyCallback { + pub attempts: Arc, + pub successes: Arc, + pub max_fail_times: usize, +} + +#[async_trait] +impl EventCallback for FlakyCallback { + async fn on_event(&self, _log: &Log) -> anyhow::Result<()> { + let attempt = self.attempts.fetch_add(1, Ordering::SeqCst) + 1; + if attempt <= self.max_fail_times { + anyhow::bail!("intentional failure on attempt {attempt}"); + } + self.successes.fetch_add(1, Ordering::SeqCst); + Ok(()) + } +} + +// A callback that always fails and records attempts. +pub struct AlwaysFailingCallback { + pub attempts: Arc, +} + +#[async_trait] +impl EventCallback for AlwaysFailingCallback { + async fn on_event(&self, _log: &Log) -> anyhow::Result<()> { + self.attempts.fetch_add(1, Ordering::SeqCst); + anyhow::bail!("always failing callback") + } +} + +// Captures block numbers in the order they are processed. 
+pub struct BlockOrderingCallback { + pub blocks: Arc>>, +} + +#[async_trait] +impl EventCallback for BlockOrderingCallback { + async fn on_event(&self, log: &Log) -> anyhow::Result<()> { + let mut guard = self.blocks.lock().await; + if let Some(n) = log.block_number { + guard.push(n); + } + Ok(()) + } +} + +// Captures decoded CountIncreased `newCount` values to verify callback/event ordering. +pub struct EventOrderingCallback { + pub counts: Arc>>, +} + +#[async_trait] +impl EventCallback for EventOrderingCallback { + async fn on_event(&self, log: &Log) -> anyhow::Result<()> { + if let Some(&TestCounter::CountIncreased::SIGNATURE_HASH) = log.topic0() { + let TestCounter::CountIncreased { newCount } = log.log_decode()?.inner.data; + let mut guard = self.counts.lock().await; + guard.push(newCount.try_into().unwrap()); + } + Ok(()) + } +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 901af214..cae700b1 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -1,16 +1,7 @@ -use std::{ - sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, - }, - time::Duration, -}; - -use alloy::{network::Ethereum, providers::ProviderBuilder, rpc::types::Log, sol}; +use alloy::{network::Ethereum, providers::ProviderBuilder, sol}; use alloy_node_bindings::{Anvil, AnvilInstance}; -use async_trait::async_trait; -use event_scanner::EventCallback; -use tokio::time::sleep; + +pub mod mock_callbacks; // Shared test contract used across integration tests sol! { @@ -39,36 +30,13 @@ sol! { } } -pub struct EventCounter { - pub count: Arc, -} - -#[async_trait] -impl EventCallback for EventCounter { - async fn on_event(&self, _log: &Log) -> anyhow::Result<()> { - self.count.fetch_add(1, Ordering::SeqCst); - Ok(()) - } -} - -pub struct SlowProcessor { - pub delay_ms: u64, - pub processed: Arc, -} - -#[async_trait] -impl EventCallback for SlowProcessor { - async fn on_event(&self, _log: &Log) -> anyhow::Result<()> { - sleep(Duration::from_millis(self.delay_ms)).await; - self.processed.fetch_add(1, Ordering::SeqCst); - Ok(()) - } -} - -pub fn spawn_anvil(block_time_secs: u64) -> anyhow::Result { - Ok(Anvil::new().block_time(block_time_secs).try_spawn()?) +#[allow(clippy::missing_errors_doc)] +pub fn spawn_anvil(block_time_secs: f64) -> anyhow::Result { + Ok(Anvil::new().block_time_f64(block_time_secs).try_spawn()?) } +#[allow(clippy::missing_errors_doc)] +#[allow(clippy::missing_panics_doc)] pub async fn build_provider( anvil: &AnvilInstance, ) -> anyhow::Result + Clone> { @@ -77,6 +45,7 @@ pub async fn build_provider( Ok(provider) } +#[allow(clippy::missing_errors_doc)] pub async fn deploy_counter
<P>
(provider: P) -> anyhow::Result> where P: alloy::providers::Provider + Clone, diff --git a/tests/historic_mode/basic.rs b/tests/historic_mode/basic.rs new file mode 100644 index 00000000..f31e5281 --- /dev/null +++ b/tests/historic_mode/basic.rs @@ -0,0 +1,68 @@ +use std::{ + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + time::Duration, +}; + +use alloy::{eips::BlockNumberOrTag, network::Ethereum, sol_types::SolEvent}; +use event_scanner::{event_scanner::EventScannerBuilder, types::EventFilter}; +use tokio::time::{sleep, timeout}; + +use crate::{ + common::{TestCounter, build_provider, deploy_counter, spawn_anvil}, + mock_callbacks::BasicCounterCallback, +}; + +#[tokio::test] +async fn processes_events_within_specified_historical_range() -> anyhow::Result<()> { + let anvil = spawn_anvil(0.1)?; + let provider = build_provider(&anvil).await?; + let contract = deploy_counter(provider.clone()).await?; + let contract_address = *contract.address(); + + let event_count = Arc::new(AtomicUsize::new(0)); + let callback = Arc::new(BasicCounterCallback { count: Arc::clone(&event_count) }); + + let filter = EventFilter { + contract_address, + event: TestCounter::CountIncreased::SIGNATURE.to_owned(), + callback, + }; + + let receipt = contract.increase().send().await?.get_receipt().await?; + let start_block = receipt.block_number.expect("receipt should contain block number"); + let mut end_block = 0; + + let expected_event_count = 4; + + for _ in 1..expected_event_count { + let receipt = contract.increase().send().await?.get_receipt().await?; + end_block = receipt.block_number.expect("receipt should contain block number"); + } + + let mut scanner = EventScannerBuilder::new() + .with_event_filter(filter) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; + + tokio::spawn(async move { + scanner + .start(BlockNumberOrTag::Number(start_block), Some(BlockNumberOrTag::Number(end_block))) + .await + }); + + let event_count_clone = Arc::clone(&event_count); + let event_counting = async move { + while event_count_clone.load(Ordering::SeqCst) < 4 { + sleep(Duration::from_millis(100)).await; + } + }; + + if timeout(Duration::from_secs(3), event_counting).await.is_err() { + assert_eq!(event_count.load(Ordering::SeqCst), expected_event_count); + } + + Ok(()) +} diff --git a/tests/historic_mode/mod.rs b/tests/historic_mode/mod.rs new file mode 100644 index 00000000..38883ee0 --- /dev/null +++ b/tests/historic_mode/mod.rs @@ -0,0 +1 @@ +pub mod basic; diff --git a/tests/historic_to_live/basic.rs b/tests/historic_to_live/basic.rs new file mode 100644 index 00000000..e8a0c433 --- /dev/null +++ b/tests/historic_to_live/basic.rs @@ -0,0 +1,74 @@ +use std::sync::Arc; + +use alloy::{eips::BlockNumberOrTag, network::Ethereum, sol_types::SolEvent}; +use event_scanner::{event_scanner::EventScannerBuilder, types::EventFilter}; +use tokio::time::{Duration, sleep, timeout}; + +use crate::{ + common::{TestCounter, build_provider, deploy_counter, spawn_anvil}, + mock_callbacks::EventOrderingCallback, +}; + +#[tokio::test] +async fn replays_historical_then_switches_to_live() -> anyhow::Result<()> { + let anvil = spawn_anvil(0.1)?; + let provider = build_provider(&anvil).await?; + let contract = deploy_counter(provider).await?; + let contract_address = *contract.address(); + + let historical_events = 3; + let live_events = 2; + + let receipt = contract.increase().send().await?.get_receipt().await?; + let first_historical_block = + receipt.block_number.expect("historical receipt should contain block number"); + + for _ in 
1..historical_events { + contract.increase().send().await?.watch().await?; + } + + let event_new_counts = Arc::new(tokio::sync::Mutex::new(Vec::::new())); + let callback = Arc::new(EventOrderingCallback { counts: Arc::clone(&event_new_counts) }); + + let filter = EventFilter { + contract_address, + event: TestCounter::CountIncreased::SIGNATURE.to_owned(), + callback, + }; + + let mut scanner = EventScannerBuilder::new() + .with_event_filter(filter) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; + + tokio::spawn(async move { + scanner.start(BlockNumberOrTag::Number(first_historical_block), None).await + }); + + sleep(Duration::from_millis(200)).await; + + for _ in 0..live_events { + contract.increase().send().await?.watch().await?; + } + + let event_new_counts_clone = Arc::clone(&event_new_counts); + let event_counting = async move { + while event_new_counts_clone.lock().await.len() < historical_events + live_events { + sleep(Duration::from_millis(100)).await; + } + }; + + if timeout(Duration::from_secs(1), event_counting).await.is_err() { + assert_eq!(event_new_counts.lock().await.len(), historical_events + live_events); + } + + let event_new_counts = event_new_counts.lock().await; + + let mut expected_new_count = 1; + for &new_count in event_new_counts.iter() { + assert_eq!(new_count, expected_new_count); + expected_new_count += 1; + } + + Ok(()) +} diff --git a/tests/historic_to_live/mod.rs b/tests/historic_to_live/mod.rs new file mode 100644 index 00000000..38883ee0 --- /dev/null +++ b/tests/historic_to_live/mod.rs @@ -0,0 +1 @@ +pub mod basic; diff --git a/tests/lib.rs b/tests/lib.rs new file mode 100644 index 00000000..659210a9 --- /dev/null +++ b/tests/lib.rs @@ -0,0 +1,6 @@ +pub mod common; +pub mod historic_mode; +pub mod historic_to_live; +pub mod live_mode; + +pub use common::mock_callbacks; diff --git a/tests/live_mode/basic.rs b/tests/live_mode/basic.rs new file mode 100644 index 00000000..e9d7e32e --- /dev/null +++ b/tests/live_mode/basic.rs @@ -0,0 +1,255 @@ +use std::{ + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + time::Duration, +}; + +use crate::{ + common::{TestCounter, build_provider, deploy_counter, spawn_anvil}, + mock_callbacks::BasicCounterCallback, +}; +use alloy::{eips::BlockNumberOrTag, network::Ethereum, sol_types::SolEvent}; +use event_scanner::{event_scanner::EventScannerBuilder, types::EventFilter}; +use tokio::time::{sleep, timeout}; + +#[tokio::test] +async fn basic_single_event_scanning() -> anyhow::Result<()> { + let anvil = spawn_anvil(0.1)?; + let provider = build_provider(&anvil).await?; + let contract = deploy_counter(provider.clone()).await?; + let contract_address = *contract.address(); + + let event_count = Arc::new(AtomicUsize::new(0)); + let callback = Arc::new(BasicCounterCallback { count: Arc::clone(&event_count) }); + + let filter = EventFilter { + contract_address, + event: TestCounter::CountIncreased::SIGNATURE.to_owned(), + callback, + }; + + let mut scanner = EventScannerBuilder::new() + .with_event_filter(filter) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; + + tokio::spawn(async move { scanner.start(BlockNumberOrTag::Latest, None).await }); + + let expected_event_count = 5; + + for _ in 0..expected_event_count { + contract.increase().send().await?.watch().await?; + } + + let event_count_clone = Arc::clone(&event_count); + let event_counting = async move { + while event_count_clone.load(Ordering::SeqCst) < expected_event_count { + sleep(Duration::from_millis(100)).await; + } + }; + + if 
timeout(Duration::from_secs(1), event_counting).await.is_err() { + assert_eq!(event_count.load(Ordering::SeqCst), expected_event_count); + } + + Ok(()) +} + +#[tokio::test] +async fn multiple_contracts_same_event_isolate_callbacks() -> anyhow::Result<()> { + let anvil = spawn_anvil(0.1)?; + let provider = build_provider(&anvil).await?; + let a = deploy_counter(provider.clone()).await?; + let b = deploy_counter(provider.clone()).await?; + + let a_count = Arc::new(AtomicUsize::new(0)); + let b_count = Arc::new(AtomicUsize::new(0)); + let a_cb = Arc::new(BasicCounterCallback { count: Arc::clone(&a_count) }); + let b_cb = Arc::new(BasicCounterCallback { count: Arc::clone(&b_count) }); + + let a_filter = EventFilter { + contract_address: *a.address(), + event: TestCounter::CountIncreased::SIGNATURE.to_owned(), + callback: a_cb, + }; + let b_filter = EventFilter { + contract_address: *b.address(), + event: TestCounter::CountIncreased::SIGNATURE.to_owned(), + callback: b_cb, + }; + + let mut scanner = EventScannerBuilder::new() + .with_event_filters(vec![a_filter, b_filter]) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; + + tokio::spawn(async move { scanner.start(BlockNumberOrTag::Latest, None).await }); + + let expected_events_a = 3; + let expected_events_b = 2; + + for _ in 0..expected_events_a { + a.increase().send().await?.watch().await?; + } + + for _ in 0..expected_events_b { + b.increase().send().await?.watch().await?; + } + + let a_count_clone = Arc::clone(&a_count); + let b_count_clone = Arc::clone(&b_count); + let event_counting = async move { + while a_count_clone.load(Ordering::SeqCst) < expected_events_a || + b_count_clone.load(Ordering::SeqCst) < expected_events_b + { + sleep(Duration::from_millis(100)).await; + } + }; + + if timeout(Duration::from_secs(1), event_counting).await.is_err() { + assert_eq!(a_count.load(Ordering::SeqCst), expected_events_a); + assert_eq!(b_count.load(Ordering::SeqCst), expected_events_b); + } + + Ok(()) +} + +#[tokio::test] +async fn multiple_events_same_contract() -> anyhow::Result<()> { + let anvil = spawn_anvil(0.1)?; + let provider = build_provider(&anvil).await?; + let contract = deploy_counter(provider).await?; + let contract_address = *contract.address(); + + let increase_count = Arc::new(AtomicUsize::new(0)); + let decrease_count = Arc::new(AtomicUsize::new(0)); + let increase_cb = Arc::new(BasicCounterCallback { count: Arc::clone(&increase_count) }); + let decrease_cb = Arc::new(BasicCounterCallback { count: Arc::clone(&decrease_count) }); + + let increase_filter = EventFilter { + contract_address, + event: TestCounter::CountIncreased::SIGNATURE.to_owned(), + callback: increase_cb, + }; + let decrease_filter = EventFilter { + contract_address, + event: TestCounter::CountDecreased::SIGNATURE.to_owned(), + callback: decrease_cb, + }; + + let mut scanner = EventScannerBuilder::new() + .with_event_filters(vec![increase_filter, decrease_filter]) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; + + tokio::spawn(async move { scanner.start(BlockNumberOrTag::Latest, None).await }); + + let expected_incr_events = 6; + let expected_decr_events = 2; + + for _ in 0..expected_incr_events { + contract.increase().send().await?.watch().await?; + } + + contract.decrease().send().await?.watch().await?; + contract.decrease().send().await?.watch().await?; + + let increase_count_clone = Arc::clone(&increase_count); + let decrease_count_clone = Arc::clone(&decrease_count); + let event_counting = async move { + while increase_count_clone.load(Ordering::SeqCst) 
< expected_incr_events || + decrease_count_clone.load(Ordering::SeqCst) < expected_decr_events + { + sleep(Duration::from_millis(100)).await; + } + }; + + if timeout(Duration::from_secs(2), event_counting).await.is_err() { + assert_eq!(increase_count.load(Ordering::SeqCst), expected_incr_events); + assert_eq!(decrease_count.load(Ordering::SeqCst), expected_decr_events); + } + + Ok(()) +} + +#[tokio::test] +async fn signature_matching_ignores_irrelevant_events() -> anyhow::Result<()> { + let anvil = spawn_anvil(0.1)?; + let provider = build_provider(&anvil).await?; + let contract = deploy_counter(provider).await?; + + let event_count = Arc::new(AtomicUsize::new(0)); + let callback = Arc::new(BasicCounterCallback { count: Arc::clone(&event_count) }); + + // Subscribe to CountDecreased but only emit CountIncreased + let filter = EventFilter { + contract_address: *contract.address(), + event: TestCounter::CountDecreased::SIGNATURE.to_owned(), + callback, + }; + + let mut scanner = EventScannerBuilder::new() + .with_event_filter(filter) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; + + tokio::spawn(async move { scanner.start(BlockNumberOrTag::Latest, None).await }); + + for _ in 0..3 { + contract.increase().send().await?.watch().await?; + } + + let event_count_clone = Arc::clone(&event_count); + let event_counting = async move { + while event_count_clone.load(Ordering::SeqCst) == 0 { + sleep(Duration::from_millis(100)).await; + } + }; + + if timeout(Duration::from_secs(1), event_counting).await.is_ok() { + anyhow::bail!("scanner should have ignored all of the emitted events"); + } + + Ok(()) +} + +#[tokio::test] +async fn live_filters_malformed_signature_graceful() -> anyhow::Result<()> { + let anvil = spawn_anvil(0.1)?; + let provider = build_provider(&anvil).await?; + let contract = deploy_counter(provider).await?; + + let event_count = Arc::new(AtomicUsize::new(0)); + let callback = Arc::new(BasicCounterCallback { count: Arc::clone(&event_count) }); + let filter = EventFilter { + contract_address: *contract.address(), + event: "invalid-sig".to_string(), + callback, + }; + + let mut scanner = EventScannerBuilder::new() + .with_event_filter(filter) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; + + tokio::spawn(async move { scanner.start(BlockNumberOrTag::Latest, None).await }); + + for _ in 0..3 { + contract.increase().send().await?.watch().await?; + } + + let event_count_clone = Arc::clone(&event_count); + let event_counting = async move { + while event_count_clone.load(Ordering::SeqCst) == 0 { + sleep(Duration::from_millis(100)).await; + } + }; + + if timeout(Duration::from_secs(1), event_counting).await.is_ok() { + anyhow::bail!("scanner should have ignored all of the emitted events"); + } + + Ok(()) +} diff --git a/tests/live_mode/callbacks.rs b/tests/live_mode/callbacks.rs new file mode 100644 index 00000000..977418ba --- /dev/null +++ b/tests/live_mode/callbacks.rs @@ -0,0 +1,160 @@ +use std::{ + sync::{ + Arc, + atomic::{AtomicU64, AtomicUsize, Ordering}, + }, + time::Duration, +}; + +use crate::{ + common::{TestCounter, build_provider, deploy_counter, spawn_anvil}, + mock_callbacks::{AlwaysFailingCallback, FlakyCallback, SlowProcessorCallback}, +}; +use alloy::{eips::BlockNumberOrTag, network::Ethereum, sol_types::SolEvent}; +use event_scanner::{ + CallbackStrategy, EventFilter, + callback_strategy::{FixedRetryConfig, FixedRetryStrategy}, + event_scanner::EventScannerBuilder, +}; +use tokio::time::{sleep, timeout}; + +#[tokio::test] +async fn 
callbacks_slow_processing_does_not_drop_events() -> anyhow::Result<()> { + let anvil = spawn_anvil(0.1)?; + let provider = build_provider(&anvil).await?; + let contract = deploy_counter(provider.clone()).await?; + let contract_address = *contract.address(); + + let processed = Arc::new(AtomicUsize::new(0)); + let callback = + Arc::new(SlowProcessorCallback { delay_ms: 100, processed: Arc::clone(&processed) }); + + let filter = EventFilter { + contract_address, + event: TestCounter::CountIncreased::SIGNATURE.to_owned(), + callback, + }; + let mut scanner = EventScannerBuilder::new() + .with_event_filter(filter) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; + + tokio::spawn(async move { scanner.start(BlockNumberOrTag::Latest, None).await }); + + let expected_event_count = 3; + + for _ in 0..expected_event_count { + // emits faster than processing to simulate backlog + contract.increase().send().await?.watch().await?; + } + + let processed_clone = Arc::clone(&processed); + let event_counting = async move { + while processed_clone.load(Ordering::SeqCst) < expected_event_count { + sleep(Duration::from_millis(100)).await; + } + }; + + if timeout(Duration::from_secs(1), event_counting).await.is_err() { + assert_eq!(processed.load(Ordering::SeqCst), expected_event_count); + } + + Ok(()) +} + +#[tokio::test] +async fn callbacks_failure_then_retry_success() -> anyhow::Result<()> { + let anvil = spawn_anvil(0.1)?; + let provider = build_provider(&anvil).await?; + let contract = deploy_counter(provider).await?; + + let attempts = Arc::new(AtomicUsize::new(0)); + let successes = Arc::new(AtomicUsize::new(0)); + let callback = Arc::new(FlakyCallback { + attempts: attempts.clone(), + successes: successes.clone(), + max_fail_times: 2, + }); + + let filter = EventFilter { + contract_address: *contract.address(), + event: TestCounter::CountIncreased::SIGNATURE.to_owned(), + callback, + }; + let cfg = FixedRetryConfig { max_attempts: 3, delay_ms: 50 }; + + let strategy: Arc = Arc::new(FixedRetryStrategy::new(cfg)); + + let mut scanner = EventScannerBuilder::new() + .with_event_filter(filter) + .with_callback_strategy(strategy) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; + + tokio::spawn(async move { scanner.start(BlockNumberOrTag::Latest, None).await }); + + contract.increase().send().await?.watch().await?; + + let expected_attempts = 3; + let expected_successes = 1; + + let attempts_clone = Arc::clone(&attempts); + let successes_clone = Arc::clone(&successes); + let attempt_counting = async move { + while attempts_clone.load(Ordering::SeqCst) < expected_attempts || + successes_clone.load(Ordering::SeqCst) < expected_successes + { + sleep(Duration::from_millis(100)).await; + } + }; + + if timeout(Duration::from_secs(1), attempt_counting).await.is_err() { + assert_eq!(attempts.load(Ordering::SeqCst), expected_attempts); + assert_eq!(successes.load(Ordering::SeqCst), expected_successes); + } + + Ok(()) +} + +#[tokio::test] +async fn callbacks_always_failing_respects_max_attempts() -> anyhow::Result<()> { + let anvil = spawn_anvil(0.1)?; + let provider = build_provider(&anvil).await?; + let contract = deploy_counter(provider).await?; + + let expected_attempts = 2; + let attempts = Arc::new(AtomicU64::new(0)); + let callback = Arc::new(AlwaysFailingCallback { attempts: Arc::clone(&attempts) }); + + let filter = EventFilter { + contract_address: *contract.address(), + event: TestCounter::CountIncreased::SIGNATURE.to_owned(), + callback, + }; + let cfg = FixedRetryConfig { max_attempts: 
expected_attempts, delay_ms: 20 }; + + let strategy: Arc = Arc::new(FixedRetryStrategy::new(cfg)); + + let mut scanner = EventScannerBuilder::new() + .with_event_filter(filter) + .with_callback_strategy(strategy) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; + + tokio::spawn(async move { scanner.start(BlockNumberOrTag::Latest, None).await }); + + contract.increase().send().await?.watch().await?; + + let attempts_clone = Arc::clone(&attempts); + let attempt_counting = async move { + while attempts_clone.load(Ordering::SeqCst) < expected_attempts { + sleep(Duration::from_millis(100)).await; + } + }; + + if timeout(Duration::from_secs(1), attempt_counting).await.is_err() { + assert_eq!(attempts.load(Ordering::SeqCst), expected_attempts); + } + + Ok(()) +} diff --git a/tests/live_mode/mod.rs b/tests/live_mode/mod.rs new file mode 100644 index 00000000..8ee93bac --- /dev/null +++ b/tests/live_mode/mod.rs @@ -0,0 +1,4 @@ +pub mod basic; +pub mod callbacks; +pub mod ordering; +pub mod performance; diff --git a/tests/live_mode/ordering.rs b/tests/live_mode/ordering.rs new file mode 100644 index 00000000..038d99e3 --- /dev/null +++ b/tests/live_mode/ordering.rs @@ -0,0 +1,104 @@ +use std::{sync::Arc, time::Duration}; + +use crate::{ + common, + mock_callbacks::{BlockOrderingCallback, EventOrderingCallback}, +}; +use alloy::{eips::BlockNumberOrTag, network::Ethereum, sol_types::SolEvent}; +use common::{TestCounter, build_provider, deploy_counter, spawn_anvil}; +use event_scanner::{event_scanner::EventScannerBuilder, types::EventFilter}; +use tokio::time::{sleep, timeout}; + +#[tokio::test] +async fn callback_occurs_in_order() -> anyhow::Result<()> { + let anvil = spawn_anvil(0.1)?; + let provider = build_provider(&anvil).await?; + let contract = deploy_counter(provider).await?; + + let counts = Arc::new(tokio::sync::Mutex::new(Vec::::new())); + let callback = Arc::new(EventOrderingCallback { counts: Arc::clone(&counts) }); + + let filter = EventFilter { + contract_address: *contract.address(), + event: TestCounter::CountIncreased::SIGNATURE.to_owned(), + callback, + }; + let mut scanner = EventScannerBuilder::new() + .with_event_filter(filter) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; + + tokio::spawn(async move { scanner.start(BlockNumberOrTag::Latest, None).await }); + + for _ in 0..5 { + contract.increase().send().await?.watch().await?; + } + + let expected: Vec = (1..=5).collect(); + let expected_clone = expected.clone(); + let counts_clone = Arc::clone(&counts); + + let event_counting = async move { + while *counts_clone.lock().await != expected_clone { + sleep(Duration::from_millis(100)).await; + } + }; + + if timeout(Duration::from_secs(1), event_counting).await.is_err() { + anyhow::bail!( + "callback ordering mismatch counts, expected: {expected:?}: {:?}", + *counts.lock().await + ); + } + + Ok(()) +} + +#[tokio::test] +async fn blocks_and_events_arrive_in_order() -> anyhow::Result<()> { + let anvil = spawn_anvil(0.1)?; + let provider = build_provider(&anvil).await?; + let contract = deploy_counter(provider.clone()).await?; + + let blocks = Arc::new(tokio::sync::Mutex::new(Vec::::new())); + let callback = Arc::new(BlockOrderingCallback { blocks: Arc::clone(&blocks) }); + + let filter = EventFilter { + contract_address: *contract.address(), + event: TestCounter::CountIncreased::SIGNATURE.to_owned(), + callback, + }; + let mut scanner = EventScannerBuilder::new() + .with_event_filter(filter) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; + + tokio::spawn(async move { 
scanner.start(BlockNumberOrTag::Latest, None).await }); + + let expected_event_count = 5; + + for _ in 0..expected_event_count { + let _pending = contract.increase().send().await?; + sleep(Duration::from_millis(200)).await; + } + + let blocks_clone = Arc::clone(&blocks); + + let event_counting = async move { + while blocks_clone.lock().await.len() < expected_event_count { + sleep(Duration::from_millis(100)).await; + } + }; + + if timeout(Duration::from_secs(1), event_counting).await.is_err() { + anyhow::bail!("expected {expected_event_count} events, got {}", blocks.lock().await.len()); + } + + let data = blocks.lock().await.clone(); + assert!( + data.windows(2).all(|w| w[0] <= w[1]), + "block numbers must be non-decreasing: {data:?}", + ); + + Ok(()) +} diff --git a/tests/live_mode/performance.rs b/tests/live_mode/performance.rs new file mode 100644 index 00000000..0864f81d --- /dev/null +++ b/tests/live_mode/performance.rs @@ -0,0 +1,56 @@ +use std::{ + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + time::Duration, +}; + +use alloy::{eips::BlockNumberOrTag, network::Ethereum, sol_types::SolEvent}; +use event_scanner::{event_scanner::EventScannerBuilder, types::EventFilter}; +use tokio::time::{sleep, timeout}; + +use crate::{ + common::{TestCounter, build_provider, deploy_counter, spawn_anvil}, + mock_callbacks::BasicCounterCallback, +}; + +#[tokio::test] +async fn high_event_volume_no_loss() -> anyhow::Result<()> { + let anvil = spawn_anvil(0.05)?; + let provider = build_provider(&anvil).await?; + let contract = deploy_counter(provider).await?; + + let event_count = Arc::new(AtomicUsize::new(0)); + let callback = Arc::new(BasicCounterCallback { count: Arc::clone(&event_count) }); + let filter = EventFilter { + contract_address: *contract.address(), + event: TestCounter::CountIncreased::SIGNATURE.to_owned(), + callback, + }; + + let mut scanner = EventScannerBuilder::new() + .with_event_filter(filter) + .connect_ws::(anvil.ws_endpoint_url()) + .await?; + tokio::spawn(async move { scanner.start(BlockNumberOrTag::Latest, None).await }); + + let expected_event_count = 100; + + for _ in 0..expected_event_count { + contract.increase().send().await?.watch().await?; + } + + let event_count_clone = Arc::clone(&event_count); + let event_counting = async move { + while event_count_clone.load(Ordering::SeqCst) < expected_event_count { + sleep(Duration::from_millis(100)).await; + } + }; + + if timeout(Duration::from_secs(60), event_counting).await.is_err() { + assert_eq!(event_count.load(Ordering::SeqCst), expected_event_count); + } + + Ok(()) +}
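
Note on the builder change in src/block_scanner.rs and src/event_scanner.rs above: every `with_*` setter now takes `mut self` and returns `Self`, and `connect_ws`/`connect_ipc` consume the builder, so configuration chains straight into the connect call. Below is a minimal sketch of how a call site reads after this change. It assumes the import paths used in the tests; the `Ethereum` network type parameter for `connect_ws` is assumed from the test imports, and the callback and event signature string are illustrative only.

```rust
use std::sync::Arc;

use alloy::{
    eips::BlockNumberOrTag,
    network::Ethereum,
    primitives::Address,
    rpc::types::Log,
    transports::http::reqwest::Url,
};
use async_trait::async_trait;
use event_scanner::{EventCallback, event_scanner::EventScannerBuilder, types::EventFilter};

// Illustrative callback: just report each incoming log.
struct LogPrinter;

#[async_trait]
impl EventCallback for LogPrinter {
    async fn on_event(&self, log: &Log) -> anyhow::Result<()> {
        println!("log observed in block {:?}", log.block_number);
        Ok(())
    }
}

async fn run(contract_address: Address, ws_url: Url) -> anyhow::Result<()> {
    let filter = EventFilter {
        contract_address,
        // Event signature string, as produced by `TestCounter::CountIncreased::SIGNATURE` in the tests.
        event: "CountIncreased(uint256)".to_owned(),
        callback: Arc::new(LogPrinter),
    };

    // Each `with_*` call consumes the builder and returns it, so the chain ends
    // at `connect_ws` without an intermediate `builder` binding.
    let mut scanner = EventScannerBuilder::new()
        .with_event_filter(filter)
        .with_blocks_read_per_epoch(100)
        .connect_ws::<Ethereum>(ws_url)
        .await?;

    scanner.start(BlockNumberOrTag::Latest, None).await
}
```

Compared with the previous `&mut self` setters, the consuming signatures also remove the `let _ = self.block_scanner.with_*(...)` pattern that `EventScannerBuilder` needed when delegating to `BlockScanner`.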
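Note on the `EventIdentifier` addition: channels were previously keyed by the event signature string alone, so two filters for the same event on different contracts would share one channel and callback. Keying by `(contract_address, event)` keeps them separate, which is what the new `multiple_contracts_same_event_isolate_callbacks` test exercises. A standalone sketch of the keying behaviour follows; the struct mirrors the private one added in src/event_scanner.rs, and the map values are placeholders for the per-filter mpsc senders.

```rust
use std::collections::HashMap;

use alloy::primitives::Address;

// Mirror of the private struct added in src/event_scanner.rs, reproduced here
// only to show the keying behaviour.
#[derive(Hash, Eq, PartialEq)]
struct EventIdentifier {
    contract_address: Address,
    event: String,
}

fn main() {
    // Placeholder values stand in for the per-filter channel senders.
    let mut channels: HashMap<EventIdentifier, &'static str> = HashMap::new();

    let contract_a = Address::new([0xAA; 20]);
    let contract_b = Address::new([0xBB; 20]);

    // The same event signature on two different contracts yields two distinct keys,
    // so each filter keeps its own channel and callback.
    channels.insert(
        EventIdentifier {
            contract_address: contract_a,
            event: "CountIncreased(uint256)".to_owned(),
        },
        "channel for contract A",
    );
    channels.insert(
        EventIdentifier {
            contract_address: contract_b,
            event: "CountIncreased(uint256)".to_owned(),
        },
        "channel for contract B",
    );

    assert_eq!(channels.len(), 2);
}
```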