7 changes: 7 additions & 0 deletions src/error.rs
@@ -36,6 +36,13 @@ pub enum ScannerError {

#[error("Subscription closed")]
SubscriptionClosed,

/// A subscription consumer could not keep up and some internal messages were skipped.
///
/// The contained value is the number of skipped messages reported by the underlying channel.
/// After emitting this error, the subscription stream may continue with newer items.
#[error("Subscription lagged")]
Lagged(u64),
}

impl From<RobustProviderError> for ScannerError {
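For context, here is a minimal consumer-side sketch (not part of this diff) of how the new variant might be handled. It assumes the subscription delivers Result<_, ScannerError> items over a tokio mpsc receiver, as the tests in common.rs below do; the consume helper and its handle callback are hypothetical illustrations, not scanner API.

use tokio::sync::mpsc::Receiver;

// Sketch only: `ScannerError` is the enum from src/error.rs; `handle` is a
// hypothetical callback supplied by the caller.
async fn consume<T>(mut rx: Receiver<Result<T, ScannerError>>, mut handle: impl FnMut(T)) {
    while let Some(item) = rx.recv().await {
        match item {
            Ok(value) => handle(value),
            // The consumer fell behind and the channel dropped `skipped` messages;
            // the stream continues with newer items, so keep receiving.
            Err(ScannerError::Lagged(skipped)) => {
                eprintln!("subscription lagged, skipped {skipped} messages");
            }
            Err(other) => {
                eprintln!("subscription failed: {other}");
                break;
            }
        }
    }
}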
70 changes: 68 additions & 2 deletions src/event_scanner/scanner/common.rs
@@ -154,7 +154,9 @@
break;
}
Err(RecvError::Lagged(skipped)) => {
debug!("Channel lagged, skipped {skipped} messages");
tx.send(Err(ScannerError::Lagged(skipped)))
.await
.expect("receiver dropped only if we exit this loop");
}
}
}
@@ -169,7 +171,7 @@
}

#[must_use]
fn spawn_log_consumers_in_collection_mode<N: Network>(
[Check failure on line 174 in src/event_scanner/scanner/common.rs, GitHub Actions / Cargo clippy: this function has too many lines (101/100)]
provider: &RobustProvider<N>,
listeners: &[EventListener],
range_tx: &Sender<BlockScannerResult>,
@@ -300,7 +302,9 @@
break;
}
Err(RecvError::Lagged(skipped)) => {
debug!("Channel lagged, skipped {skipped} messages");
tx.send(Err(ScannerError::Lagged(skipped)))
.await
.expect("receiver dropped only if we exit this loop");
}
}
}
@@ -406,6 +410,14 @@

#[cfg(test)]
mod tests {
use alloy::{
network::Ethereum,
providers::{RootProvider, mock::Asserter},
rpc::client::RpcClient,
};

use crate::robust_provider::RobustProviderBuilder;

use super::*;

#[test]
@@ -505,4 +517,58 @@

assert_eq!(collected, vec![95, 90, 85]);
}

#[tokio::test]
async fn spawn_log_consumers_in_stream_mode_streams_lagged_error() -> anyhow::Result<()> {
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
let provider = RobustProviderBuilder::fragile(provider).build().await?;

let (range_tx, _) = tokio::sync::broadcast::channel::<BlockScannerResult>(1);
let (sender, mut receiver) = mpsc::channel(1);
let listeners = &[EventListener { filter: EventFilter::new(), sender }];
let max_concurrent_fetches = 1;

let _set = spawn_log_consumers_in_stream_mode(
&provider,
listeners,
&range_tx,
max_concurrent_fetches,
);

range_tx.send(Ok(ScannerMessage::Data(0..=1)))?;
// the next range "overfills" the channel, causing a lag
range_tx.send(Ok(ScannerMessage::Data(2..=3)))?;

assert!(matches!(receiver.recv().await.unwrap(), Err(ScannerError::Lagged(1))));

Ok(())
}

#[tokio::test]
async fn spawn_log_consumers_in_collection_mode_streams_lagged_error() -> anyhow::Result<()> {
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
let provider = RobustProviderBuilder::fragile(provider).build().await?;

let (range_tx, _) = tokio::sync::broadcast::channel::<BlockScannerResult>(1);
let (sender, mut receiver) = mpsc::channel(1);
let listeners = &[EventListener { filter: EventFilter::new(), sender }];
let count = 5;
let max_concurrent_fetches = 1;

let _set = spawn_log_consumers_in_collection_mode(
&provider,
listeners,
&range_tx,
count,
max_concurrent_fetches,
);

range_tx.send(Ok(ScannerMessage::Data(2..=3)))?;
// the next range "overfills" the channel, causing a lag
range_tx.send(Ok(ScannerMessage::Data(0..=1)))?;

assert!(matches!(receiver.recv().await.unwrap(), Err(ScannerError::Lagged(1))));

Ok(())
}
}
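The tests above trigger the lag the same way tokio's broadcast channel reports it: with capacity 1, a second send before the receiver catches up evicts the oldest message, and the next recv returns how many were skipped before continuing with newer items. A standalone sketch of that behavior, assuming only tokio's documented broadcast semantics:

use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    // Capacity-1 channel: the buffer holds a single message.
    let (tx, mut rx) = broadcast::channel::<u32>(1);
    tx.send(1).unwrap();
    // A second send before the receiver catches up evicts `1`.
    tx.send(2).unwrap();

    // The receiver is first told how many messages it missed...
    assert!(matches!(rx.recv().await, Err(RecvError::Lagged(1))));
    // ...and then continues with the newest item.
    assert_eq!(rx.recv().await.unwrap(), 2);
}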