Skip to content

Commit 289fc10

Browse files
committed
Revert "fix: stream Lagged errors up to the caller"
This reverts commit ec2d928.
1 parent 0953acb commit 289fc10

File tree

2 files changed

+2
-75
lines changed

2 files changed

+2
-75
lines changed

src/error.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,6 @@ pub enum ScannerError {
5252
/// A block subscription ended (for example, the underlying WebSocket subscription closed).
5353
#[error("Subscription closed")]
5454
SubscriptionClosed,
55-
56-
/// A subscription consumer could not keep up and some internal messages were skipped.
57-
///
58-
/// The contained value is the number of skipped messages reported by the underlying channel.
59-
/// After emitting this error, the subscription stream may continue with newer items.
60-
#[error("Subscription lagged")]
61-
Lagged(u64),
6255
}
6356

6457
impl From<RobustProviderError> for ScannerError {

src/event_scanner/scanner/common.rs

Lines changed: 2 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,7 @@ fn spawn_log_consumers_in_stream_mode<N: Network>(
156156
break;
157157
}
158158
Err(RecvError::Lagged(skipped)) => {
159-
tx.send(Err(ScannerError::Lagged(skipped)))
160-
.await
161-
.expect("receiver dropped only if we exit this loop");
159+
debug!("Channel lagged, skipped {skipped} messages");
162160
}
163161
}
164162
}
@@ -302,9 +300,7 @@ fn spawn_log_consumers_in_collection_mode<N: Network>(
302300
break;
303301
}
304302
Err(RecvError::Lagged(skipped)) => {
305-
tx.send(Err(ScannerError::Lagged(skipped)))
306-
.await
307-
.expect("receiver dropped only if we exit this loop");
303+
debug!("Channel lagged, skipped {skipped} messages");
308304
}
309305
}
310306
}
@@ -410,14 +406,6 @@ async fn get_logs<N: Network>(
410406

411407
#[cfg(test)]
412408
mod tests {
413-
use alloy::{
414-
network::Ethereum,
415-
providers::{RootProvider, mock::Asserter},
416-
rpc::client::RpcClient,
417-
};
418-
419-
use crate::robust_provider::RobustProviderBuilder;
420-
421409
use super::*;
422410

423411
#[test]
@@ -517,58 +505,4 @@ mod tests {
517505

518506
assert_eq!(collected, vec![95, 90, 85]);
519507
}
520-
521-
#[tokio::test]
522-
async fn spawn_log_consumers_in_stream_mode_streams_lagged_error() -> anyhow::Result<()> {
523-
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
524-
let provider = RobustProviderBuilder::fragile(provider).build().await?;
525-
526-
let (range_tx, _) = tokio::sync::broadcast::channel::<BlockScannerResult>(1);
527-
let (sender, mut receiver) = mpsc::channel(1);
528-
let listeners = &[EventListener { filter: EventFilter::new(), sender }];
529-
let max_concurrent_fetches = 1;
530-
531-
let _set = spawn_log_consumers_in_stream_mode(
532-
&provider,
533-
listeners,
534-
&range_tx,
535-
max_concurrent_fetches,
536-
);
537-
538-
range_tx.send(Ok(ScannerMessage::Data(0..=1)))?;
539-
// the next range "overfills" the channel, causing a lag
540-
range_tx.send(Ok(ScannerMessage::Data(2..=3)))?;
541-
542-
assert!(matches!(receiver.recv().await.unwrap(), Err(ScannerError::Lagged(1))));
543-
544-
Ok(())
545-
}
546-
547-
#[tokio::test]
548-
async fn spawn_log_consumers_in_collection_mode_streams_lagged_error() -> anyhow::Result<()> {
549-
let provider = RootProvider::<Ethereum>::new(RpcClient::mocked(Asserter::new()));
550-
let provider = RobustProviderBuilder::fragile(provider).build().await?;
551-
552-
let (range_tx, _) = tokio::sync::broadcast::channel::<BlockScannerResult>(1);
553-
let (sender, mut receiver) = mpsc::channel(1);
554-
let listeners = &[EventListener { filter: EventFilter::new(), sender }];
555-
let count = 5;
556-
let max_concurrent_fetches = 1;
557-
558-
let _set = spawn_log_consumers_in_collection_mode(
559-
&provider,
560-
listeners,
561-
&range_tx,
562-
count,
563-
max_concurrent_fetches,
564-
);
565-
566-
range_tx.send(Ok(ScannerMessage::Data(2..=3)))?;
567-
// the next range "overfills" the channel, causing a lag
568-
range_tx.send(Ok(ScannerMessage::Data(0..=1)))?;
569-
570-
assert!(matches!(receiver.recv().await.unwrap(), Err(ScannerError::Lagged(1))));
571-
572-
Ok(())
573-
}
574508
}

0 commit comments

Comments
 (0)