11use std:: ops:: RangeInclusive ;
22
33use crate :: {
4- Notification , ScannerMessage ,
4+ Message , Notification , ScannerError , ScannerMessage ,
55 block_range_scanner:: { BlockScannerResult , MAX_BUFFERED_MESSAGES } ,
66 event_scanner:: { filter:: EventFilter , listener:: EventListener } ,
77 robust_provider:: { RobustProvider , provider:: Error as RobustProviderError } ,
@@ -11,15 +11,19 @@ use alloy::{
1111 network:: Network ,
1212 rpc:: types:: { Filter , Log } ,
1313} ;
14+ use futures:: StreamExt ;
1415use tokio:: {
15- sync:: broadcast:: { self , Sender , error:: RecvError } ,
16+ sync:: {
17+ broadcast:: { self , Sender , error:: RecvError } ,
18+ mpsc,
19+ } ,
1620 task:: JoinSet ,
1721} ;
18- use tokio_stream:: { Stream , StreamExt } ;
19- use tracing:: { error, info, warn} ;
22+ use tokio_stream:: { Stream , wrappers :: ReceiverStream } ;
23+ use tracing:: { debug , error, info, warn} ;
2024
2125#[ derive( Copy , Clone , Debug ) ]
22- pub enum ConsumerMode {
26+ pub ( crate ) enum ConsumerMode {
2327 Stream ,
2428 CollectLatest { count : usize } ,
2529}
@@ -46,16 +50,22 @@ pub enum ConsumerMode {
4650/// # Note
4751///
4852/// Assumes it is running in a separate tokio task, so as to be non-blocking.
49- pub async fn handle_stream < N : Network , S : Stream < Item = BlockScannerResult > + Unpin > (
53+ pub ( crate ) async fn handle_stream < N : Network , S : Stream < Item = BlockScannerResult > + Unpin > (
5054 mut stream : S ,
5155 provider : & RobustProvider < N > ,
5256 listeners : & [ EventListener ] ,
5357 mode : ConsumerMode ,
58+ max_concurrent_fetches : usize ,
5459) {
5560 let ( range_tx, _) = broadcast:: channel :: < BlockScannerResult > ( MAX_BUFFERED_MESSAGES ) ;
5661
5762 let consumers = match mode {
58- ConsumerMode :: Stream => spawn_log_consumers_in_stream_mode ( provider, listeners, & range_tx) ,
63+ ConsumerMode :: Stream => spawn_log_consumers_in_stream_mode (
64+ provider,
65+ listeners,
66+ & range_tx,
67+ max_concurrent_fetches,
68+ ) ,
5969 ConsumerMode :: CollectLatest { count } => {
6070 spawn_log_consumers_in_collection_mode ( provider, listeners, & range_tx, count)
6171 }
@@ -76,10 +86,11 @@ pub async fn handle_stream<N: Network, S: Stream<Item = BlockScannerResult> + Un
7686}
7787
7888#[ must_use]
79- pub fn spawn_log_consumers_in_stream_mode < N : Network > (
89+ fn spawn_log_consumers_in_stream_mode < N : Network > (
8090 provider : & RobustProvider < N > ,
8191 listeners : & [ EventListener ] ,
8292 range_tx : & Sender < BlockScannerResult > ,
93+ max_concurrent_fetches : usize ,
8394) -> JoinSet < ( ) > {
8495 listeners. iter ( ) . cloned ( ) . fold ( JoinSet :: new ( ) , |mut set, listener| {
8596 let EventListener { filter, sender } = listener;
@@ -89,56 +100,72 @@ pub fn spawn_log_consumers_in_stream_mode<N: Network>(
89100 let mut range_rx = range_tx. subscribe ( ) ;
90101
91102 set. spawn ( async move {
92- loop {
93- match range_rx. recv ( ) . await {
94- Ok ( message) => match message {
103+ // We use a channel and convert the receiver to a stream because it already has a
104+ // convenience function `buffered` for concurrently handling block ranges, while
105+ // outputting results in the same order as they were received.
106+ let ( tx, rx) = mpsc:: channel :: < BlockScannerResult > ( max_concurrent_fetches) ;
107+
108+ // Process block ranges concurrently in a separate thread so that the current thread can
109+ // continue receiving and buffering subsequent block ranges while the previous ones are
110+ // being processed.
111+ tokio:: spawn ( async move {
112+ let mut stream = ReceiverStream :: new ( rx)
113+ . map ( async |message| match message {
95114 Ok ( ScannerMessage :: Data ( range) ) => {
96- match get_logs ( range, & filter, & base_filter, & provider) . await {
97- Ok ( logs) => {
98- if logs. is_empty ( ) {
99- continue ;
100- }
101-
102- if !sender. try_stream ( logs) . await {
103- return ;
104- }
105- }
106- Err ( e) => {
107- error ! ( error = ?e, "Received error message" ) ;
108- if !sender. try_stream ( e) . await {
109- return ;
110- }
111- }
112- }
113- }
114- Ok ( ScannerMessage :: Notification ( notification) ) => {
115- info ! ( notification = ?notification, "Received notification" ) ;
116- if !sender. try_stream ( notification) . await {
117- return ;
118- }
115+ get_logs ( range, & filter, & base_filter, & provider)
116+ . await
117+ . map ( Message :: from)
118+ . map_err ( ScannerError :: from)
119119 }
120- Err ( e) => {
121- error ! ( error = ?e, "Received error message" ) ;
122- if !sender. try_stream ( e) . await {
123- return ;
124- }
125- }
126- } ,
120+ Ok ( ScannerMessage :: Notification ( notification) ) => Ok ( notification. into ( ) ) ,
121+ // No need to stop the stream on an error, because that decision is up to
122+ // the caller.
123+ Err ( e) => Err ( e) ,
124+ } )
125+ . buffered ( max_concurrent_fetches) ;
126+
127+ // process all of the buffered results
128+ while let Some ( result) = stream. next ( ) . await {
129+ if let Ok ( ScannerMessage :: Data ( logs) ) = result. as_ref ( ) &&
130+ logs. is_empty ( )
131+ {
132+ continue ;
133+ }
134+
135+ if !sender. try_stream ( result) . await {
136+ return ;
137+ }
138+ }
139+ } ) ;
140+
141+ // Receive block ranges from the broadcast channel and send them to the range processor
142+ // for parallel processing.
143+ loop {
144+ match range_rx. recv ( ) . await {
145+ Ok ( message) => {
146+ tx. send ( message) . await . expect ( "receiver dropped only if we exit this loop" ) ;
147+ }
127148 Err ( RecvError :: Closed ) => {
128- info ! ( "No block ranges to receive, dropping receiver. " ) ;
149+ debug ! ( "No more block ranges to receive" ) ;
129150 break ;
130151 }
131- Err ( RecvError :: Lagged ( _) ) => { }
152+ Err ( RecvError :: Lagged ( skipped) ) => {
153+ debug ! ( "Channel lagged, skipped {skipped} messages" ) ;
154+ }
132155 }
133156 }
157+
158+ // Drop the local channel sender to signal to the range processor that streaming is
159+ // done.
160+ drop ( tx) ;
134161 } ) ;
135162
136163 set
137164 } )
138165}
139166
140167#[ must_use]
141- pub fn spawn_log_consumers_in_collection_mode < N : Network > (
168+ fn spawn_log_consumers_in_collection_mode < N : Network > (
142169 provider : & RobustProvider < N > ,
143170 listeners : & [ EventListener ] ,
144171 range_tx : & Sender < BlockScannerResult > ,
0 commit comments