Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
195 changes: 195 additions & 0 deletions tests/live_scanning_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
use std::{
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::Duration,
};

use alloy::{providers::ProviderBuilder, rpc::types::Log, sol, sol_types::SolEvent};
use alloy_node_bindings::Anvil;
use async_trait::async_trait;
use event_scanner::{EventCallback, EventFilter, ScannerBuilder};
use tokio::time::sleep;

sol! {
#[allow(missing_docs)]
#[sol(rpc, bytecode="608080604052346015576101b0908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c90816306661abd1461016157508063a87d942c14610145578063d732d955146100ad5763e8927fbc14610048575f80fd5b346100a9575f3660031901126100a9575f5460018101809111610095576020817f7ca2ca9527391044455246730762df008a6b47bbdb5d37a890ef78394535c040925f55604051908152a1005b634e487b7160e01b5f52601160045260245ffd5b5f80fd5b346100a9575f3660031901126100a9575f548015610100575f198101908111610095576020817f53a71f16f53e57416424d0d18ccbd98504d42a6f98fe47b09772d8f357c620ce925f55604051908152a1005b60405162461bcd60e51b815260206004820152601860248201527f436f756e742063616e6e6f74206265206e6567617469766500000000000000006044820152606490fd5b346100a9575f3660031901126100a95760205f54604051908152f35b346100a9575f3660031901126100a9576020905f548152f3fea2646970667358221220b846b706f79f5ae1fc4a4238319e723a092f47ce4051404186424739164ab02264736f6c634300081e0033")]
contract LiveTestCounter {
uint256 public count;

event CountIncreased(uint256 newCount);
event CountDecreased(uint256 newCount);

function increase() public {
count += 1;
emit CountIncreased(count);
}

function decrease() public {
require(count > 0, "Count cannot be negative");
count -= 1;
emit CountDecreased(count);
}

function getCount() public view returns (uint256) {
return count;
}
}
}

struct EventCounter {
pub count: Arc<AtomicUsize>,
}

#[async_trait]
impl EventCallback for EventCounter {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually love the callback approach you took with EventCallback - instead of storing the callback itself, you store a callback struct type that implements this callback trait 👍

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes thought it was simpler glad you like it 😄

async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
self.count.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}

struct SlowProcessor {
pub delay_ms: u64,
pub processed: Arc<AtomicUsize>,
}

#[async_trait]
impl EventCallback for SlowProcessor {
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
sleep(Duration::from_millis(self.delay_ms)).await;
self.processed.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}

#[tokio::test]
async fn test_live_scanning_basic() -> anyhow::Result<()> {
Copy link
Collaborator

@0xNeshi 0xNeshi Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

red alert, just had this test randomly fail, seems it has a race condition too...

My guess is that the scanner_handle.abort() call is again to blame.
There's probably a race condition in test_live_scanning_multiple_events too.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like there must be a more reliable way to write these tests to avoid this problem, but I'd leave that for when we're writing tests for channels

let anvil = Anvil::new().block_time(1).try_spawn()?;
let wallet = anvil.wallet();
let provider =
ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?;

let contract = LiveTestCounter::deploy(provider.clone()).await?;
let contract_address = *contract.address();

let event_count = Arc::new(AtomicUsize::new(0));
let callback = Arc::new(EventCounter { count: event_count.clone() });

let filter = EventFilter {
contract_address,
event: LiveTestCounter::CountIncreased::SIGNATURE.to_owned(),
callback,
};

let mut scanner =
ScannerBuilder::new(anvil.ws_endpoint_url()).add_event_filter(filter).build().await?;

let scanner_handle = tokio::spawn(async move { scanner.start().await });

for _ in 0..5 {
let _ = contract.increase().send().await?.get_receipt().await?;
}

sleep(Duration::from_millis(200)).await;

scanner_handle.abort();

assert_eq!(event_count.load(Ordering::SeqCst), 5);

Ok(())
}

