Skip to content

Commit 4378c9e

Browse files
committed
ref: simplify test and add better timeout logic
1 parent d50aa75 commit 4378c9e

File tree

2 files changed

+29
-39
lines changed

2 files changed

+29
-39
lines changed

src/robust_provider/provider.rs

Lines changed: 20 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -482,9 +482,8 @@ mod tests {
482482
ws_provider.anvil_mine(Some(1), None).await?;
483483
assert_eq!(2, subscription.recv().await?.number());
484484

485-
drop(anvil_1);
486-
487-
sleep(Duration::from_millis(100)).await;
485+
// simulate ws stream gone via no blocks mined > sub timeout
486+
sleep(Duration::from_millis(600)).await;
488487

489488
http_provider.anvil_mine(Some(1), None).await?;
490489

@@ -532,12 +531,8 @@ mod tests {
532531
let block = stream.next().await.unwrap()?;
533532
assert_eq!(2, block.number());
534533

535-
// Drop the primary provider to trigger failover
536-
drop(anvil_1);
537-
538-
// Wait for subscription timeout to occur and switch to fallback
539-
// The subscription will timeout after 500ms of inactivity, then switch to fallback
540-
sleep(Duration::from_millis(800)).await;
534+
// simulate ws stream gone via no blocks mined > sub timeout
535+
sleep(Duration::from_millis(600)).await;
541536

542537
// Now mine blocks on fallback - the subscription should be connected to fallback now
543538
ws_provider_2.anvil_mine(Some(1), None).await?;
@@ -555,8 +550,6 @@ mod tests {
555550
#[tokio::test]
556551
async fn test_subscription_reconnects_to_primary() -> anyhow::Result<()> {
557552
let anvil_1 = Anvil::new().try_spawn()?;
558-
let anvil_1_port = anvil_1.port();
559-
560553
let ws_provider_1 =
561554
ProviderBuilder::new().connect(anvil_1.ws_endpoint_url().as_str()).await?;
562555

@@ -566,45 +559,36 @@ mod tests {
566559

567560
let robust = RobustProviderBuilder::fragile(ws_provider_1.clone())
568561
.fallback(ws_provider_2.clone())
569-
.subscription_timeout(Duration::from_secs(5))
562+
.subscription_timeout(Duration::from_millis(500))
570563
.reconnect_interval(Duration::from_secs(2))
571564
.build()
572565
.await?;
573566

574567
let subscription = robust.subscribe_blocks().await?;
575-
576568
let mut stream = subscription.into_stream();
577569

570+
// Verify primary works
578571
ws_provider_1.anvil_mine(Some(1), None).await?;
579572
let block = stream.next().await.unwrap()?;
580573
assert_eq!(1, block.number());
581574

582-
drop(anvil_1);
583-
584-
// Wait for subscription to detect failure and switch to fallback
585-
// (subscription_timeout is 5 seconds)
586-
sleep(Duration::from_millis(5500)).await;
575+
sleep(Duration::from_millis(600)).await;
587576

577+
// Verify fallback works
588578
ws_provider_2.anvil_mine(Some(1), None).await?;
589579
let block = stream.next().await.unwrap()?;
590580
assert_eq!(1, block.number());
591581

592-
// Spawn new anvil on the same port as primary (simulating primary coming back)
593-
let anvil_3 = Anvil::new().port(anvil_1_port).try_spawn()?;
594-
595-
let ws_provider_1 =
596-
ProviderBuilder::new().connect(anvil_3.ws_endpoint_url().as_str()).await?;
597-
598-
// Wait for reconnect interval to elapse (2 seconds) plus buffer
599-
sleep(Duration::from_millis(2200)).await;
582+
for _ in 0..30 {
583+
ws_provider_2.anvil_mine(Some(10), None).await?;
584+
let _ = stream.next().await.unwrap()?;
585+
sleep(Duration::from_millis(100)).await;
586+
// Mine on primary - should reconnect and receive from primary
587+
ws_provider_1.anvil_mine(Some(1), None).await?;
588+
}
600589

601-
ws_provider_1.anvil_mine(Some(1), None).await?;
602590
let block = stream.next().await.unwrap()?;
603-
assert_eq!(1, block.number());
604-
605-
ws_provider_1.anvil_mine(Some(1), None).await?;
606-
let block = stream.next().await.unwrap()?;
607-
assert_eq!(2, block.number());
591+
assert_eq!(31 + 1, block.number());
608592

609593
Ok(())
610594
}
@@ -638,16 +622,14 @@ mod tests {
638622
let block = stream.next().await.unwrap()?;
639623
assert_eq!(1, block.number());
640624

641-
drop(anvil_1);
642-
625+
// simulate ws stream gone via no blocks mined > sub timeout
643626
sleep(Duration::from_millis(600)).await;
644627

645628
ws_provider_2.anvil_mine(Some(1), None).await?;
646629
let block = stream.next().await.unwrap()?;
647630
assert_eq!(1, block.number());
648631

649-
drop(anvil_2);
650-
632+
// simulate ws stream gone via no blocks mined > sub timeout
651633
sleep(Duration::from_millis(600)).await;
652634

653635
ws_provider_3.anvil_mine(Some(1), None).await?;
@@ -669,12 +651,11 @@ mod tests {
669651

670652
let mut subscription = robust.subscribe_blocks().await?;
671653

672-
// Receive initial block successfully
654+
// simulate ws stream gone via no blocks mined > sub timeout
673655
ws_provider.anvil_mine(Some(1), None).await?;
674656
let _block = subscription.recv().await?;
675657

676-
drop(anvil);
677-
658+
// simulate ws stream gone via no blocks mined > sub timeout
678659
sleep(Duration::from_millis(600)).await;
679660

680661
let err = subscription.recv().await.unwrap_err();

src/robust_provider/subscription.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,14 @@ impl<N: Network> RobustSubscription<N> {
8888
"Subscription timeout - no block received, switching provider"
8989
);
9090

91+
// If we're on a fallback, try reconnecting to primary one more time
92+
// before switching to the next fallback
93+
if self.current_fallback_index.is_some() &&
94+
self.try_reconnect_to_primary().await
95+
{
96+
continue;
97+
}
98+
9199
self.switch_to_fallback(elapsed_err.into()).await?;
92100
}
93101
}
@@ -156,6 +164,7 @@ impl<N: Network> RobustSubscription<N> {
156164
true
157165
}
158166
Err(e) => {
167+
println!("what");
159168
warn!(error = %e, "Failed to reconnect to primary provider");
160169
false
161170
}

0 commit comments

Comments
 (0)