Skip to content
215 changes: 213 additions & 2 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,13 +733,147 @@ impl BlockRangeScannerClient {

#[cfg(test)]
mod tests {
use alloy::network::Ethereum;
use alloy::{
eips::BlockNumberOrTag,
network::Ethereum,
primitives::{B256, keccak256},
rpc::{
client::RpcClient,
types::{Block as RpcBlock, Header, Transaction},
},
transports::mock::Asserter,
};
use alloy_node_bindings::Anvil;

use serde_json::{Value, json};
use tokio::sync::mpsc;
use tokio_stream::StreamExt;

use super::*;

fn test_config() -> Config {
Config { blocks_read_per_epoch: 5, reorg_rewind_depth: 5, block_confirmations: 0 }
}

fn mocked_provider(asserter: Asserter) -> RootProvider<Ethereum> {
RootProvider::new(RpcClient::mocked(asserter))
}

fn mock_block(number: u64, hash: B256) -> RpcBlock<Transaction, Header> {
let mut block: RpcBlock<Transaction, Header> = RpcBlock::default();
block.header.hash = hash;
block.header.number = number;
block
}

#[test]
fn block_range_scanner_defaults_match_constants() {
let scanner = BlockRangeScanner::new();

assert_eq!(scanner.blocks_read_per_epoch, DEFAULT_BLOCKS_READ_PER_EPOCH);
assert_eq!(scanner.reorg_rewind_depth, DEFAULT_REORG_REWIND_DEPTH);
assert_eq!(scanner.block_confirmations, DEFAULT_BLOCK_CONFIRMATIONS);
}

#[test]
fn builder_methods_update_configuration() {
let blocks_read_per_epoch = 42;
let reorg_rewind_depth = 12;
let block_confirmations = 7;

let scanner = BlockRangeScanner::new()
.with_blocks_read_per_epoch(blocks_read_per_epoch)
.with_reorg_rewind_depth(reorg_rewind_depth)
.with_block_confirmations(block_confirmations);

assert_eq!(scanner.blocks_read_per_epoch, blocks_read_per_epoch);
assert_eq!(scanner.block_confirmations, block_confirmations);
}

#[test]
fn service_status_reflects_internal_state() {
let asserter = Asserter::new();
let provider = mocked_provider(asserter);
let (mut service, _cmd) = Service::new(test_config(), provider);

let processed_count = 7;
let error_count = 2;
service.processed_count = processed_count;
service.error_count = error_count;
let hash = keccak256(b"random");
let block_number = 99;
service.current = Some(BlockHashAndNumber { hash, number: block_number });
service.websocket_connected = true;
service.subscriber = Some(mpsc::channel(1).0);

let status = service.get_status();

assert!(status.is_subscribed);
assert!(status.websocket_connected);
assert_eq!(status.processed_count, processed_count);
assert_eq!(status.error_count, error_count);
let last = status.last_synced_block.expect("last synced block is set");
assert_eq!(last.number, block_number);
assert_eq!(last.hash, hash);
}

#[tokio::test]
async fn send_to_subscriber_increments_processed_count() -> anyhow::Result<()> {
let asserter = Asserter::new();
let provider = mocked_provider(asserter);
let (mut service, _cmd) = Service::new(test_config(), provider);

let (tx, mut rx) = mpsc::channel(1);
service.subscriber = Some(tx);

let expected_range = 10..=11;
service.send_to_subscriber(Ok(expected_range.clone())).await;

assert_eq!(service.processed_count, 1);
assert!(service.subscriber.is_some());

let received = rx.recv().await.expect("range received")?;
assert_eq!(received, expected_range);

Ok(())
}

#[tokio::test]
async fn send_to_subscriber_removes_closed_channel() -> anyhow::Result<()> {
let asserter = Asserter::new();
let provider = mocked_provider(asserter);
let (mut service, _cmd) = Service::new(test_config(), provider);

let (tx, rx) = mpsc::channel(1);
service.websocket_connected = true;
service.subscriber = Some(tx);
// channel is closed
drop(rx);

service.send_to_subscriber(Ok(15..=15)).await;

assert!(service.subscriber.is_none());
assert!(!service.websocket_connected);
assert_eq!(service.processed_count, 0);

Ok(())
}

#[test]
fn handle_unsubscribe_clears_subscriber() {
let asserter = Asserter::new();
let provider = mocked_provider(asserter);
let (mut service, _cmd) = Service::new(test_config(), provider);

let (tx, _rx) = mpsc::channel(1);
service.websocket_connected = true;
service.subscriber = Some(tx);

service.handle_unsubscribe();

assert!(service.subscriber.is_none());
assert!(!service.websocket_connected);
}

#[tokio::test]
async fn live_mode_processes_all_blocks() -> anyhow::Result<()> {
let anvil = Anvil::new().block_time_f64(0.01).try_spawn()?;
Expand Down Expand Up @@ -779,4 +913,81 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn rewinds_on_detected_reorg() -> anyhow::Result<()> {
let asserter = Asserter::new();
let provider = mocked_provider(asserter.clone());

let mut config = test_config();
config.reorg_rewind_depth = 6;
let (mut service, _cmd) = Service::new(config.clone(), provider);

let original_height = 10;
let original_hash = keccak256(b"original block");
let original_block = mock_block(original_height, original_hash);
service.current =
Some(BlockHashAndNumber::from_header::<Ethereum>(original_block.header()));

let expected_rewind_height = original_height - config.reorg_rewind_depth;
let expected_rewind_hash = keccak256(b"rewound block");
let rewound_block = mock_block(expected_rewind_height, expected_rewind_hash);

// Mock provider responses for reorg detection and rewind:
// 1. get_block_by_hash(original_hash) -> None (block not found = reorg detected)
asserter.push_success(&Value::Null);
// 2. get_block_number() -> 12 (current chain head is at 12)
asserter.push_success(&json!(format!("0x{:x}", original_height + 2)));
// 3. get_block_by_number(expected_rewind_height) -> rewound_block
asserter.push_success(&rewound_block);

service.ensure_current_not_reorged().await?;

let current = service.current.expect("current block should be set after rewind");
assert_eq!(current.number, expected_rewind_height, "should rewind by reorg_rewind_depth");
assert_eq!(current.hash, expected_rewind_hash, "should use hash of block at rewind height");

Ok(())
}

#[tokio::test]
async fn buffered_messages_trim_ranges_prior_to_cutoff() -> anyhow::Result<()> {
let (buffer_tx, buffer_rx) = mpsc::channel(8);
buffer_tx.send(40..=44).await.unwrap();
buffer_tx.send(45..=54).await.unwrap();
buffer_tx.send(60..=61).await.unwrap();
drop(buffer_tx);

let (out_tx, mut out_rx) = mpsc::channel(8);

Service::<Ethereum>::process_buffered_messages(buffer_rx, out_tx, 50).await;

let mut forwarded = Vec::new();
while let Some(result) = out_rx.recv().await {
forwarded.push(result.unwrap());
}

assert_eq!(forwarded, vec![50..=54, 60..=61]);

Ok(())
}

#[tokio::test]
async fn forwards_errors_to_subscribers() -> anyhow::Result<()> {
let asserter = Asserter::new();
let provider = mocked_provider(asserter);
let (mut service, _cmd) = Service::new(test_config(), provider);

let (tx, mut rx) = mpsc::channel(1);
service.subscriber = Some(tx);

service.send_to_subscriber(Err(Error::WebSocketConnectionFailed(4))).await;

match rx.recv().await.expect("subscriber should stay open") {
Err(Error::WebSocketConnectionFailed(attempts)) => assert_eq!(attempts, 4),
other => panic!("unexpected message: {other:?}"),
}

Ok(())
}
}