Skip to content

Commit 0134bf3

Browse files
committed
feat: better reconnect logic + test
1 parent 25451e0 commit 0134bf3

File tree

1 file changed

+58
-2
lines changed

1 file changed

+58
-2
lines changed

src/robust_provider/subscription.rs

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,14 @@ impl<N: Network> RobustSubscription<N> {
6363
pub async fn recv(&mut self) -> Result<N::HeaderResponse, Error> {
6464
let subscription_timeout = self.robust_provider.subscription_timeout;
6565
loop {
66-
self.try_reconnect_to_primary(false).await;
67-
6866
if let Some(subscription) = &mut self.subscription {
6967
let recv_result = timeout(subscription_timeout, subscription.recv()).await;
7068
match recv_result {
7169
Ok(recv_result) => match recv_result {
7270
Ok(header) => {
71+
if self.current_fallback_index.is_some() {
72+
self.try_reconnect_to_primary(false).await;
73+
}
7374
self.consecutive_lags = 0;
7475
return Ok(header);
7576
}
@@ -550,4 +551,59 @@ mod tests {
550551

551552
Ok(())
552553
}
554+
555+
#[tokio::test]
556+
async fn subscription_periodically_reconnects_to_primary_while_on_fallback()
557+
-> anyhow::Result<()> {
558+
// Use a longer reconnect interval to make timing more predictable
559+
let reconnect_interval = Duration::from_millis(800);
560+
561+
let (_anvil_1, primary) = spawn_ws_anvil().await?;
562+
let (_anvil_2, fallback) = spawn_ws_anvil().await?;
563+
564+
let robust = RobustProviderBuilder::fragile(primary.clone())
565+
.fallback(fallback.clone())
566+
.subscription_timeout(SHORT_TIMEOUT)
567+
.reconnect_interval(reconnect_interval)
568+
.build()
569+
.await?;
570+
571+
let subscription = robust.subscribe_blocks().await?;
572+
let mut stream = subscription.into_stream();
573+
574+
// Start on primary
575+
primary.anvil_mine(Some(1), None).await?;
576+
assert_next_block!(stream, 1);
577+
578+
// PP times out -> FP (this sets last_reconnect_attempt)
579+
trigger_failover(&mut stream, fallback.clone(), 1).await?;
580+
let failover_time = Instant::now();
581+
582+
// Now on fallback - mine blocks before reconnect_interval elapses
583+
// These should stay on fallback (no reconnect attempt)
584+
fallback.anvil_mine(Some(1), None).await?;
585+
assert_next_block!(stream, 2);
586+
587+
fallback.anvil_mine(Some(1), None).await?;
588+
assert_next_block!(stream, 3);
589+
590+
// Ensure reconnect_interval has fully elapsed since failover
591+
let elapsed = failover_time.elapsed();
592+
if elapsed < reconnect_interval + BUFFER_TIME {
593+
sleep(reconnect_interval + BUFFER_TIME - elapsed).await;
594+
}
595+
596+
// Mine on fallback - receiving this block triggers try_reconnect_to_primary
597+
fallback.anvil_mine(Some(1), None).await?;
598+
assert_next_block!(stream, 4);
599+
600+
// Now we should be back on primary
601+
primary.anvil_mine(Some(1), None).await?;
602+
assert_next_block!(stream, 2);
603+
604+
primary.anvil_mine(Some(1), None).await?;
605+
assert_next_block!(stream, 3);
606+
607+
Ok(())
608+
}
553609
}

0 commit comments

Comments
 (0)