Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ jobs:
- name: Install stable toolchain
uses: actions-rust-lang/setup-rust-toolchain@v1

- name: Install Foundry
uses: foundry-rs/foundry-toolchain@v1

- name: Cache cargo-nextest binary
id: cache-cargo-nextest
uses: actions/cache@v4
Expand All @@ -39,7 +42,7 @@ jobs:
tool: cargo-nextest

- name: Cargo test
run: cargo nextest run --locked --all-targets --all-features --no-tests=pass
run: cargo nextest run --locked --all-targets --all-features --no-tests=pass --no-fail-fast

# https://github.com/rust-lang/cargo/issues/6669
- name: Run doc tests
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/target
/examples/**/target
.DS_Store
104 changes: 100 additions & 4 deletions src/block_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use alloy::{
eips::BlockNumberOrTag,
network::Network,
providers::{Provider, RootProvider},
pubsub::PubSubConnect,
rpc::{
client::{ClientBuilder, RpcClient},
types::Header,
},
transports::TransportError,
transports::{TransportError, ipc::IpcConnect, ws::WsConnect},
};

// copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19
Expand Down Expand Up @@ -132,7 +133,7 @@ impl<N: Network> BlockScannerBuilder<N> {
/// Returns an error if the connection fails
pub async fn connect_ws(
self,
connect: alloy::transports::ws::WsConnect,
connect: WsConnect,
) -> Result<BlockScanner<RootProvider<N>, N>, TransportError> {
let client = ClientBuilder::default().ws(connect).await?;
Ok(self.connect_client(client))
Expand All @@ -145,10 +146,10 @@ impl<N: Network> BlockScannerBuilder<N> {
/// Returns an error if the connection fails
pub async fn connect_ipc<T>(
self,
connect: alloy::transports::ipc::IpcConnect<T>,
connect: IpcConnect<T>,
) -> Result<BlockScanner<RootProvider<N>, N>, TransportError>
where
alloy::transports::ipc::IpcConnect<T>: alloy::pubsub::PubSubConnect,
IpcConnect<T>: PubSubConnect,
{
let client = ClientBuilder::default().ipc(connect).await?;
Ok(self.connect_client(client))
Expand Down Expand Up @@ -219,3 +220,98 @@ where
receiver_stream
}
}

#[cfg(test)]
mod tests {
use super::*;
use alloy::network::{Ethereum, Network};
use alloy_node_bindings::Anvil;
use tokio_stream::StreamExt;

#[allow(clippy::unnecessary_wraps)]
fn no_op_on_blocks<N: Network>(
_block: <N as Network>::BlockResponse,
_update_current: UpdateCurrentFunc,
_end_iter: EndIterFunc,
) -> anyhow::Result<()> {
Ok(())
}

#[test]
fn test_block_scanner_error_display() {
assert_eq!(format!("{}", BlockScannerError::ErrEOF), "end of block batch iterator");
assert_eq!(format!("{}", BlockScannerError::ErrContinue), "continue");
assert_eq!(
format!("{}", BlockScannerError::TerminalError(42)),
"terminal error at block height 42"
);
}

#[test]
fn test_builder_defaults() {
let builder = BlockScannerBuilder::<Ethereum>::new();
assert_eq!(builder.blocks_read_per_epoch, DEFAULT_BLOCKS_READ_PER_EPOCH);
assert!(matches!(builder.start_height, BlockNumberOrTag::Earliest));
assert!(matches!(builder.end_height, BlockNumberOrTag::Latest));
assert_eq!(builder.reorg_rewind_depth, DEFAULT_REORG_REWIND_DEPTH);
assert_eq!(builder.retry_interval, DEFAULT_RETRY_INTERVAL);
assert_eq!(builder.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
}

#[test]
fn test_builder_setters() {
let mut builder = BlockScannerBuilder::<Ethereum>::new();
builder.with_blocks_read_per_epoch(25);
builder.with_start_height(BlockNumberOrTag::Earliest);
builder.with_end_height(BlockNumberOrTag::Latest);
builder.with_on_blocks(no_op_on_blocks::<Ethereum>);
builder.with_reorg_rewind_depth(5);
let interval = Duration::from_secs(3);
builder.with_retry_interval(interval);
builder.with_block_confirmations(12);

assert_eq!(builder.blocks_read_per_epoch, 25);
assert!(matches!(builder.start_height, BlockNumberOrTag::Earliest));
assert!(matches!(builder.end_height, BlockNumberOrTag::Latest));
assert_eq!(builder.reorg_rewind_depth, 5);
assert_eq!(builder.retry_interval, interval);
assert_eq!(builder.block_confirmations, 12);
}

#[tokio::test]
async fn test_connect_ws_and_start_stream_eof() {
let anvil = Anvil::new().try_spawn().expect("failed to spawn anvil");
let ws = WsConnect::new(anvil.ws_endpoint_url());

let builder = BlockScannerBuilder::<Ethereum>::new();
let scanner = builder.connect_ws(ws).await.expect("failed to connect ws");

let mut stream = scanner.start().await;
let first = stream.next().await;
match first {
Some(Err(BlockScannerError::ErrEOF)) => {}
other => panic!("expected first stream item to be ErrEOF, got: {other:?}"),
}
}

#[tokio::test]
async fn test_channel_buffer_is_equal_to_blocks_read_per_epoch() {
let anvil = Anvil::new().try_spawn().expect("failed to spawn anvil");
let ws = WsConnect::new(anvil.ws_endpoint_url());

let mut builder = BlockScannerBuilder::<Ethereum>::new();
builder.with_blocks_read_per_epoch(5);

let scanner = builder.connect_ws(ws).await.expect("failed to connect ws");

for _ in 0..scanner.blocks_read_per_epoch {
scanner
.sender
.try_send(Err(BlockScannerError::ErrContinue))
.expect("channel should not be full yet");
}

let res = scanner.sender.try_send(Err(BlockScannerError::ErrContinue));
assert!(matches!(res, Err(tokio::sync::mpsc::error::TrySendError::Full(_))));
}
}
201 changes: 201 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,204 @@ impl ScannerBuilder {
.await
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::callback::EventCallback;
use alloy::{primitives::address, rpc::types::Log};
use async_trait::async_trait;
use std::sync::Arc;

struct MockCallback;

#[async_trait]
impl EventCallback for MockCallback {
async fn on_event(&self, _log: &Log) -> anyhow::Result<()> {
Ok(())
}
}

const WS_URL: &str = "ws://localhost:8545";

#[test]
fn test_builder_new_defaults() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

although we're changing the design, these tests might come in handy for the event scanner, so we might as well keep them temporarily

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah wasnt sure what to do as we transition

let builder = ScannerBuilder::new(WS_URL);
assert_eq!(builder.rpc_url, WS_URL);
assert_eq!(builder.start_block, None);
assert_eq!(builder.end_block, None);
assert_eq!(builder.max_blocks_per_filter, 1000);
assert!(builder.tracked_events.is_empty());
}

#[test]
fn test_builder_start_block() {
let start_block = 100;
let builder = ScannerBuilder::new(WS_URL).start_block(start_block);
assert_eq!(builder.start_block, Some(start_block));
}

#[test]
fn test_builder_end_block() {
let end_block = 500;
let builder = ScannerBuilder::new(WS_URL).end_block(end_block);
assert_eq!(builder.end_block, Some(end_block));
}

#[test]
fn test_builder_block_range() {
let start_block = 100;
let end_block = 500;
let builder = ScannerBuilder::new(WS_URL).start_block(start_block).end_block(end_block);
assert_eq!(builder.start_block, Some(start_block));
assert_eq!(builder.end_block, Some(end_block));
}

#[test]
fn test_builder_max_blocks_per_filter() {
let max_blocks = 5000;
let builder = ScannerBuilder::new(WS_URL).max_blocks_per_filter(max_blocks);
assert_eq!(builder.max_blocks_per_filter, max_blocks);
}

#[test]
fn test_builder_callback_config() {
let max_attempts = 5;
let delay_ms = 500;
let config = CallbackConfig { max_attempts, delay_ms };

let builder = ScannerBuilder::new(WS_URL).callback_config(config.clone());

assert_eq!(builder.callback_config.max_attempts, max_attempts);
assert_eq!(builder.callback_config.delay_ms, delay_ms);
}

#[test]
fn test_builder_default_callback_config() {
let builder = ScannerBuilder::new(WS_URL);

assert_eq!(builder.callback_config.max_attempts, 3);
assert_eq!(builder.callback_config.delay_ms, 200);
}

#[test]
fn test_builder_add_event_filter() {
let addr = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266");
let event = "Transfer(address,address,uint256)".to_string();
let filter = EventFilter {
contract_address: addr,
event: event.clone(),
callback: Arc::new(MockCallback),
};
let builder = ScannerBuilder::new(WS_URL).add_event_filter(filter.clone());

assert_eq!(builder.tracked_events.len(), 1);
assert_eq!(builder.tracked_events[0].contract_address, addr);
assert_eq!(builder.tracked_events[0].event, event);
}

#[test]
fn test_builder_add_multiple_event_filters() {
let addr1 = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266");
let event1 = "Transfer(address,address,uint256)".to_string();
let addr2 = address!("70997970C51812dc3A010C7d01b50e0d17dc79C8");
let event2 = "Approval(address,address,uint256)".to_string();

let filter1 = EventFilter {
contract_address: addr1,
event: event1.clone(),
callback: Arc::new(MockCallback),
};
let filter2 = EventFilter {
contract_address: addr2,
event: event2.clone(),
callback: Arc::new(MockCallback),
};

let builder = ScannerBuilder::new(WS_URL)
.add_event_filter(filter1.clone())
.add_event_filter(filter2.clone());

assert_eq!(builder.tracked_events.len(), 2);
for (i, expected_filter) in builder.tracked_events.iter().enumerate() {
assert_eq!(
builder.tracked_events[i].contract_address,
expected_filter.contract_address
);
assert_eq!(builder.tracked_events[i].event, expected_filter.event);
}
}

#[test]
fn test_builder_add_event_filters_batch() {
let addr1 = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266");
let event1 = "Transfer(address,address,uint256)".to_string();
let addr2 = address!("70997970C51812dc3A010C7d01b50e0d17dc79C8");
let event2 = "Approval(address,address,uint256)".to_string();
let addr3 = address!("3C44CdDdB6a900fa2b585dd299e03d12FA4293BC");
let event3 = "Mint(address,uint256)".to_string();

let filter_1 = EventFilter {
contract_address: addr1,
event: event1.clone(),
callback: Arc::new(MockCallback),
};
let filter_2 = EventFilter {
contract_address: addr2,
event: event2.clone(),
callback: Arc::new(MockCallback),
};
let filter_3 = EventFilter {
contract_address: addr3,
event: event3.clone(),
callback: Arc::new(MockCallback),
};

let filters = vec![filter_1.clone(), filter_2.clone(), filter_3.clone()];
let builder = ScannerBuilder::new(WS_URL).add_event_filters(filters.clone());

assert_eq!(builder.tracked_events.len(), filters.len());

for (i, expected_filter) in filters.iter().enumerate() {
assert_eq!(
builder.tracked_events[i].contract_address,
expected_filter.contract_address
);
assert_eq!(builder.tracked_events[i].event, expected_filter.event);
}
}

#[test]
fn test_builder_chain_all_methods() {
let start_block = 100;
let end_block = 500;

let addr = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266");
let event = "Transfer(address,address,uint256)".to_string();

let filter = EventFilter {
contract_address: addr,
event: event.clone(),
callback: Arc::new(MockCallback),
};

let max_attempts = 5;
let delay_ms = 500;
let config = CallbackConfig { max_attempts, delay_ms };

let max_blocks_per_filter = 2000;
let builder = ScannerBuilder::new(WS_URL)
.start_block(start_block)
.end_block(end_block)
.max_blocks_per_filter(max_blocks_per_filter)
.add_event_filter(filter.clone())
.callback_config(config.clone());

assert_eq!(builder.start_block, Some(start_block));
assert_eq!(builder.end_block, Some(end_block));
assert_eq!(builder.max_blocks_per_filter, max_blocks_per_filter);
assert_eq!(builder.tracked_events.len(), 1);
assert_eq!(builder.callback_config.max_attempts, max_attempts);
assert_eq!(builder.callback_config.delay_ms, delay_ms);
}
}
Loading