Skip to content

Commit ff33fc8

Browse files
committed
fix: stream Lagged errors up to the caller
1 parent e3abf2d commit ff33fc8

File tree

2 files changed

+71
-2
lines changed

2 files changed

+71
-2
lines changed

src/error.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,9 @@ pub enum ScannerError {
3636

3737
#[error("Subscription closed")]
3838
SubscriptionClosed,
39+
40+
#[error("Subscription lagged")]
41+
Lagged(u64),
3942
}
4043

4144
impl From<RobustProviderError> for ScannerError {

src/event_scanner/scanner/common.rs

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,9 @@ fn spawn_log_consumers_in_stream_mode<N: Network>(
154154
break;
155155
}
156156
Err(RecvError::Lagged(skipped)) => {
157-
debug!("Channel lagged, skipped {skipped} messages");
157+
tx.send(Err(ScannerError::Lagged(skipped)))
158+
.await
159+
.expect("receiver dropped only if we exit this loop");
158160
}
159161
}
160162
}
@@ -300,7 +302,9 @@ fn spawn_log_consumers_in_collection_mode<N: Network>(
300302
break;
301303
}
302304
Err(RecvError::Lagged(skipped)) => {
303-
debug!("Channel lagged, skipped {skipped} messages");
305+
tx.send(Err(ScannerError::Lagged(skipped)))
306+
.await
307+
.expect("receiver dropped only if we exit this loop");
304308
}
305309
}
306310
}
@@ -406,6 +410,14 @@ async fn get_logs<N: Network>(
406410

407411
#[cfg(test)]
408412
mod tests {
413+
use alloy::{
414+
network::Ethereum,
415+
providers::{RootProvider, mock::Asserter},
416+
rpc::client::RpcClient,
417+
};
418+
419+
use crate::robust_provider::RobustProviderBuilder;
420+
409421
use super::*;
410422

411423
#[test]
@@ -505,4 +517,58 @@ mod tests {
505517

506518
assert_eq!(collected, vec![95, 90, 85]);
507519
}
520+
521+
#[tokio::test]
522+
async fn spawn_log_consumers_in_stream_mode_streams_lagged_error() -> anyhow::Result<()> {
523+
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
524+
let provider = RobustProviderBuilder::fragile(provider).build().await?;
525+
526+
let (range_tx, _) = tokio::sync::broadcast::channel::<BlockScannerResult>(1);
527+
let (sender, mut receiver) = mpsc::channel(1);
528+
let listeners = &[EventListener { filter: EventFilter::new(), sender }];
529+
let max_concurrent_fetches = 1;
530+
531+
let _set = spawn_log_consumers_in_stream_mode(
532+
&provider,
533+
listeners,
534+
&range_tx,
535+
max_concurrent_fetches,
536+
);
537+
538+
range_tx.send(Ok(ScannerMessage::Data(0..=1)))?;
539+
// the next range "overfills" the channel, causing a lag
540+
range_tx.send(Ok(ScannerMessage::Data(2..=3)))?;
541+
542+
assert!(matches!(receiver.recv().await.unwrap(), Err(ScannerError::Lagged(1))));
543+
544+
Ok(())
545+
}
546+
547+
#[tokio::test]
548+
async fn spawn_log_consumers_in_collection_mode_streams_lagged_error() -> anyhow::Result<()> {
549+
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
550+
let provider = RobustProviderBuilder::fragile(provider).build().await?;
551+
552+
let (range_tx, _) = tokio::sync::broadcast::channel::<BlockScannerResult>(1);
553+
let (sender, mut receiver) = mpsc::channel(1);
554+
let listeners = &[EventListener { filter: EventFilter::new(), sender }];
555+
let count = 5;
556+
let max_concurrent_fetches = 1;
557+
558+
let _set = spawn_log_consumers_in_collection_mode(
559+
&provider,
560+
listeners,
561+
&range_tx,
562+
count,
563+
max_concurrent_fetches,
564+
);
565+
566+
range_tx.send(Ok(ScannerMessage::Data(2..=3)))?;
567+
// the next range "overfills" the channel, causing a lag
568+
range_tx.send(Ok(ScannerMessage::Data(0..=1)))?;
569+
570+
assert!(matches!(receiver.recv().await.unwrap(), Err(ScannerError::Lagged(1))));
571+
572+
Ok(())
573+
}
508574
}

0 commit comments

Comments
 (0)