
Commit c1eb925

ref: remove lag handling; instead send lag error to receiver
1 parent c3bdf54 commit c1eb925

File tree

4 files changed
+17 -131 lines changed


src/error.rs

Lines changed: 3 additions & 3 deletions
@@ -34,8 +34,8 @@ pub enum ScannerError {
     #[error("Subscription closed")]
     SubscriptionClosed,
 
-    #[error("Subscriptions Lagged too often")]
-    SubscriptionLagged,
+    #[error("Subscription Lagged by {0}")]
+    SubscriptionLagged(u64),
 }
 
 impl From<RobustProviderError> for ScannerError {
@@ -45,7 +45,7 @@ impl From<RobustProviderError> for ScannerError {
             RobustProviderError::RpcError(err) => ScannerError::RpcError(err),
             RobustProviderError::BlockNotFound(block) => ScannerError::BlockNotFound(block),
             RobustProviderError::Closed => ScannerError::SubscriptionClosed,
-            RobustProviderError::SubscriptionLagged => ScannerError::SubscriptionLagged,
+            RobustProviderError::Lagged(count) => ScannerError::SubscriptionLagged(count),
         }
     }
 }

src/robust_provider/error.rs

Lines changed: 2 additions & 3 deletions
@@ -1,4 +1,3 @@
-use crate::robust_provider::subscription::MAX_LAG_COUNT;
 use std::sync::Arc;
 
 use alloy::{
@@ -18,8 +17,8 @@ pub enum Error {
     BlockNotFound(BlockId),
     #[error("Subscription closed")]
     Closed,
-    #[error("Subscription lag exceeded maximum consecutive lag count: {MAX_LAG_COUNT:?}")]
-    SubscriptionLagged,
+    #[error("Subscription lagged behind by: {0}")]
+    Lagged(u64),
 }
 
 impl From<RpcError<TransportErrorKind>> for Error {
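
Both error enums now carry the number of missed messages in the variant itself, and the `{0}` placeholder interpolates that count into the error message. A minimal sketch of how the new variant behaves, assuming the `#[error(...)]` attributes come from `thiserror` (as the syntax suggests) and using a trimmed stand-in rather than the crate's real enum:

use thiserror::Error;

// Trimmed stand-in for the crate's error enum; only the variants touched by
// this commit are reproduced here.
#[derive(Debug, Error)]
pub enum Error {
    #[error("Subscription closed")]
    Closed,
    #[error("Subscription lagged behind by: {0}")]
    Lagged(u64),
}

fn main() {
    // `{0}` interpolates the tuple field, so callers see the actual lag count.
    let err = Error::Lagged(7);
    println!("{err}"); // Subscription lagged behind by: 7

    // The count is also available programmatically for backfill or retry logic.
    if let Error::Lagged(skipped) = err {
        assert_eq!(skipped, 7);
    }
}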

src/robust_provider/provider.rs

Lines changed: 5 additions & 1 deletion
@@ -14,6 +14,8 @@ use tracing::{error, info};
 
 use crate::robust_provider::{Error, RobustSubscription};
 
+pub const MAX_CHANNEL_SIZE: usize = 128;
+
 /// Provider wrapper with built-in retry and timeout mechanisms.
 ///
 /// This wrapper around Alloy providers automatically handles retries,
@@ -207,7 +209,9 @@ impl<N: Network> RobustProvider<N> {
         info!("eth_subscribe called");
         let subscription = self
             .try_operation_with_failover(
-                move |provider| async move { provider.subscribe_blocks().await },
+                move |provider| async move {
+                    provider.subscribe_blocks().channel_size(MAX_CHANNEL_SIZE).await
+                },
                 true,
             )
             .await;
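
The `RecvError::Lagged`/`RecvError::Closed` variants handled elsewhere in this commit come from a tokio broadcast channel, so `channel_size` bounds how far a slow receiver may fall behind before older headers are dropped and a lag error is reported on the next receive. A minimal sketch of that mechanism using `tokio::sync::broadcast` directly; the tiny capacity and the `u64` payload are illustrative only, while the crate itself passes `MAX_CHANNEL_SIZE` (128) to Alloy's subscription builder:

use tokio::sync::broadcast::{self, error::RecvError};

#[tokio::main]
async fn main() {
    // Deliberately tiny capacity so the receiver is easy to overflow.
    let (tx, mut rx) = broadcast::channel::<u64>(4);

    // Publish more messages than the channel can hold while the receiver is idle.
    for block_number in 0..10u64 {
        tx.send(block_number).unwrap();
    }

    // The next recv reports how many messages were dropped instead of
    // silently skipping them.
    match rx.recv().await {
        Err(RecvError::Lagged(skipped)) => println!("receiver lagged by {skipped}"),
        Err(RecvError::Closed) => println!("channel closed"),
        Ok(n) => println!("received block {n}"),
    }
}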

src/robust_provider/subscription.rs

Lines changed: 7 additions & 124 deletions
@@ -19,17 +19,13 @@ use crate::robust_provider::{Error, RobustProvider};
 /// Default time interval between primary provider reconnection attempts
 pub const DEFAULT_RECONNECT_INTERVAL: Duration = Duration::from_secs(30);
 
-/// Maximum number of consecutive lags before switching providers
-pub const MAX_LAG_COUNT: usize = 3;
-
 /// A robust subscription wrapper that automatically handles provider failover
 /// and periodic reconnection attempts to the primary provider.
 #[derive(Debug)]
 pub struct RobustSubscription<N: Network> {
     subscription: Subscription<N::HeaderResponse>,
     robust_provider: RobustProvider<N>,
     last_reconnect_attempt: Option<Instant>,
-    consecutive_lags: usize,
     current_fallback_index: Option<usize>,
 }
 
@@ -43,7 +39,6 @@ impl<N: Network> RobustSubscription<N> {
             subscription,
             robust_provider,
             last_reconnect_attempt: None,
-            consecutive_lags: 0,
             current_fallback_index: None,
         }
     }
@@ -63,8 +58,8 @@ impl<N: Network> RobustSubscription<N> {
     ///    fallback provider and successfully receiving blocks. Note: The actual reconnection
     ///    attempt occurs when a new block is received, so if blocks arrive slower than the
     ///    reconnect interval, reconnection will be delayed until the next block.
-    /// 2. **Forced reconnection**: Immediately when a fallback provider fails, before attempting
-    ///    the next fallback provider
+    /// 2. **Fallback failure**: Immediately when a fallback provider fails, before attempting the
+    ///    next fallback provider
     ///
     /// # Lag Handling
     ///
@@ -87,11 +82,10 @@ impl<N: Network> RobustSubscription<N> {
                     if self.is_on_fallback() {
                         self.try_reconnect_to_primary(false).await;
                     }
-                    self.consecutive_lags = 0;
                     return Ok(header);
                 }
                 Err(recv_error) => {
-                    self.process_recv_error(recv_error).await?;
+                    Self::process_recv_error(&recv_error)?;
                 }
             },
             Err(elapsed_err) => {
@@ -107,27 +101,14 @@
     }
 
     /// Process subscription receive errors and handle failover
-    async fn process_recv_error(&mut self, recv_error: RecvError) -> Result<(), Error> {
+    fn process_recv_error(recv_error: &RecvError) -> Result<(), Error> {
         match recv_error {
             RecvError::Closed => {
                 error!("Provider closed the subscription channel");
-                return Err(Error::Closed);
-            }
-            RecvError::Lagged(skipped) => {
-                self.consecutive_lags += 1;
-                warn!(
-                    skipped = skipped,
-                    consecutive_lags = self.consecutive_lags,
-                    "Subscription lagged"
-                );
-
-                if self.consecutive_lags >= MAX_LAG_COUNT {
-                    warn!("Too many consecutive lags, switching provider");
-                    self.switch_to_fallback(Error::SubscriptionLagged).await?;
-                }
+                Err(Error::Closed)
             }
+            RecvError::Lagged(count) => Err(Error::Lagged(*count)),
         }
-        Ok(())
     }
 
     /// Try to reconnect to the primary provider if enough time has elapsed.
@@ -313,7 +294,7 @@ mod tests {
             Error::Closed => {
                 panic!("Unexpected Closed error");
             }
-            Error::SubscriptionLagged => {
+            Error::Lagged(_) => {
                 panic!("Unexpected SubscriptionLagged error");
            }
         }
@@ -519,104 +500,6 @@
         Ok(())
     }
 
-    // ----------------------------------------------------------------------------
-    // Lag Handling Tests
-    // ----------------------------------------------------------------------------
-
-    #[tokio::test]
-    async fn test_lag_count_increments_and_resets() -> anyhow::Result<()> {
-        // This test verifies the lag counter logic by directly manipulating the internal state
-        // In a real scenario, RecvError::Lagged would be triggered by the broadcast channel
-        // when the receiver can't keep up with the sender
-
-        let (_anvil, provider) = spawn_ws_anvil().await?;
-
-        let robust = RobustProviderBuilder::fragile(provider.clone())
-            .subscription_timeout(SHORT_TIMEOUT)
-            .build()
-            .await?;
-
-        let mut subscription = robust.subscribe_blocks().await?;
-
-        // Verify initial state
-        assert_eq!(subscription.consecutive_lags, 0);
-
-        // Simulate lag by directly calling process_recv_error
-        // In production, this would be called by recv() when the channel lags
-        subscription.process_recv_error(RecvError::Lagged(5)).await?;
-        assert_eq!(subscription.consecutive_lags, 1, "Lag count should increment to 1");
-
-        // Another lag
-        subscription.process_recv_error(RecvError::Lagged(3)).await?;
-        assert_eq!(subscription.consecutive_lags, 2, "Lag count should increment to 2");
-
-        // Now receive a successful block - this should reset the counter
-        provider.anvil_mine(Some(1), None).await?;
-        let block = subscription.recv().await?;
-        assert_eq!(block.number, 1);
-        assert_eq!(
-            subscription.consecutive_lags, 0,
-            "Lag count should reset to 0 after successful recv"
-        );
-
-        // Verify it stays at 0 for subsequent successful receives
-        provider.anvil_mine(Some(1), None).await?;
-        let block = subscription.recv().await?;
-        assert_eq!(block.number, 2);
-        assert_eq!(subscription.consecutive_lags, 0, "Lag count should remain 0");
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_lag_count_triggers_failover_at_max() -> anyhow::Result<()> {
-        // Test that MAX_LAG_COUNT consecutive lags trigger a provider switch
-        let (_anvil_1, primary) = spawn_ws_anvil().await?;
-        let (_anvil_2, fallback) = spawn_ws_anvil().await?;
-
-        let robust = RobustProviderBuilder::fragile(primary.clone())
-            .fallback(fallback.clone())
-            .subscription_timeout(SHORT_TIMEOUT)
-            .build()
-            .await?;
-
-        let mut subscription = robust.subscribe_blocks().await?;
-
-        // Verify initial state
-        assert_eq!(subscription.consecutive_lags, 0);
-        assert_eq!(subscription.current_fallback_index, None, "Should start on primary");
-
-        // Simulate MAX_LAG_COUNT - 1 lags (should NOT trigger failover)
-        for i in 1..MAX_LAG_COUNT {
-            subscription.process_recv_error(RecvError::Lagged(10)).await?;
-            assert_eq!(subscription.consecutive_lags, i, "Lag count should be {i}");
-            assert_eq!(subscription.current_fallback_index, None, "Should still be on primary");
-        }
-
-        // One more lag should trigger failover
-        subscription.process_recv_error(RecvError::Lagged(10)).await?;
-        assert_eq!(
-            subscription.consecutive_lags, MAX_LAG_COUNT,
-            "Lag count should be MAX_LAG_COUNT"
-        );
-        assert_eq!(
-            subscription.current_fallback_index,
-            Some(0),
-            "Should have failed over to fallback[0]"
-        );
-
-        // Verify fallback works
-        fallback.anvil_mine(Some(1), None).await?;
-        let block = subscription.recv().await?;
-        assert_eq!(block.number, 1);
-        assert_eq!(
-            subscription.consecutive_lags, 0,
-            "Lag count should reset after successful recv on fallback"
-        );
-
-        Ok(())
-    }
-
     // ----------------------------------------------------------------------------
     // Basic Failover Tests
     // ----------------------------------------------------------------------------
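
With the lag counter removed, `recv()` now surfaces lag directly as `Error::Lagged(count)` and leaves the recovery policy to the caller instead of silently switching providers. A hedged sketch of what a consumer loop might do under the new behaviour; only `recv()` and the `Error::Lagged` variant are taken from this diff, while `handle_header` and `backfill_missed_blocks` are hypothetical application helpers:

// Illustrative consumer loop, not part of the crate.
async fn consume_blocks<N: Network>(
    mut subscription: RobustSubscription<N>,
) -> Result<(), Error> {
    loop {
        match subscription.recv().await {
            Ok(header) => {
                // Normal path: process the freshly received block header.
                handle_header(header).await;
            }
            Err(Error::Lagged(skipped)) => {
                // The broadcast channel dropped `skipped` headers, but the
                // subscription itself is still alive; the caller decides
                // whether to backfill the gap or simply keep reading.
                warn!(skipped, "subscription lagged; backfilling dropped blocks");
                backfill_missed_blocks(skipped).await;
            }
            // Closed subscriptions and RPC errors remain fatal for this loop.
            Err(err) => return Err(err),
        }
    }
}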