#[tokio::test]
async fn test_live_scanning_multiple_events() -> anyhow::Result<()> {
let anvil = Anvil::new().block_time(1).try_spawn()?;
let wallet = anvil.wallet();
let provider =
ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?;

let contract = LiveTestCounter::deploy(provider.clone()).await?;
let contract_address = *contract.address();

let increase_count = Arc::new(AtomicUsize::new(0));
let decrease_count = Arc::new(AtomicUsize::new(0));

let increase_callback = Arc::new(EventCounter { count: increase_count.clone() });

let decrease_callback = Arc::new(EventCounter { count: decrease_count.clone() });

let increase_filter = EventFilter {
contract_address,
event: LiveTestCounter::CountIncreased::SIGNATURE.to_owned(),
callback: increase_callback,
};

let decrease_filter = EventFilter {
contract_address,
event: LiveTestCounter::CountDecreased::SIGNATURE.to_owned(),
callback: decrease_callback,
};

let mut scanner = ScannerBuilder::new(anvil.ws_endpoint_url())
.add_event_filter(increase_filter)
.add_event_filter(decrease_filter)
.build()
.await?;

let scanner_handle = tokio::spawn(async move { scanner.start().await });

for i in 0..6 {
let _ = contract.increase().send().await?.get_receipt().await?;
if i >= 4 {
let _ = contract.decrease().send().await?.get_receipt().await?;
}
}

sleep(Duration::from_millis(200)).await;

scanner_handle.abort();

assert_eq!(increase_count.load(Ordering::SeqCst), 6);
assert_eq!(decrease_count.load(Ordering::SeqCst), 2);

Ok(())
}

#[tokio::test]
async fn test_live_scanning_with_slow_processor() -> anyhow::Result<()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just found out that there's a race condition in this test, it fails intermittently...

Copy link
Collaborator

@0xNeshi 0xNeshi Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The race condition seems to appear when aborting the scanner thread. Because it is slow to process the pending events (events are emitted every ~50ms, while the scanner "runs" every ~100ms), occasionally it doesn't manage to handle the last event before being aborted, causing the assertion below it to fail (in my case it failed because the actual number of processed events was 2, not 3). It could easily happen that the actual number of processed events is just 1.

The solution could be one of the following:

  1. Add an additional sleep(200) to allow the scanner thread to "catch up". <- 150ms (the difference in delay) + 50ms buffer just in case; still could theoretically have a race condition in the right circumstances, but much less likely.
  2. Take into account that the last event might not be processed, so update the assertion to state something like assert!((1..=3).contains(processed.load(Ordering::SeqCst)); <- this makes the test an "approximation" of valid behavior, and a poor approximation at that.
  3. Do you have any alternative ideas?

Wdyt?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think for now I will add a sleep(200).
Thanks for looking into this - perhaps once we have channels we can expose a method to see if the channels are empty? Im not sure if this is relevant or needed but my reasoning is for any reason a process want to stop the scanner, they must ensure that any callbacks have finished executing.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll try to make the channels testable.

I think we should ensure the scanner does it's job, but I don't think we can make assumptions about what happens when someone intentionally kills the whole process

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking more of like a pulse check

let bool = scanner.has_pending_callbacks();

or like

scanner.safe_abort();

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would the test assertions look like for each?

let anvil = Anvil::new().block_time(1).try_spawn()?;
let wallet = anvil.wallet();
let provider =
ProviderBuilder::new().wallet(wallet.unwrap()).connect(anvil.endpoint().as_str()).await?;

let contract = LiveTestCounter::deploy(provider.clone()).await?;
let contract_address = *contract.address();

let processed = Arc::new(AtomicUsize::new(0));
let callback = Arc::new(SlowProcessor { delay_ms: 100, processed: processed.clone() });

let filter = EventFilter {
contract_address,
event: LiveTestCounter::CountIncreased::SIGNATURE.to_owned(),
callback,
};

let mut scanner =
ScannerBuilder::new(anvil.ws_endpoint_url()).add_event_filter(filter).build().await?;

let scanner_handle = tokio::spawn(async move { scanner.start().await });

for _ in 0..3 {
let _ = contract.increase().send().await?.get_receipt().await?;
// Less than processor delay
sleep(Duration::from_millis(50)).await;
}

sleep(Duration::from_millis(200)).await;

scanner_handle.abort();

assert_eq!(processed.load(Ordering::SeqCst), 3);

Ok(())
}