@@ -20,7 +20,7 @@ use tokio::{
2020 task:: JoinSet ,
2121} ;
2222use tokio_stream:: { Stream , wrappers:: ReceiverStream } ;
23- use tracing:: { debug, error, info, warn} ;
23+ use tracing:: { debug, error, info, trace , warn} ;
2424
2525#[ derive( Copy , Clone , Debug ) ]
2626pub ( crate ) enum ConsumerMode {
@@ -66,9 +66,13 @@ pub(crate) async fn handle_stream<N: Network, S: Stream<Item = BlockScannerResul
6666 & range_tx,
6767 max_concurrent_fetches,
6868 ) ,
69- ConsumerMode :: CollectLatest { count } => {
70- spawn_log_consumers_in_collection_mode ( provider, listeners, & range_tx, count)
71- }
69+ ConsumerMode :: CollectLatest { count } => spawn_log_consumers_in_collection_mode (
70+ provider,
71+ listeners,
72+ & range_tx,
73+ count,
74+ max_concurrent_fetches,
75+ ) ,
7276 } ;
7377
7478 while let Some ( message) = stream. next ( ) . await {
@@ -170,6 +174,7 @@ fn spawn_log_consumers_in_collection_mode<N: Network>(
170174 listeners : & [ EventListener ] ,
171175 range_tx : & Sender < BlockScannerResult > ,
172176 count : usize ,
177+ max_concurrent_fetches : usize ,
173178) -> JoinSet < ( ) > {
174179 listeners. iter ( ) . cloned ( ) . fold ( JoinSet :: new ( ) , |mut set, listener| {
175180 let EventListener { filter, sender } = listener;
@@ -179,123 +184,164 @@ fn spawn_log_consumers_in_collection_mode<N: Network>(
179184 let mut range_rx = range_tx. subscribe ( ) ;
180185
181186 set. spawn ( async move {
182- // Only used for CollectLatest
183- let mut collected = Vec :: with_capacity ( count) ;
184-
185- // Tracks common ancestor block during reorg recovery for proper log ordering
186- let mut reorg_ancestor: Option < u64 > = None ;
187+ // We use a channel and convert the receiver to a stream because it already has a
188+ // convenience function `buffered` for concurrently handling block ranges, while
189+ // outputting results in the same order as they were received.
190+ let ( tx, rx) = mpsc:: channel :: < BlockScannerResult > ( max_concurrent_fetches) ;
187191
188- loop {
189- match range_rx. recv ( ) . await {
190- Ok ( message) => match message {
192+ // Process block ranges concurrently in a separately spawned task so that the current task
193+ // can continue receiving and buffering subsequent block ranges while the previous ones are
194+ // being processed.
195+ tokio:: spawn ( async move {
196+ let mut stream = ReceiverStream :: new ( rx)
197+ . map ( async |message| match message {
191198 Ok ( ScannerMessage :: Data ( range) ) => {
192- let range_end = * range. end ( ) ;
193- match get_logs ( range, & filter, & base_filter, & provider) . await {
194- Ok ( logs) => {
195- if logs. is_empty ( ) {
196- continue ;
197- }
198-
199- // Check if in reorg recovery and past the reorg range
200- if reorg_ancestor. is_some_and ( |a| range_end <= a) {
201- info ! (
202- ancestor = reorg_ancestor,
203- range_end = range_end,
204- "Reorg recovery complete, resuming normal log collection"
205- ) ;
206- reorg_ancestor = None ;
207- }
208-
209- let should_prepend = reorg_ancestor. is_some ( ) ;
210- if collect_logs ( & mut collected, logs, count, should_prepend) {
211- break ;
212- }
213- }
214- Err ( e) => {
215- error ! ( error = ?e, "Received error message" ) ;
216- if !sender. try_stream ( e) . await {
217- return ; // channel closed
218- }
219- }
199+ get_logs ( range, & filter, & base_filter, & provider)
200+ . await
201+ . map ( Message :: from)
202+ . map_err ( ScannerError :: from)
203+ }
204+ Ok ( ScannerMessage :: Notification ( notification) ) => Ok ( notification. into ( ) ) ,
205+ // No need to stop the stream on an error, because that decision is up to
206+ // the caller.
207+ Err ( e) => Err ( e) ,
208+ } )
209+ . buffered ( max_concurrent_fetches) ;
210+
211+ let mut collected = Vec :: with_capacity ( count) ;
212+
213+ // Tracks common ancestor block during reorg recovery for proper log ordering
214+ let mut reorg_ancestor: Option < u64 > = None ;
215+
216+ // process all of the buffered results
217+ while let Some ( result) = stream. next ( ) . await {
218+ match result {
219+ Ok ( ScannerMessage :: Data ( logs) ) => {
220+ if logs. is_empty ( ) {
221+ continue ;
222+ }
223+
224+ let last_log_block_num = logs
225+ . last ( )
226+ . expect ( "logs is not empty" )
227+ . block_number
228+ . expect ( "pending blocks not supported" ) ;
229+ // Check if in reorg recovery and past the reorg range
230+ if reorg_ancestor. is_some_and ( |a| last_log_block_num <= a) {
231+ debug ! (
232+ ancestor = reorg_ancestor,
233+ "Reorg recovery complete, resuming normal log collection"
234+ ) ;
235+ reorg_ancestor = None ;
236+ }
237+
238+ let should_prepend = reorg_ancestor. is_some ( ) ;
239+ if collect_logs ( & mut collected, logs, count, should_prepend) {
240+ break ;
220241 }
221242 }
243+
222244 Ok ( ScannerMessage :: Notification ( Notification :: ReorgDetected {
223245 common_ancestor,
224246 } ) ) => {
225- info ! (
247+ debug ! (
226248 common_ancestor = common_ancestor,
227249 "Received ReorgDetected notification"
228250 ) ;
229-
230- // Invalidate logs from reorged blocks
231- // Logs are ordered newest -> oldest, so skip logs with
232- // block_number > common_ancestor at the front
233- // NOTE: Pending logs are not supported therefore this filter
234- // works for now (may need to update once they are). Tracked in
235- // <https://github.com/OpenZeppelin/Event-Scanner/issues/244>
236- let before_count = collected. len ( ) ;
237- collected = collected
238- . into_iter ( )
239- . skip_while ( |log| {
240- // Pending blocks aren't supported therefore this filter
241- // works for now (may need to update once they are).
242- // Tracked in <https://github.com/OpenZeppelin/Event-Scanner/issues/244>
243- log. block_number . is_some_and ( |n| n > common_ancestor)
244- } )
245- . collect ( ) ;
246- let removed_count = before_count - collected. len ( ) ;
247- if removed_count > 0 {
248- info ! (
249- removed_count = removed_count,
250- remaining_count = collected. len( ) ,
251- "Invalidated logs from reorged blocks"
252- ) ;
253- }
254-
255251 // Track reorg state for proper log ordering
256252 reorg_ancestor = Some ( common_ancestor) ;
257253
254+ collected =
255+ discard_logs_from_orphaned_blocks ( collected, common_ancestor) ;
256+
258257 // Don't forward the notification to the user in CollectLatest mode
259258 // since logs haven't been sent yet
260259 }
261260 Ok ( ScannerMessage :: Notification ( notification) ) => {
262- info ! ( notification = ?notification, "Received notification" ) ;
261+ debug ! ( notification = ?notification, "Received notification" ) ;
263262 if !sender. try_stream ( notification) . await {
264263 return ;
265264 }
266265 }
267266 Err ( e) => {
268- error ! ( error = ?e, "Received error message" ) ;
269267 if !sender. try_stream ( e) . await {
270268 return ;
271269 }
272270 }
273- } ,
271+ }
272+ }
273+
274+ if collected. is_empty ( ) {
275+ debug ! ( "No logs found" ) ;
276+ _ = sender. try_stream ( Notification :: NoPastLogsFound ) . await ;
277+ return ;
278+ }
279+
280+ trace ! ( count = collected. len( ) , "Logs found" ) ;
281+ collected. reverse ( ) ; // restore chronological order
282+
283+ trace ! ( "Sending collected logs to consumer" ) ;
284+ _ = sender. try_stream ( collected) . await ;
285+ } ) ;
286+
287+ // Receive block ranges from the broadcast channel and send them to the range processor
288+ // for parallel processing.
289+ loop {
290+ match range_rx. recv ( ) . await {
291+ Ok ( message) => {
292+ if tx. send ( message) . await . is_err ( ) {
293+ // range processor has streamed the expected number of logs, stop
294+ // sending ranges
295+ break ;
296+ }
297+ }
274298 Err ( RecvError :: Closed ) => {
275- info ! ( "No block ranges to receive, dropping receiver. " ) ;
299+ debug ! ( "No more block ranges to receive" ) ;
276300 break ;
277301 }
278- Err ( RecvError :: Lagged ( _) ) => { }
302+ Err ( RecvError :: Lagged ( skipped) ) => {
303+ debug ! ( "Channel lagged, skipped {skipped} messages" ) ;
304+ }
279305 }
280306 }
281307
282- if collected. is_empty ( ) {
283- info ! ( "No logs found" ) ;
284- _ = sender. try_stream ( Notification :: NoPastLogsFound ) . await ;
285- return ;
286- }
287-
288- info ! ( count = collected. len( ) , "Logs found" ) ;
289- collected. reverse ( ) ; // restore chronological order
290-
291- info ! ( "Sending collected logs to consumer" ) ;
292- _ = sender. try_stream ( collected) . await ;
308+ // Drop the local channel sender to signal to the range processor that streaming is
309+ // done.
310+ drop ( tx) ;
293311 } ) ;
294312
295313 set
296314 } )
297315}
298316
317+ fn discard_logs_from_orphaned_blocks ( collected : Vec < Log > , common_ancestor : u64 ) -> Vec < Log > {
318+ // Invalidate logs from reorged blocks
319+ // Logs are ordered newest -> oldest, so skip logs with
320+ // block_number > common_ancestor at the front
321+ // NOTE: Pending logs are not supported therefore this filter
322+ // works for now (may need to update once they are). Tracked in
323+ // <https://github.com/OpenZeppelin/Event-Scanner/issues/244>
324+ let before_count = collected. len ( ) ;
325+ let collected = collected
326+ . into_iter ( )
327+ . skip_while ( |log| {
328+ // Pending blocks aren't supported therefore this filter
329+ // works for now (may need to update once they are).
330+ // Tracked in <https://github.com/OpenZeppelin/Event-Scanner/issues/244>
331+ log. block_number . is_some_and ( |n| n > common_ancestor)
332+ } )
333+ . collect :: < Vec < _ > > ( ) ;
334+ let removed_count = before_count - collected. len ( ) ;
335+ if removed_count > 0 {
336+ debug ! (
337+ removed_count = removed_count,
338+ remaining_count = collected. len( ) ,
339+ "Invalidated logs from reorged blocks"
340+ ) ;
341+ }
342+ collected
343+ }
344+
299345/// Collects logs into the buffer, either prepending (reorg recovery) or appending (normal).
300346/// Returns `true` if collection is complete (reached count limit).
301347fn collect_logs < T > ( collected : & mut Vec < T > , logs : Vec < T > , count : usize , prepend : bool ) -> bool {
0 commit comments