@@ -7,10 +7,10 @@ use std::{
77} ;
88
99use crate :: common:: { TestCounter , build_provider, deploy_counter, spawn_anvil} ;
10- use alloy:: { eips:: BlockNumberOrTag , network:: Ethereum , rpc :: types :: Log , sol_types:: SolEvent } ;
10+ use alloy:: { eips:: BlockNumberOrTag , network:: Ethereum , sol_types:: SolEvent } ;
1111use event_scanner:: {
1212 event_filter:: EventFilter ,
13- event_scanner:: { EventScanner , EventScannerError , EventScannerMessage } ,
13+ event_scanner:: { EventScanner , EventScannerMessage } ,
1414} ;
1515use tokio:: time:: timeout;
1616use tokio_stream:: { StreamExt , wrappers:: ReceiverStream } ;
@@ -44,17 +44,18 @@ async fn basic_single_event_scanning() -> anyhow::Result<()> {
4444 let mut expected_new_count = 1 ;
4545 while let Some ( message) = stream. next ( ) . await {
4646 match message {
47- EventScannerMessage :: Logs ( logs) => {
47+ EventScannerMessage :: Message ( logs) => {
4848 event_count_clone. fetch_add ( logs. len ( ) , Ordering :: SeqCst ) ;
4949
5050 for log in logs {
51- let TestCounter :: CountIncreased { newCount } = log. log_decode ( ) . unwrap ( ) . inner . data ;
51+ let TestCounter :: CountIncreased { newCount } =
52+ log. log_decode ( ) . unwrap ( ) . inner . data ;
5253 assert_eq ! ( newCount, expected_new_count) ;
5354 expected_new_count += 1 ;
5455 }
5556 }
5657 EventScannerMessage :: Error ( e) => {
57- panic ! ( "Received error: {}" , e ) ;
58+ panic ! ( "panicked with error: {e}" ) ;
5859 }
5960 EventScannerMessage :: Info ( _) => {
6061 // Handle info if needed
@@ -103,42 +104,40 @@ async fn multiple_contracts_same_event_isolate_callbacks() -> anyhow::Result<()>
103104 b. increase ( ) . send ( ) . await ?. watch ( ) . await ?;
104105 }
105106
106- let make_assertion =
107- async |stream : ReceiverStream < EventScannerMessage > ,
108- expected_events| {
109- let mut stream = stream. take ( expected_events) ;
110-
111- let count = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
112- let count_clone = Arc :: clone ( & count) ;
113-
114- let event_counting = async move {
115- let mut expected_new_count = 1 ;
116- while let Some ( message) = stream. next ( ) . await {
117- match message {
118- EventScannerMessage :: Logs ( logs) => {
119- count_clone. fetch_add ( logs. len ( ) , Ordering :: SeqCst ) ;
120-
121- for log in logs {
122- let TestCounter :: CountIncreased { newCount } =
123- log. log_decode ( ) . unwrap ( ) . inner . data ;
124- assert_eq ! ( newCount, expected_new_count) ;
125- expected_new_count += 1 ;
126- }
127- }
128- EventScannerMessage :: Error ( e) => {
129- panic ! ( "Received error: {}" , e) ;
130- }
131- EventScannerMessage :: Info ( _) => {
132- // Handle info if needed
107+ let make_assertion = async |stream : ReceiverStream < EventScannerMessage > , expected_events| {
108+ let mut stream = stream. take ( expected_events) ;
109+
110+ let count = Arc :: new ( AtomicUsize :: new ( 0 ) ) ;
111+ let count_clone = Arc :: clone ( & count) ;
112+
113+ let event_counting = async move {
114+ let mut expected_new_count = 1 ;
115+ while let Some ( message) = stream. next ( ) . await {
116+ match message {
117+ EventScannerMessage :: Message ( logs) => {
118+ count_clone. fetch_add ( logs. len ( ) , Ordering :: SeqCst ) ;
119+
120+ for log in logs {
121+ let TestCounter :: CountIncreased { newCount } =
122+ log. log_decode ( ) . unwrap ( ) . inner . data ;
123+ assert_eq ! ( newCount, expected_new_count) ;
124+ expected_new_count += 1 ;
133125 }
134126 }
127+ EventScannerMessage :: Error ( e) => {
128+ panic ! ( "panicked with error: {e}" ) ;
129+ }
130+ EventScannerMessage :: Info ( _) => {
131+ // Handle info if needed
132+ }
135133 }
136- } ;
137-
138- _ = timeout ( Duration :: from_secs ( 1 ) , event_counting) . await ;
139- assert_eq ! ( count. load( Ordering :: SeqCst ) , expected_events) ;
134+ }
140135 } ;
141136
137+ _ = timeout ( Duration :: from_secs ( 1 ) , event_counting) . await ;
138+ assert_eq ! ( count. load( Ordering :: SeqCst ) , expected_events) ;
139+ } ;
140+
142141 make_assertion ( a_stream, expected_events_a) . await ;
143142 make_assertion ( b_stream, expected_events_b) . await ;
144143
@@ -189,17 +188,18 @@ async fn multiple_events_same_contract() -> anyhow::Result<()> {
189188 // process CountIncreased
190189 while let Some ( message) = incr_stream. next ( ) . await {
191190 match message {
192- EventScannerMessage :: Logs ( logs) => {
191+ EventScannerMessage :: Message ( logs) => {
193192 incr_count_clone. fetch_add ( logs. len ( ) , Ordering :: SeqCst ) ;
194193
195194 for log in logs {
196195 expected_new_count += 1 ;
197- let TestCounter :: CountIncreased { newCount } = log. log_decode ( ) . unwrap ( ) . inner . data ;
196+ let TestCounter :: CountIncreased { newCount } =
197+ log. log_decode ( ) . unwrap ( ) . inner . data ;
198198 assert_eq ! ( newCount, expected_new_count) ;
199199 }
200200 }
201201 EventScannerMessage :: Error ( e) => {
202- panic ! ( "Received error: {}" , e ) ;
202+ panic ! ( "panicked with error {e}" ) ;
203203 }
204204 EventScannerMessage :: Info ( _) => {
205205 // Handle info if needed
@@ -212,17 +212,18 @@ async fn multiple_events_same_contract() -> anyhow::Result<()> {
212212 // process CountDecreased
213213 while let Some ( message) = decr_stream. next ( ) . await {
214214 match message {
215- EventScannerMessage :: Logs ( logs) => {
215+ EventScannerMessage :: Message ( logs) => {
216216 decr_count_clone. fetch_add ( logs. len ( ) , Ordering :: SeqCst ) ;
217217
218218 for log in logs {
219- let TestCounter :: CountDecreased { newCount } = log. log_decode ( ) . unwrap ( ) . inner . data ;
219+ let TestCounter :: CountDecreased { newCount } =
220+ log. log_decode ( ) . unwrap ( ) . inner . data ;
220221 assert_eq ! ( newCount, expected_new_count) ;
221222 expected_new_count -= 1 ;
222223 }
223224 }
224225 EventScannerMessage :: Error ( e) => {
225- panic ! ( "Received error: {}" , e ) ;
226+ panic ! ( "panicked with error {e}" ) ;
226227 }
227228 EventScannerMessage :: Info ( _) => {
228229 // Handle info if needed
0 commit comments