diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 19b870f3..3423a54f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -25,6 +25,9 @@ jobs: - name: Install stable toolchain uses: actions-rust-lang/setup-rust-toolchain@v1 + - name: Install Foundry + uses: foundry-rs/foundry-toolchain@v1 + - name: Cache cargo-nextest binary id: cache-cargo-nextest uses: actions/cache@v4 @@ -39,7 +42,7 @@ jobs: tool: cargo-nextest - name: Cargo test - run: cargo nextest run --locked --all-targets --all-features --no-tests=pass + run: cargo nextest run --locked --all-targets --all-features --no-tests=pass --no-fail-fast # https://github.com/rust-lang/cargo/issues/6669 - name: Run doc tests diff --git a/.gitignore b/.gitignore index 677149fd..34f380f9 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target /examples/**/target +.DS_Store diff --git a/src/block_scanner.rs b/src/block_scanner.rs index bba72319..4f120dbe 100644 --- a/src/block_scanner.rs +++ b/src/block_scanner.rs @@ -9,11 +9,12 @@ use alloy::{ eips::BlockNumberOrTag, network::Network, providers::{Provider, RootProvider}, + pubsub::PubSubConnect, rpc::{ client::{ClientBuilder, RpcClient}, types::Header, }, - transports::TransportError, + transports::{TransportError, ipc::IpcConnect, ws::WsConnect}, }; // copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19 @@ -132,7 +133,7 @@ impl BlockScannerBuilder { /// Returns an error if the connection fails pub async fn connect_ws( self, - connect: alloy::transports::ws::WsConnect, + connect: WsConnect, ) -> Result, N>, TransportError> { let client = ClientBuilder::default().ws(connect).await?; Ok(self.connect_client(client)) @@ -145,10 +146,10 @@ impl BlockScannerBuilder { /// Returns an error if the connection fails pub async fn connect_ipc( self, - connect: alloy::transports::ipc::IpcConnect, + connect: IpcConnect, ) -> Result, N>, TransportError> where - alloy::transports::ipc::IpcConnect: alloy::pubsub::PubSubConnect, + IpcConnect: PubSubConnect, { let client = ClientBuilder::default().ipc(connect).await?; Ok(self.connect_client(client)) @@ -219,3 +220,98 @@ where receiver_stream } } + +#[cfg(test)] +mod tests { + use super::*; + use alloy::network::{Ethereum, Network}; + use alloy_node_bindings::Anvil; + use tokio_stream::StreamExt; + + #[allow(clippy::unnecessary_wraps)] + fn no_op_on_blocks( + _block: ::BlockResponse, + _update_current: UpdateCurrentFunc, + _end_iter: EndIterFunc, + ) -> anyhow::Result<()> { + Ok(()) + } + + #[test] + fn test_block_scanner_error_display() { + assert_eq!(format!("{}", BlockScannerError::ErrEOF), "end of block batch iterator"); + assert_eq!(format!("{}", BlockScannerError::ErrContinue), "continue"); + assert_eq!( + format!("{}", BlockScannerError::TerminalError(42)), + "terminal error at block height 42" + ); + } + + #[test] + fn test_builder_defaults() { + let builder = BlockScannerBuilder::::new(); + assert_eq!(builder.blocks_read_per_epoch, DEFAULT_BLOCKS_READ_PER_EPOCH); + assert!(matches!(builder.start_height, BlockNumberOrTag::Earliest)); + assert!(matches!(builder.end_height, BlockNumberOrTag::Latest)); + assert_eq!(builder.reorg_rewind_depth, DEFAULT_REORG_REWIND_DEPTH); + assert_eq!(builder.retry_interval, DEFAULT_RETRY_INTERVAL); + assert_eq!(builder.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS); + } + + #[test] + fn test_builder_setters() { + let mut builder = BlockScannerBuilder::::new(); + builder.with_blocks_read_per_epoch(25); + builder.with_start_height(BlockNumberOrTag::Earliest); + builder.with_end_height(BlockNumberOrTag::Latest); + builder.with_on_blocks(no_op_on_blocks::); + builder.with_reorg_rewind_depth(5); + let interval = Duration::from_secs(3); + builder.with_retry_interval(interval); + builder.with_block_confirmations(12); + + assert_eq!(builder.blocks_read_per_epoch, 25); + assert!(matches!(builder.start_height, BlockNumberOrTag::Earliest)); + assert!(matches!(builder.end_height, BlockNumberOrTag::Latest)); + assert_eq!(builder.reorg_rewind_depth, 5); + assert_eq!(builder.retry_interval, interval); + assert_eq!(builder.block_confirmations, 12); + } + + #[tokio::test] + async fn test_connect_ws_and_start_stream_eof() { + let anvil = Anvil::new().try_spawn().expect("failed to spawn anvil"); + let ws = WsConnect::new(anvil.ws_endpoint_url()); + + let builder = BlockScannerBuilder::::new(); + let scanner = builder.connect_ws(ws).await.expect("failed to connect ws"); + + let mut stream = scanner.start().await; + let first = stream.next().await; + match first { + Some(Err(BlockScannerError::ErrEOF)) => {} + other => panic!("expected first stream item to be ErrEOF, got: {other:?}"), + } + } + + #[tokio::test] + async fn test_channel_buffer_is_equal_to_blocks_read_per_epoch() { + let anvil = Anvil::new().try_spawn().expect("failed to spawn anvil"); + let ws = WsConnect::new(anvil.ws_endpoint_url()); + + let mut builder = BlockScannerBuilder::::new(); + builder.with_blocks_read_per_epoch(5); + + let scanner = builder.connect_ws(ws).await.expect("failed to connect ws"); + + for _ in 0..scanner.blocks_read_per_epoch { + scanner + .sender + .try_send(Err(BlockScannerError::ErrContinue)) + .expect("channel should not be full yet"); + } + + let res = scanner.sender.try_send(Err(BlockScannerError::ErrContinue)); + assert!(matches!(res, Err(tokio::sync::mpsc::error::TrySendError::Full(_)))); + } +} diff --git a/src/builder.rs b/src/builder.rs index 11542f3e..258013c4 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -77,3 +77,204 @@ impl ScannerBuilder { .await } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::callback::EventCallback; + use alloy::{primitives::address, rpc::types::Log}; + use async_trait::async_trait; + use std::sync::Arc; + + struct MockCallback; + + #[async_trait] + impl EventCallback for MockCallback { + async fn on_event(&self, _log: &Log) -> anyhow::Result<()> { + Ok(()) + } + } + + const WS_URL: &str = "ws://localhost:8545"; + + #[test] + fn test_builder_new_defaults() { + let builder = ScannerBuilder::new(WS_URL); + assert_eq!(builder.rpc_url, WS_URL); + assert_eq!(builder.start_block, None); + assert_eq!(builder.end_block, None); + assert_eq!(builder.max_blocks_per_filter, 1000); + assert!(builder.tracked_events.is_empty()); + } + + #[test] + fn test_builder_start_block() { + let start_block = 100; + let builder = ScannerBuilder::new(WS_URL).start_block(start_block); + assert_eq!(builder.start_block, Some(start_block)); + } + + #[test] + fn test_builder_end_block() { + let end_block = 500; + let builder = ScannerBuilder::new(WS_URL).end_block(end_block); + assert_eq!(builder.end_block, Some(end_block)); + } + + #[test] + fn test_builder_block_range() { + let start_block = 100; + let end_block = 500; + let builder = ScannerBuilder::new(WS_URL).start_block(start_block).end_block(end_block); + assert_eq!(builder.start_block, Some(start_block)); + assert_eq!(builder.end_block, Some(end_block)); + } + + #[test] + fn test_builder_max_blocks_per_filter() { + let max_blocks = 5000; + let builder = ScannerBuilder::new(WS_URL).max_blocks_per_filter(max_blocks); + assert_eq!(builder.max_blocks_per_filter, max_blocks); + } + + #[test] + fn test_builder_callback_config() { + let max_attempts = 5; + let delay_ms = 500; + let config = CallbackConfig { max_attempts, delay_ms }; + + let builder = ScannerBuilder::new(WS_URL).callback_config(config.clone()); + + assert_eq!(builder.callback_config.max_attempts, max_attempts); + assert_eq!(builder.callback_config.delay_ms, delay_ms); + } + + #[test] + fn test_builder_default_callback_config() { + let builder = ScannerBuilder::new(WS_URL); + + assert_eq!(builder.callback_config.max_attempts, 3); + assert_eq!(builder.callback_config.delay_ms, 200); + } + + #[test] + fn test_builder_add_event_filter() { + let addr = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"); + let event = "Transfer(address,address,uint256)".to_string(); + let filter = EventFilter { + contract_address: addr, + event: event.clone(), + callback: Arc::new(MockCallback), + }; + let builder = ScannerBuilder::new(WS_URL).add_event_filter(filter.clone()); + + assert_eq!(builder.tracked_events.len(), 1); + assert_eq!(builder.tracked_events[0].contract_address, addr); + assert_eq!(builder.tracked_events[0].event, event); + } + + #[test] + fn test_builder_add_multiple_event_filters() { + let addr1 = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"); + let event1 = "Transfer(address,address,uint256)".to_string(); + let addr2 = address!("70997970C51812dc3A010C7d01b50e0d17dc79C8"); + let event2 = "Approval(address,address,uint256)".to_string(); + + let filter1 = EventFilter { + contract_address: addr1, + event: event1.clone(), + callback: Arc::new(MockCallback), + }; + let filter2 = EventFilter { + contract_address: addr2, + event: event2.clone(), + callback: Arc::new(MockCallback), + }; + + let builder = ScannerBuilder::new(WS_URL) + .add_event_filter(filter1.clone()) + .add_event_filter(filter2.clone()); + + assert_eq!(builder.tracked_events.len(), 2); + for (i, expected_filter) in builder.tracked_events.iter().enumerate() { + assert_eq!( + builder.tracked_events[i].contract_address, + expected_filter.contract_address + ); + assert_eq!(builder.tracked_events[i].event, expected_filter.event); + } + } + + #[test] + fn test_builder_add_event_filters_batch() { + let addr1 = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"); + let event1 = "Transfer(address,address,uint256)".to_string(); + let addr2 = address!("70997970C51812dc3A010C7d01b50e0d17dc79C8"); + let event2 = "Approval(address,address,uint256)".to_string(); + let addr3 = address!("3C44CdDdB6a900fa2b585dd299e03d12FA4293BC"); + let event3 = "Mint(address,uint256)".to_string(); + + let filter_1 = EventFilter { + contract_address: addr1, + event: event1.clone(), + callback: Arc::new(MockCallback), + }; + let filter_2 = EventFilter { + contract_address: addr2, + event: event2.clone(), + callback: Arc::new(MockCallback), + }; + let filter_3 = EventFilter { + contract_address: addr3, + event: event3.clone(), + callback: Arc::new(MockCallback), + }; + + let filters = vec![filter_1.clone(), filter_2.clone(), filter_3.clone()]; + let builder = ScannerBuilder::new(WS_URL).add_event_filters(filters.clone()); + + assert_eq!(builder.tracked_events.len(), filters.len()); + + for (i, expected_filter) in filters.iter().enumerate() { + assert_eq!( + builder.tracked_events[i].contract_address, + expected_filter.contract_address + ); + assert_eq!(builder.tracked_events[i].event, expected_filter.event); + } + } + + #[test] + fn test_builder_chain_all_methods() { + let start_block = 100; + let end_block = 500; + + let addr = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266"); + let event = "Transfer(address,address,uint256)".to_string(); + + let filter = EventFilter { + contract_address: addr, + event: event.clone(), + callback: Arc::new(MockCallback), + }; + + let max_attempts = 5; + let delay_ms = 500; + let config = CallbackConfig { max_attempts, delay_ms }; + + let max_blocks_per_filter = 2000; + let builder = ScannerBuilder::new(WS_URL) + .start_block(start_block) + .end_block(end_block) + .max_blocks_per_filter(max_blocks_per_filter) + .add_event_filter(filter.clone()) + .callback_config(config.clone()); + + assert_eq!(builder.start_block, Some(start_block)); + assert_eq!(builder.end_block, Some(end_block)); + assert_eq!(builder.max_blocks_per_filter, max_blocks_per_filter); + assert_eq!(builder.tracked_events.len(), 1); + assert_eq!(builder.callback_config.max_attempts, max_attempts); + assert_eq!(builder.callback_config.delay_ms, delay_ms); + } +} diff --git a/src/event_scanner.rs b/src/event_scanner.rs index 0c2d69ed..e1e6f393 100644 --- a/src/event_scanner.rs +++ b/src/event_scanner.rs @@ -9,7 +9,8 @@ use crate::{ use alloy::{ eips::BlockNumberOrTag, network::Network, - providers::{Provider, RootProvider}, + providers::{IpcConnect, Provider, RootProvider, WsConnect}, + pubsub::PubSubConnect, rpc::client::RpcClient, transports::TransportError, }; @@ -54,7 +55,6 @@ impl EventScannerBuilder { self } - #[must_use] #[must_use] pub fn with_blocks_read_per_epoch(&mut self, blocks_read_per_epoch: usize) -> &mut Self { self.block_scanner.with_blocks_read_per_epoch(blocks_read_per_epoch); @@ -104,7 +104,7 @@ impl EventScannerBuilder { /// Returns an error if the connection fails pub async fn connect_ws( self, - connect: alloy::transports::ws::WsConnect, + connect: WsConnect, ) -> Result, N>, TransportError> { let block_scanner = self.block_scanner.connect_ws(connect).await?; Ok(EventScanner { @@ -121,10 +121,10 @@ impl EventScannerBuilder { /// Returns an error if the connection fails pub async fn connect_ipc( self, - connect: alloy::transports::ipc::IpcConnect, + connect: IpcConnect, ) -> Result, N>, TransportError> where - alloy::transports::ipc::IpcConnect: alloy::pubsub::PubSubConnect, + IpcConnect: PubSubConnect, { let block_scanner = self.block_scanner.connect_ipc(connect).await?; Ok(EventScanner { diff --git a/src/scanner.rs b/src/scanner.rs index 4860a23f..fa3360fb 100644 --- a/src/scanner.rs +++ b/src/scanner.rs @@ -265,3 +265,52 @@ impl Scanner { Err(last_err.unwrap_or_else(|| anyhow::anyhow!("callback failed with unknown error"))) } } + +#[cfg(test)] +mod tests { + use alloy_node_bindings::Anvil; + + use crate::ScannerBuilder; + + use super::*; + + #[test] + fn test_detect_provider_type_websocket() { + assert!(matches!( + Scanner::detect_provider_type("ws://localhost:8545"), + Ok(ProviderType::WebSocket) + )); + assert!(matches!( + Scanner::detect_provider_type("wss://mainnet.infura.io/ws"), + Ok(ProviderType::WebSocket) + )); + } + + #[test] + fn test_detect_provider_type_ipc() { + assert!(matches!(Scanner::detect_provider_type("/tmp/geth.ipc"), Ok(ProviderType::Ipc))); + assert!(matches!( + Scanner::detect_provider_type("./path/to/node.ipc"), + Ok(ProviderType::Ipc) + )); + assert!(matches!( + Scanner::detect_provider_type("/var/run/geth/ipc"), + Ok(ProviderType::Ipc) + )); + } + + #[test] + fn test_detect_provider_type_invalid() { + assert!(Scanner::detect_provider_type("http://localhost:8545").is_err()); + assert!(Scanner::detect_provider_type("https://mainnet.infura.io").is_err()); + assert!(Scanner::detect_provider_type("invalid-url").is_err()); + } + + #[tokio::test] + async fn test_builds_ws_provider() { + let anvil = Anvil::new().try_spawn().unwrap(); + let rpc_url = anvil.ws_endpoint_url(); + let scanner = ScannerBuilder::new(rpc_url).build().await; + assert!(scanner.is_ok()); + } +}