diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 75437cc9..edb86e8b 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -733,13 +733,147 @@ impl BlockRangeScannerClient { #[cfg(test)] mod tests { - use alloy::network::Ethereum; + use alloy::{ + eips::BlockNumberOrTag, + network::Ethereum, + primitives::{B256, keccak256}, + rpc::{ + client::RpcClient, + types::{Block as RpcBlock, Header, Transaction}, + }, + transports::mock::Asserter, + }; use alloy_node_bindings::Anvil; - + use serde_json::{Value, json}; + use tokio::sync::mpsc; use tokio_stream::StreamExt; use super::*; + fn test_config() -> Config { + Config { blocks_read_per_epoch: 5, reorg_rewind_depth: 5, block_confirmations: 0 } + } + + fn mocked_provider(asserter: Asserter) -> RootProvider { + RootProvider::new(RpcClient::mocked(asserter)) + } + + fn mock_block(number: u64, hash: B256) -> RpcBlock { + let mut block: RpcBlock = RpcBlock::default(); + block.header.hash = hash; + block.header.number = number; + block + } + + #[test] + fn block_range_scanner_defaults_match_constants() { + let scanner = BlockRangeScanner::new(); + + assert_eq!(scanner.blocks_read_per_epoch, DEFAULT_BLOCKS_READ_PER_EPOCH); + assert_eq!(scanner.reorg_rewind_depth, DEFAULT_REORG_REWIND_DEPTH); + assert_eq!(scanner.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS); + } + + #[test] + fn builder_methods_update_configuration() { + let blocks_read_per_epoch = 42; + let reorg_rewind_depth = 12; + let block_confirmations = 7; + + let scanner = BlockRangeScanner::new() + .with_blocks_read_per_epoch(blocks_read_per_epoch) + .with_reorg_rewind_depth(reorg_rewind_depth) + .with_block_confirmations(block_confirmations); + + assert_eq!(scanner.blocks_read_per_epoch, blocks_read_per_epoch); + assert_eq!(scanner.block_confirmations, block_confirmations); + } + + #[test] + fn service_status_reflects_internal_state() { + let asserter = Asserter::new(); + let provider = mocked_provider(asserter); + let (mut service, _cmd) = Service::new(test_config(), provider); + + let processed_count = 7; + let error_count = 2; + service.processed_count = processed_count; + service.error_count = error_count; + let hash = keccak256(b"random"); + let block_number = 99; + service.current = Some(BlockHashAndNumber { hash, number: block_number }); + service.websocket_connected = true; + service.subscriber = Some(mpsc::channel(1).0); + + let status = service.get_status(); + + assert!(status.is_subscribed); + assert!(status.websocket_connected); + assert_eq!(status.processed_count, processed_count); + assert_eq!(status.error_count, error_count); + let last = status.last_synced_block.expect("last synced block is set"); + assert_eq!(last.number, block_number); + assert_eq!(last.hash, hash); + } + + #[tokio::test] + async fn send_to_subscriber_increments_processed_count() -> anyhow::Result<()> { + let asserter = Asserter::new(); + let provider = mocked_provider(asserter); + let (mut service, _cmd) = Service::new(test_config(), provider); + + let (tx, mut rx) = mpsc::channel(1); + service.subscriber = Some(tx); + + let expected_range = 10..=11; + service.send_to_subscriber(Ok(expected_range.clone())).await; + + assert_eq!(service.processed_count, 1); + assert!(service.subscriber.is_some()); + + let received = rx.recv().await.expect("range received")?; + assert_eq!(received, expected_range); + + Ok(()) + } + + #[tokio::test] + async fn send_to_subscriber_removes_closed_channel() -> anyhow::Result<()> { + let asserter = Asserter::new(); + let provider = mocked_provider(asserter); + let (mut service, _cmd) = Service::new(test_config(), provider); + + let (tx, rx) = mpsc::channel(1); + service.websocket_connected = true; + service.subscriber = Some(tx); + // channel is closed + drop(rx); + + service.send_to_subscriber(Ok(15..=15)).await; + + assert!(service.subscriber.is_none()); + assert!(!service.websocket_connected); + assert_eq!(service.processed_count, 0); + + Ok(()) + } + + #[test] + fn handle_unsubscribe_clears_subscriber() { + let asserter = Asserter::new(); + let provider = mocked_provider(asserter); + let (mut service, _cmd) = Service::new(test_config(), provider); + + let (tx, _rx) = mpsc::channel(1); + service.websocket_connected = true; + service.subscriber = Some(tx); + + service.handle_unsubscribe(); + + assert!(service.subscriber.is_none()); + assert!(!service.websocket_connected); + } + #[tokio::test] async fn live_mode_processes_all_blocks() -> anyhow::Result<()> { let anvil = Anvil::new().block_time_f64(0.01).try_spawn()?; @@ -779,4 +913,81 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn rewinds_on_detected_reorg() -> anyhow::Result<()> { + let asserter = Asserter::new(); + let provider = mocked_provider(asserter.clone()); + + let mut config = test_config(); + config.reorg_rewind_depth = 6; + let (mut service, _cmd) = Service::new(config.clone(), provider); + + let original_height = 10; + let original_hash = keccak256(b"original block"); + let original_block = mock_block(original_height, original_hash); + service.current = + Some(BlockHashAndNumber::from_header::(original_block.header())); + + let expected_rewind_height = original_height - config.reorg_rewind_depth; + let expected_rewind_hash = keccak256(b"rewound block"); + let rewound_block = mock_block(expected_rewind_height, expected_rewind_hash); + + // Mock provider responses for reorg detection and rewind: + // 1. get_block_by_hash(original_hash) -> None (block not found = reorg detected) + asserter.push_success(&Value::Null); + // 2. get_block_number() -> 12 (current chain head is at 12) + asserter.push_success(&json!(format!("0x{:x}", original_height + 2))); + // 3. get_block_by_number(expected_rewind_height) -> rewound_block + asserter.push_success(&rewound_block); + + service.ensure_current_not_reorged().await?; + + let current = service.current.expect("current block should be set after rewind"); + assert_eq!(current.number, expected_rewind_height, "should rewind by reorg_rewind_depth"); + assert_eq!(current.hash, expected_rewind_hash, "should use hash of block at rewind height"); + + Ok(()) + } + + #[tokio::test] + async fn buffered_messages_trim_ranges_prior_to_cutoff() -> anyhow::Result<()> { + let (buffer_tx, buffer_rx) = mpsc::channel(8); + buffer_tx.send(40..=44).await.unwrap(); + buffer_tx.send(45..=54).await.unwrap(); + buffer_tx.send(60..=61).await.unwrap(); + drop(buffer_tx); + + let (out_tx, mut out_rx) = mpsc::channel(8); + + Service::::process_buffered_messages(buffer_rx, out_tx, 50).await; + + let mut forwarded = Vec::new(); + while let Some(result) = out_rx.recv().await { + forwarded.push(result.unwrap()); + } + + assert_eq!(forwarded, vec![50..=54, 60..=61]); + + Ok(()) + } + + #[tokio::test] + async fn forwards_errors_to_subscribers() -> anyhow::Result<()> { + let asserter = Asserter::new(); + let provider = mocked_provider(asserter); + let (mut service, _cmd) = Service::new(test_config(), provider); + + let (tx, mut rx) = mpsc::channel(1); + service.subscriber = Some(tx); + + service.send_to_subscriber(Err(Error::WebSocketConnectionFailed(4))).await; + + match rx.recv().await.expect("subscriber should stay open") { + Err(Error::WebSocketConnectionFailed(attempts)) => assert_eq!(attempts, 4), + other => panic!("unexpected message: {other:?}"), + } + + Ok(()) + } }