@@ -295,3 +295,182 @@ async fn scan_latest_ignores_reorg_and_returns_canonical_latest() -> anyhow::Res
295295
296296 Ok ( ( ) )
297297}
298+
299+ #[ tokio:: test]
300+ async fn scan_latest_mixed_events_and_filters_return_correct_streams ( ) -> anyhow:: Result < ( ) > {
301+ let setup = setup_scanner ( None , None , None ) . await ?;
302+ let contract = setup. contract ;
303+ let mut client = setup. client ;
304+ let mut inc_stream = setup. stream ; // CountIncreased by default
305+
306+ // Add a CountDecreased listener
307+ let filter_dec = EventFilter :: new ( )
308+ . with_contract_address ( * contract. address ( ) )
309+ . with_event ( TestCounter :: CountDecreased :: SIGNATURE ) ;
310+ let mut dec_stream = client. create_event_stream ( filter_dec) ;
311+
312+ // Sequence: inc(1), inc(2), dec(1), inc(2), dec(1)
313+ let mut inc_hashes: Vec < FixedBytes < 32 > > = Vec :: new ( ) ;
314+ let mut dec_hashes: Vec < FixedBytes < 32 > > = Vec :: new ( ) ;
315+
316+ // inc -> 1
317+ inc_hashes. push ( contract. increase ( ) . send ( ) . await ?. get_receipt ( ) . await ?. transaction_hash ) ;
318+ // inc -> 2
319+ inc_hashes. push ( contract. increase ( ) . send ( ) . await ?. get_receipt ( ) . await ?. transaction_hash ) ;
320+ // dec -> 1
321+ dec_hashes. push ( contract. decrease ( ) . send ( ) . await ?. get_receipt ( ) . await ?. transaction_hash ) ;
322+ // inc -> 2
323+ inc_hashes. push ( contract. increase ( ) . send ( ) . await ?. get_receipt ( ) . await ?. transaction_hash ) ;
324+ // dec -> 1
325+ dec_hashes. push ( contract. decrease ( ) . send ( ) . await ?. get_receipt ( ) . await ?. transaction_hash ) ;
326+
327+ client. scan_latest ( 5 , BlockNumberOrTag :: Earliest , BlockNumberOrTag :: Latest ) . await ?;
328+
329+ let inc_logs = collect_events ( & mut inc_stream) . await ;
330+ let dec_logs = collect_events ( & mut dec_stream) . await ;
331+
332+ assert_eq ! ( inc_logs. len( ) , 3 ) ;
333+ assert_eq ! ( dec_logs. len( ) , 2 ) ;
334+
335+ // Validate increases: counts [1,2,2] with matching hashes
336+ let expected_inc_counts = [ 1u64 , 2 , 2 ] ;
337+ for ( ( log, & expected_hash) , & expected_count) in
338+ inc_logs. iter ( ) . zip ( inc_hashes. iter ( ) ) . zip ( expected_inc_counts. iter ( ) )
339+ {
340+ let ev = log. log_decode :: < TestCounter :: CountIncreased > ( ) ?;
341+ assert_eq ! ( & ev. address( ) , contract. address( ) ) ;
342+ assert_eq ! ( ev. transaction_hash. unwrap( ) , expected_hash) ;
343+ assert_eq ! ( ev. inner. newCount, U256 :: from( expected_count) ) ;
344+ }
345+
346+ // Validate decreases: counts [1,1] with matching hashes
347+ let expected_dec_counts = [ 1u64 , 1 ] ;
348+ for ( ( log, & expected_hash) , & expected_count) in
349+ dec_logs. iter ( ) . zip ( dec_hashes. iter ( ) ) . zip ( expected_dec_counts. iter ( ) )
350+ {
351+ let ev = log. log_decode :: < TestCounter :: CountDecreased > ( ) ?;
352+ assert_eq ! ( & ev. address( ) , contract. address( ) ) ;
353+ assert_eq ! ( ev. transaction_hash. unwrap( ) , expected_hash) ;
354+ assert_eq ! ( ev. inner. newCount, U256 :: from( expected_count) ) ;
355+ }
356+
357+ Ok ( ( ) )
358+ }
359+
360+ #[ tokio:: test]
361+ async fn scan_latest_cross_contract_filtering ( ) -> anyhow:: Result < ( ) > {
362+ // Manual setup to deploy two contracts
363+ let anvil = spawn_anvil ( None ) ?;
364+ let provider: RootProvider = build_provider ( & anvil) . await ?;
365+ let contract_a = deploy_counter ( Arc :: new ( provider. clone ( ) ) ) . await ?;
366+ let contract_b = deploy_counter ( Arc :: new ( provider. clone ( ) ) ) . await ?;
367+
368+ // Listener only for contract A CountIncreased
369+ let filter_a = EventFilter :: new ( )
370+ . with_contract_address ( * contract_a. address ( ) )
371+ . with_event ( TestCounter :: CountIncreased :: SIGNATURE ) ;
372+
373+ let mut client = event_scanner:: event_scanner:: EventScanner :: new ( )
374+ . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
375+ . await ?;
376+ let mut stream_a = client. create_event_stream ( filter_a) ;
377+
378+ // Emit interleaved events from A and B: A(1), B(1), A(2), B(2), A(3)
379+ let mut a_hashes: Vec < FixedBytes < 32 > > = Vec :: new ( ) ;
380+ a_hashes. push ( contract_a. increase ( ) . send ( ) . await ?. get_receipt ( ) . await ?. transaction_hash ) ;
381+ let _ = contract_b. increase ( ) . send ( ) . await ?. get_receipt ( ) . await ?; // ignored by filter
382+ a_hashes. push ( contract_a. increase ( ) . send ( ) . await ?. get_receipt ( ) . await ?. transaction_hash ) ;
383+ let _ = contract_b. increase ( ) . send ( ) . await ?. get_receipt ( ) . await ?; // ignored by filter
384+ a_hashes. push ( contract_a. increase ( ) . send ( ) . await ?. get_receipt ( ) . await ?. transaction_hash ) ;
385+
386+ client. scan_latest ( 5 , BlockNumberOrTag :: Earliest , BlockNumberOrTag :: Latest ) . await ?;
387+
388+ let logs_a = collect_events ( & mut stream_a) . await ;
389+ assert_eq ! ( logs_a. len( ) , 3 ) ;
390+
391+ // Validate only contract A logs with counts 1,2,3
392+ for ( ( log, & expected_hash) , expected_count) in logs_a. iter ( ) . zip ( a_hashes. iter ( ) ) . zip ( 1u64 ..=3 )
393+ {
394+ let ev = log. log_decode :: < TestCounter :: CountIncreased > ( ) ?;
395+ assert_eq ! ( & ev. address( ) , contract_a. address( ) ) ;
396+ assert_eq ! ( ev. transaction_hash. unwrap( ) , expected_hash) ;
397+ assert_eq ! ( ev. inner. newCount, U256 :: from( expected_count) ) ;
398+ }
399+
400+ Ok ( ( ) )
401+ }
402+
403+ #[ tokio:: test]
404+ async fn scan_latest_large_gaps_and_empty_ranges ( ) -> anyhow:: Result < ( ) > {
405+ // Manual setup to mine empty blocks
406+ let anvil = spawn_anvil ( None ) ?;
407+ let provider: RootProvider = build_provider ( & anvil) . await ?;
408+ let contract = deploy_counter ( Arc :: new ( provider. clone ( ) ) ) . await ?;
409+
410+ let filter = EventFilter :: new ( )
411+ . with_contract_address ( * contract. address ( ) )
412+ . with_event ( TestCounter :: CountIncreased :: SIGNATURE ) ;
413+
414+ let mut client = event_scanner:: event_scanner:: EventScanner :: new ( )
415+ . connect_ws :: < Ethereum > ( anvil. ws_endpoint_url ( ) )
416+ . await ?;
417+ let mut stream = client. create_event_stream ( filter) ;
418+
419+ // Emit 2 events
420+ let mut hashes: Vec < FixedBytes < 32 > > = Vec :: new ( ) ;
421+ for _ in 0 ..2u8 {
422+ hashes. push ( contract. increase ( ) . send ( ) . await ?. get_receipt ( ) . await ?. transaction_hash ) ;
423+ }
424+ // Mine 10 empty blocks
425+ provider. anvil_mine ( Some ( 10 ) , None ) . await ?;
426+ // Emit 1 more event
427+ hashes. push ( contract. increase ( ) . send ( ) . await ?. get_receipt ( ) . await ?. transaction_hash ) ;
428+
429+ let head = provider. get_block_number ( ) . await ?;
430+ let start = BlockNumberOrTag :: from ( head - 12 ) ;
431+ let end = BlockNumberOrTag :: from ( head) ;
432+
433+ client. scan_latest ( 5 , start, end) . await ?;
434+ let logs = collect_events ( & mut stream) . await ;
435+
436+ assert_eq ! ( logs. len( ) , 3 ) ;
437+ // Expect counts 1,2,3 and hashes in order
438+ assert_ordering ( logs, 1 , hashes, contract. address ( ) ) ;
439+
440+ Ok ( ( ) )
441+ }
442+
443+ #[ tokio:: test]
444+ async fn scan_latest_boundary_range_single_block ( ) -> anyhow:: Result < ( ) > {
445+ let setup = setup_scanner ( None , None , None ) . await ?;
446+ let provider = setup. provider ;
447+ let contract = setup. contract ;
448+ let client = setup. client ;
449+ let mut stream = setup. stream ;
450+
451+ // Each tx auto-mines a block: we will target the middle block specifically
452+ let mut receipt_blocks: Vec < ( FixedBytes < 32 > , u64 ) > = Vec :: new ( ) ;
453+ for _ in 0 ..3u8 {
454+ let r = contract. increase ( ) . send ( ) . await ?. get_receipt ( ) . await ?;
455+ // fetch the mined block number from provider to be exact
456+ let tx = provider. get_transaction_by_hash ( r. transaction_hash ) . await ?. unwrap ( ) ;
457+ let block_num = tx. block_number . unwrap ( ) ;
458+ receipt_blocks. push ( ( r. transaction_hash , block_num) ) ;
459+ }
460+
461+ // Pick the middle tx's block number
462+ let ( _mid_hash, mid_block) = receipt_blocks[ 1 ] ;
463+ let start = BlockNumberOrTag :: from ( mid_block) ;
464+ let end = BlockNumberOrTag :: from ( mid_block) ;
465+
466+ client. scan_latest ( 5 , start, end) . await ?;
467+ let logs = collect_events ( & mut stream) . await ;
468+
469+ // Expect exactly the middle event only, with count 2
470+ assert_eq ! ( logs. len( ) , 1 ) ;
471+ let ev = logs[ 0 ] . log_decode :: < TestCounter :: CountIncreased > ( ) ?;
472+ assert_eq ! ( & ev. address( ) , contract. address( ) ) ;
473+ assert_eq ! ( ev. inner. newCount, U256 :: from( 2u64 ) ) ;
474+
475+ Ok ( ( ) )
476+ }
0 commit comments