1- use std:: sync:: Arc ;
1+ use std:: { ops :: RangeInclusive , sync:: Arc } ;
22
33use crate :: {
44 block_range_scanner:: {
@@ -12,7 +12,7 @@ use crate::{
1212use alloy:: {
1313 eips:: BlockNumberOrTag ,
1414 network:: Network ,
15- providers:: Provider ,
15+ providers:: { Provider , RootProvider } ,
1616 rpc:: types:: { Filter , Log } ,
1717 transports:: { RpcError , TransportErrorKind , http:: reqwest:: Url } ,
1818} ;
@@ -158,63 +158,7 @@ impl<N: Network> ConnectedEventScanner<N> {
158158 loop {
159159 match sub. recv ( ) . await {
160160 Ok ( BlockRangeMessage :: Data ( range) ) => {
161- let ( from_block, to_block) = ( * range. start ( ) , * range. end ( ) ) ;
162-
163- let mut log_filter =
164- Filter :: new ( ) . from_block ( from_block) . to_block ( to_block) ;
165-
166- if let Some ( contract_address) = filter. contract_address {
167- log_filter = log_filter. address ( contract_address) ;
168- }
169-
170- if let Some ( ref event_signature) = filter. event {
171- log_filter = log_filter. event ( event_signature. as_str ( ) ) ;
172- }
173-
174- let contract_display = filter. contract_address . map_or_else (
175- || "all contracts" . to_string ( ) ,
176- |addr| format ! ( "{addr:?}" ) ,
177- ) ;
178- let event_display = filter. event . as_deref ( ) . map_or ( "all events" , |s| s) ;
179-
180- match provider. get_logs ( & log_filter) . await {
181- Ok ( logs) => {
182- if logs. is_empty ( ) {
183- continue ;
184- }
185-
186- info ! (
187- contract = %contract_display,
188- event = %event_display,
189- log_count = logs. len( ) ,
190- from_block,
191- to_block,
192- "found logs for event in block range"
193- ) ;
194-
195- if let Err ( e) =
196- sender. send ( EventScannerMessage :: Data ( logs) ) . await
197- {
198- error ! ( contract = %contract_display, event = %event_display, error = %e, "failed to enqueue log for processing" ) ;
199- }
200- }
201- Err ( e) => {
202- error ! (
203- contract = %contract_display,
204- event = %event_display,
205- error = %e,
206- from_block,
207- to_block,
208- "failed to get logs for block range"
209- ) ;
210-
211- if let Err ( send_err) =
212- sender. send ( EventScannerMessage :: Error ( e. into ( ) ) ) . await
213- {
214- error ! ( event = %event_display, error = %send_err, "failed to enqueue error for processing" ) ;
215- }
216- }
217- }
161+ Self :: process_range ( range, & filter, & provider, & sender) . await ;
218162 }
219163 Ok ( BlockRangeMessage :: Error ( e) ) => {
220164 if let Err ( err) = sender. send ( ScannerMessage :: Error ( e. into ( ) ) ) . await {
@@ -235,6 +179,77 @@ impl<N: Network> ConnectedEventScanner<N> {
235179 }
236180 }
237181
182+ async fn process_range (
183+ range : RangeInclusive < u64 > ,
184+ filter : & EventFilter ,
185+ provider : & RootProvider < N > ,
186+ sender : & mpsc:: Sender < EventScannerMessage > ,
187+ ) {
188+ let ( from_block, to_block) = ( * range. start ( ) , * range. end ( ) ) ;
189+
190+ let ( log_filter, contract_display, event_display) =
191+ Self :: build_log_filter ( from_block, to_block, filter) ;
192+
193+ match provider. get_logs ( & log_filter) . await {
194+ Ok ( logs) => {
195+ if logs. is_empty ( ) {
196+ return ;
197+ }
198+
199+ info ! (
200+ contract = %contract_display,
201+ event = %event_display,
202+ log_count = logs. len( ) ,
203+ from_block,
204+ to_block,
205+ "found logs for event in block range"
206+ ) ;
207+
208+ if let Err ( e) = sender. send ( EventScannerMessage :: Data ( logs) ) . await {
209+ error ! ( contract = %contract_display, event = %event_display, error = %e, "failed to enqueue log for processing" ) ;
210+ }
211+ }
212+ Err ( e) => {
213+ error ! (
214+ contract = %contract_display,
215+ event = %event_display,
216+ error = %e,
217+ from_block,
218+ to_block,
219+ "failed to get logs for block range"
220+ ) ;
221+
222+ if let Err ( send_err) = sender. send ( EventScannerMessage :: Error ( e. into ( ) ) ) . await {
223+ error ! ( event = %event_display, error = %send_err, "failed to enqueue error for processing" ) ;
224+ }
225+ }
226+ }
227+ }
228+
229+ fn build_log_filter (
230+ from_block : u64 ,
231+ to_block : u64 ,
232+ filter : & EventFilter ,
233+ ) -> ( Filter , String , String ) {
234+ let mut log_filter = Filter :: new ( ) . from_block ( from_block) . to_block ( to_block) ;
235+
236+ if let Some ( contract_address) = filter. contract_address {
237+ log_filter = log_filter. address ( contract_address) ;
238+ }
239+
240+ if let Some ( ref event_signature) = filter. event {
241+ log_filter = log_filter. event ( event_signature. as_str ( ) ) ;
242+ }
243+
244+ let contract_display = filter
245+ . contract_address
246+ . map_or_else ( || "all contracts" . to_string ( ) , |addr| format ! ( "{addr:?}" ) ) ;
247+
248+ let event_display = filter. event . as_deref ( ) . map_or ( "all events" , |s| s) . to_string ( ) ;
249+
250+ ( log_filter, contract_display, event_display)
251+ }
252+
238253 fn add_event_listener ( & mut self , event_listener : EventListener ) {
239254 self . event_listeners . push ( event_listener) ;
240255 }
0 commit comments