@@ -3,17 +3,15 @@ use alloy::{
33 rpc:: types:: anvil:: { ReorgOptions , TransactionData } ,
44} ;
55use std:: { sync:: Arc , time:: Duration } ;
6+ use tokio_stream:: StreamExt ;
67
7- use tokio:: sync:: Mutex ;
8+ use tokio:: { sync:: Mutex , time :: timeout } ;
89
9- use crate :: {
10- common:: { TestCounter , build_provider, deploy_counter, spawn_anvil} ,
11- mock_callbacks:: BlockOrderingCallback ,
12- } ;
10+ use crate :: common:: { TestCounter , build_provider, deploy_counter, spawn_anvil} ;
1311use alloy:: {
1412 eips:: BlockNumberOrTag , network:: Ethereum , providers:: ext:: AnvilApi , sol_types:: SolEvent ,
1513} ;
16- use event_scanner:: { event_filter:: EventFilter , event_scanner:: EventScannerBuilder } ;
14+ use event_scanner:: { event_filter:: EventFilter , event_scanner:: EventScanner } ;
1715
1816#[ tokio:: test]
1917async fn reorg_rescans_events_with_rewind_depth ( ) -> anyhow:: Result < ( ) > {
@@ -22,21 +20,17 @@ async fn reorg_rescans_events_with_rewind_depth() -> anyhow::Result<()> {
2220
2321 let contract = deploy_counter ( provider. clone ( ) ) . await ?;
2422 let contract_address = * contract. address ( ) ;
25- let blocks = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
26- let callback = Arc :: new ( BlockOrderingCallback { blocks : Arc :: clone ( & blocks) } ) ;
23+
2724 let filter = EventFilter {
2825 contract_address : Some ( contract_address) ,
2926 event : Some ( TestCounter :: CountIncreased :: SIGNATURE . to_owned ( ) ) ,
30- callback,
3127 } ;
3228
33- let mut scanner = EventScannerBuilder :: new ( )
34- . with_event_filter ( filter)
35- . with_reorg_rewind_depth ( 6 )
36- . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
37- . await ?;
29+ let mut client = EventScanner :: new ( ) . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) ) . await ?;
30+
31+ let mut stream = client. create_event_stream ( filter) ;
3832
39- tokio:: spawn ( async move { scanner . start ( BlockNumberOrTag :: Latest , None ) . await } ) ;
33+ tokio:: spawn ( async move { client . start_scanner ( BlockNumberOrTag :: Latest , None ) . await } ) ;
4034
4135 let mut expected_event_block_numbers = vec ! [ ] ;
4236
@@ -71,29 +65,22 @@ async fn reorg_rescans_events_with_rewind_depth() -> anyhow::Result<()> {
7165 . unwrap ( ) ;
7266 assert_eq ! ( new_block. transactions. len( ) , num_new_events) ;
7367
74- let event_blocks_clone = Arc :: clone ( & blocks) ;
75-
76- let post_reorg_processing = async move {
77- loop {
78- let blocks = event_blocks_clone. lock ( ) . await ;
79- if blocks. len ( ) >= initial_events + num_new_events {
80- break ;
68+ let event_block_count = Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ;
69+ let event_block_count_clone = Arc :: clone ( & event_block_count) ;
70+ let event_counting = async move {
71+ while let Some ( Ok ( logs) ) = stream. next ( ) . await {
72+ let mut guard = event_block_count_clone. lock ( ) . await ;
73+ for log in logs {
74+ if let Some ( n) = log. block_number {
75+ guard. push ( n) ;
76+ }
8177 }
82- drop ( blocks) ;
83- tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . await ;
8478 }
8579 } ;
8680
87- if tokio:: time:: timeout ( Duration :: from_secs ( 5 ) , post_reorg_processing) . await . is_err ( ) {
88- let current_len = blocks. lock ( ) . await . len ( ) ;
89- panic ! (
90- "Post-reorg events not rescanned in time. Expected at least {}, got {}" ,
91- initial_events + num_new_events,
92- current_len
93- ) ;
94- }
81+ _ = timeout ( Duration :: from_secs ( 2 ) , event_counting) . await ;
9582
96- let final_blocks: Vec < _ > = blocks . lock ( ) . await . clone ( ) ;
83+ let final_blocks: Vec < _ > = event_block_count . lock ( ) . await . clone ( ) ;
9784 assert ! ( final_blocks. len( ) == initial_events + num_new_events) ;
9885 assert_eq ! ( final_blocks, expected_event_block_numbers) ;
9986 // sanity check that the block number after the reorg is smaller than the previous block
0 commit comments