diff --git a/src/error.rs b/src/error.rs
index be57f926..a55edece 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -36,6 +36,13 @@ pub enum ScannerError {
 
     #[error("Subscription closed")]
     SubscriptionClosed,
+
+    /// A subscription consumer could not keep up and some internal messages were skipped.
+    ///
+    /// The contained value is the number of skipped messages reported by the underlying channel.
+    /// After emitting this error, the subscription stream may continue with newer items.
+    #[error("Subscription lagged")]
+    Lagged(u64),
 }
 
 impl From for ScannerError {
diff --git a/src/event_scanner/scanner/common.rs b/src/event_scanner/scanner/common.rs
index 605421a2..3a7b328c 100644
--- a/src/event_scanner/scanner/common.rs
+++ b/src/event_scanner/scanner/common.rs
@@ -154,7 +154,9 @@ fn spawn_log_consumers_in_stream_mode(
                     break;
                 }
                 Err(RecvError::Lagged(skipped)) => {
-                    debug!("Channel lagged, skipped {skipped} messages");
+                    tx.send(Err(ScannerError::Lagged(skipped)))
+                        .await
+                        .expect("receiver dropped only if we exit this loop");
                 }
             }
         }
@@ -300,7 +302,9 @@ fn spawn_log_consumers_in_collection_mode(
                     break;
                 }
                 Err(RecvError::Lagged(skipped)) => {
-                    debug!("Channel lagged, skipped {skipped} messages");
+                    tx.send(Err(ScannerError::Lagged(skipped)))
+                        .await
+                        .expect("receiver dropped only if we exit this loop");
                 }
             }
         }
@@ -406,6 +410,14 @@ async fn get_logs(
 
 #[cfg(test)]
 mod tests {
+    use alloy::{
+        network::Ethereum,
+        providers::{RootProvider, mock::Asserter},
+        rpc::client::RpcClient,
+    };
+
+    use crate::robust_provider::RobustProviderBuilder;
+
     use super::*;
 
     #[test]
@@ -505,4 +517,58 @@ mod tests {
 
         assert_eq!(collected, vec![95, 90, 85]);
     }
+
+    #[tokio::test]
+    async fn spawn_log_consumers_in_stream_mode_streams_lagged_error() -> anyhow::Result<()> {
+        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
+        let provider = RobustProviderBuilder::fragile(provider).build().await?;
+
+        let (range_tx, _) = tokio::sync::broadcast::channel::(1);
+        let (sender, mut receiver) = mpsc::channel(1);
+        let listeners = &[EventListener { filter: EventFilter::new(), sender }];
+        let max_concurrent_fetches = 1;
+
+        let _set = spawn_log_consumers_in_stream_mode(
+            &provider,
+            listeners,
+            &range_tx,
+            max_concurrent_fetches,
+        );
+
+        range_tx.send(Ok(ScannerMessage::Data(0..=1)))?;
+        // the next range "overfills" the channel, causing a lag
+        range_tx.send(Ok(ScannerMessage::Data(2..=3)))?;
+
+        assert!(matches!(receiver.recv().await.unwrap(), Err(ScannerError::Lagged(1))));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn spawn_log_consumers_in_collection_mode_streams_lagged_error() -> anyhow::Result<()> {
+        let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
+        let provider = RobustProviderBuilder::fragile(provider).build().await?;
+
+        let (range_tx, _) = tokio::sync::broadcast::channel::(1);
+        let (sender, mut receiver) = mpsc::channel(1);
+        let listeners = &[EventListener { filter: EventFilter::new(), sender }];
+        let count = 5;
+        let max_concurrent_fetches = 1;
+
+        let _set = spawn_log_consumers_in_collection_mode(
+            &provider,
+            listeners,
+            &range_tx,
+            count,
+            max_concurrent_fetches,
+        );
+
+        range_tx.send(Ok(ScannerMessage::Data(2..=3)))?;
+        // the next range "overfills" the channel, causing a lag
+        range_tx.send(Ok(ScannerMessage::Data(0..=1)))?;
+
+        assert!(matches!(receiver.recv().await.unwrap(), Err(ScannerError::Lagged(1))));
+
+        Ok(())
+    }
 }
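
For context on how the new variant is meant to be consumed, the sketch below shows one way a caller might react to `ScannerError::Lagged` when draining a subscription. It is a minimal illustration, not part of this change: the `Logs` alias, the `drain` function, and the exact import path for `ScannerError` are assumptions. The only facts taken from the diff are that consumers now receive `Err(ScannerError::Lagged(skipped))` over the same `mpsc` channel as ordinary results, and that the stream may continue with newer items afterwards.

```rust
use tokio::sync::mpsc;

// `ScannerError` is defined in src/error.rs of this crate; the import path may differ.
use crate::error::ScannerError;

// Hypothetical stand-in for whatever payload the scanner delivers per block range.
type Logs = Vec<u64>;

/// Drains a scanner subscription, treating `Lagged` as a recoverable gap.
async fn drain(mut receiver: mpsc::Receiver<Result<Logs, ScannerError>>) {
    while let Some(message) = receiver.recv().await {
        match message {
            Ok(logs) => {
                // Normal path: handle the logs for this range.
                println!("received {} logs", logs.len());
            }
            Err(ScannerError::Lagged(skipped)) => {
                // The consumer fell behind and `skipped` internal messages were dropped.
                // The stream keeps going, so a caller can log and continue, or schedule
                // a re-scan of the missed ranges if completeness matters.
                eprintln!("scanner lagged, {skipped} messages skipped");
            }
            Err(other) => {
                // Any other error is treated as fatal here.
                eprintln!("scanner error: {other}");
                break;
            }
        }
    }
}
```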
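
The new tests provoke the lag by using a broadcast channel of capacity 1 and publishing a second range before the consumer task has read the first. That eviction-plus-`Lagged` behaviour is standard `tokio::sync::broadcast` semantics rather than anything specific to this crate; the self-contained snippet below reproduces it in isolation (channel type and values are illustrative).

```rust
use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    // Capacity-1 channel: the second send evicts the first message before
    // the receiver has read it, which is what the tests above rely on.
    let (tx, mut rx) = broadcast::channel::<u32>(1);

    tx.send(1).unwrap();
    tx.send(2).unwrap(); // overwrites `1`; `rx` is now lagging by one message

    // The receiver is first told how many messages it missed...
    assert!(matches!(rx.recv().await, Err(RecvError::Lagged(1))));
    // ...and then resumes with the oldest message still buffered.
    assert_eq!(rx.recv().await.unwrap(), 2);
}
```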