Skip to content

Commit dc94aa9

Browse files
committed
ref: move tests to robust sub
1 parent 0dddabd commit dc94aa9

File tree

2 files changed

+290
-275
lines changed

2 files changed

+290
-275
lines changed

src/robust_provider/provider.rs

Lines changed: 4 additions & 275 deletions
Original file line numberDiff line numberDiff line change
@@ -318,15 +318,13 @@ impl<N: Network> RobustProvider<N> {
318318
mod tests {
319319
use super::*;
320320
use crate::robust_provider::{
321-
RobustProviderBuilder,
322-
builder::DEFAULT_SUBSCRIPTION_TIMEOUT,
323-
subscription::{DEFAULT_RECONNECT_INTERVAL, RobustSubscriptionStream},
321+
RobustProviderBuilder, builder::DEFAULT_SUBSCRIPTION_TIMEOUT,
322+
subscription::DEFAULT_RECONNECT_INTERVAL,
324323
};
325-
use alloy::providers::{ProviderBuilder, WsConnect, ext::AnvilApi};
326-
use alloy_node_bindings::{Anvil, AnvilInstance};
324+
use alloy::providers::{ProviderBuilder, WsConnect};
325+
use alloy_node_bindings::Anvil;
327326
use std::sync::atomic::{AtomicUsize, Ordering};
328327
use tokio::time::sleep;
329-
use tokio_stream::StreamExt;
330328

331329
fn test_provider(timeout: u64, max_retries: usize, min_delay: u64) -> RobustProvider {
332330
RobustProvider {
@@ -340,103 +338,6 @@ mod tests {
340338
}
341339
}
342340

343-
const SHORT_TIMEOUT: Duration = Duration::from_millis(300);
344-
const RECONNECT_INTERVAL: Duration = Duration::from_millis(500);
345-
const BUFFER_TIME: Duration = Duration::from_millis(100);
346-
347-
async fn spawn_ws_anvil() -> anyhow::Result<(AnvilInstance, RootProvider)> {
348-
let anvil = Anvil::new().try_spawn()?;
349-
let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?;
350-
Ok((anvil, provider.root().to_owned()))
351-
}
352-
353-
fn assert_backend_gone_or_timeout(err: Error) {
354-
match err {
355-
Error::Timeout => {}
356-
Error::RpcError(e) => {
357-
assert!(
358-
matches!(e.as_ref(), RpcError::Transport(TransportErrorKind::BackendGone)),
359-
"Expected BackendGone error, got: {e:?}",
360-
);
361-
}
362-
Error::BlockNotFound(_) => {
363-
panic!("Unexpected BlockNotFound error");
364-
}
365-
}
366-
}
367-
368-
#[macro_export]
369-
macro_rules! assert_stream_finished {
370-
($stream: expr) => {
371-
$crate::assert_stream_finished!($stream, finish_secs = 3)
372-
};
373-
($stream: expr, finish_secs = $finish: expr) => {{
374-
let next_item = tokio_stream::StreamExt::next(&mut $stream).await;
375-
match next_item {
376-
Some(Ok(item)) => panic!("Expected no item during quiet window, got: {:?}", item),
377-
None => {}
378-
Some(Err(e)) => {
379-
assert!(matches!(e, Error::Timeout), "Expected Timeout error, got: {:?}", e);
380-
381-
let second = tokio::time::timeout(
382-
std::time::Duration::from_secs($finish),
383-
tokio_stream::StreamExt::next(&mut $stream),
384-
)
385-
.await
386-
.expect("expected stream to finish after quiet window");
387-
assert!(second.is_none(), "Expected stream to be finished, got: {:?}", second);
388-
}
389-
}
390-
}};
391-
}
392-
393-
#[macro_export]
394-
macro_rules! assert_next_block {
395-
($stream: expr, $expected: expr) => {
396-
assert_next_block!($stream, $expected, timeout = 5)
397-
};
398-
($stream: expr, $expected: expr, timeout = $secs: expr) => {
399-
let message = tokio::time::timeout(
400-
std::time::Duration::from_secs($secs),
401-
tokio_stream::StreamExt::next(&mut $stream),
402-
)
403-
.await
404-
.expect("timed out");
405-
if let Some(block) = message {
406-
match block {
407-
Ok(block) => assert_eq!(block.number, $expected),
408-
Err(e) => panic!("Got err {e:?}"),
409-
}
410-
} else {
411-
panic!("Expected block {:?}, got: {message:?}", $expected)
412-
}
413-
};
414-
}
415-
416-
/// Waits for current provider to timeout, then mines on `next_provider` to trigger failover.
417-
async fn trigger_failover_with_delay(
418-
stream: &mut RobustSubscriptionStream<alloy::network::Ethereum>,
419-
next_provider: RootProvider,
420-
expected_block: u64,
421-
extra_delay: Duration,
422-
) -> anyhow::Result<()> {
423-
let task = tokio::spawn(async move {
424-
sleep(SHORT_TIMEOUT + extra_delay + BUFFER_TIME).await;
425-
next_provider.anvil_mine(Some(1), None).await.unwrap();
426-
});
427-
assert_next_block!(*stream, expected_block);
428-
task.await?;
429-
Ok(())
430-
}
431-
432-
async fn trigger_failover(
433-
stream: &mut RobustSubscriptionStream<alloy::network::Ethereum>,
434-
next_provider: RootProvider,
435-
expected_block: u64,
436-
) -> anyhow::Result<()> {
437-
trigger_failover_with_delay(stream, next_provider, expected_block, Duration::ZERO).await
438-
}
439-
440341
#[tokio::test]
441342
async fn test_retry_with_timeout_succeeds_on_first_attempt() {
442343
let provider = test_provider(100, 3, 10);
@@ -567,176 +468,4 @@ mod tests {
567468

568469
Ok(())
569470
}
570-
571-
#[tokio::test]
572-
async fn ws_fails_http_fallback_returns_primary_error() -> anyhow::Result<()> {
573-
// Setup: Create WS primary and HTTP fallback
574-
let anvil_1 = Anvil::new().try_spawn()?;
575-
let ws_provider =
576-
ProviderBuilder::new().connect(anvil_1.ws_endpoint_url().as_str()).await?;
577-
578-
let anvil_2 = Anvil::new().try_spawn()?;
579-
let http_provider = ProviderBuilder::new().connect_http(anvil_2.endpoint_url());
580-
581-
let robust = RobustProviderBuilder::fragile(ws_provider.clone())
582-
.fallback(http_provider.clone())
583-
.subscription_timeout(Duration::from_secs(1))
584-
.build()
585-
.await?;
586-
587-
// Test: Verify subscription works on primary
588-
let subscription = robust.subscribe_blocks().await?;
589-
let mut stream = subscription.into_stream();
590-
591-
ws_provider.anvil_mine(Some(1), None).await?;
592-
assert_next_block!(stream, 1);
593-
594-
ws_provider.anvil_mine(Some(1), None).await?;
595-
assert_next_block!(stream, 2);
596-
597-
// Verify: HTTP fallback can't provide subscription, so we get an error
598-
let task = tokio::spawn(async move {
599-
sleep(SHORT_TIMEOUT + BUFFER_TIME).await;
600-
http_provider.anvil_mine(Some(1), None).await.unwrap();
601-
});
602-
let err = stream.next().await.unwrap().unwrap_err();
603-
task.await?;
604-
assert_backend_gone_or_timeout(err);
605-
606-
let next = stream.next().await;
607-
assert!(next.is_none(), "Expected stream to be finished, got: {next:?}");
608-
609-
Ok(())
610-
}
611-
612-
#[tokio::test]
613-
async fn robust_subscription_stream_with_failover() -> anyhow::Result<()> {
614-
let (_anvil_1, primary) = spawn_ws_anvil().await?;
615-
let (_anvil_2, fallback) = spawn_ws_anvil().await?;
616-
617-
let robust = RobustProviderBuilder::fragile(primary.clone())
618-
.fallback(fallback.clone())
619-
.subscription_timeout(SHORT_TIMEOUT)
620-
.build()
621-
.await?;
622-
623-
let subscription = robust.subscribe_blocks().await?;
624-
let mut stream = subscription.into_stream();
625-
626-
// Test: Primary works initially
627-
primary.anvil_mine(Some(1), None).await?;
628-
assert_next_block!(stream, 1);
629-
630-
primary.anvil_mine(Some(1), None).await?;
631-
assert_next_block!(stream, 2);
632-
633-
// After timeout, should failover to fallback provider
634-
trigger_failover(&mut stream, fallback.clone(), 1).await?;
635-
636-
fallback.anvil_mine(Some(1), None).await?;
637-
assert_next_block!(stream, 2);
638-
639-
Ok(())
640-
}
641-
642-
#[tokio::test]
643-
async fn subscription_reconnects_to_primary() -> anyhow::Result<()> {
644-
let (_anvil_1, primary) = spawn_ws_anvil().await?;
645-
let (_anvil_2, fallback) = spawn_ws_anvil().await?;
646-
647-
let robust = RobustProviderBuilder::fragile(primary.clone())
648-
.fallback(fallback.clone())
649-
.subscription_timeout(SHORT_TIMEOUT)
650-
.reconnect_interval(RECONNECT_INTERVAL)
651-
.build()
652-
.await?;
653-
654-
let subscription = robust.subscribe_blocks().await?;
655-
let mut stream = subscription.into_stream();
656-
657-
// Start on primary
658-
primary.anvil_mine(Some(1), None).await?;
659-
assert_next_block!(stream, 1);
660-
661-
// PP times out -> FP1
662-
trigger_failover(&mut stream, fallback.clone(), 1).await?;
663-
664-
fallback.anvil_mine(Some(1), None).await?;
665-
assert_next_block!(stream, 2);
666-
667-
// FP1 times out -> PP (reconnect succeeds)
668-
trigger_failover(&mut stream, primary.clone(), 2).await?;
669-
670-
// PP times out -> FP1 (fallback index was reset)
671-
trigger_failover(&mut stream, fallback.clone(), 3).await?;
672-
673-
fallback.anvil_mine(Some(1), None).await?;
674-
assert_next_block!(stream, 4);
675-
676-
Ok(())
677-
}
678-
679-
#[tokio::test]
680-
async fn subscription_cycles_through_multiple_fallbacks() -> anyhow::Result<()> {
681-
let (anvil_pp, primary) = spawn_ws_anvil().await?;
682-
let (_anvil_1, fb_1) = spawn_ws_anvil().await?;
683-
let (_anvil_2, fb_2) = spawn_ws_anvil().await?;
684-
685-
let robust = RobustProviderBuilder::fragile(primary.clone())
686-
.fallback(fb_1.clone())
687-
.fallback(fb_2.clone())
688-
.subscription_timeout(SHORT_TIMEOUT)
689-
.call_timeout(SHORT_TIMEOUT)
690-
.build()
691-
.await?;
692-
693-
let subscription = robust.subscribe_blocks().await?;
694-
let mut stream = subscription.into_stream();
695-
696-
// Start on primary
697-
primary.anvil_mine(Some(1), None).await?;
698-
assert_next_block!(stream, 1);
699-
700-
// Kill primary - all future PP reconnection attempts will fail
701-
drop(anvil_pp);
702-
703-
// PP times out -> FP1
704-
trigger_failover(&mut stream, fb_1.clone(), 1).await?;
705-
706-
// FP1 times out -> tries PP (fails, takes call_timeout) -> FP2
707-
trigger_failover_with_delay(&mut stream, fb_2.clone(), 1, SHORT_TIMEOUT).await?;
708-
709-
fb_2.anvil_mine(Some(1), None).await?;
710-
assert_next_block!(stream, 2);
711-
712-
// FP2 times out -> tries PP (fails) -> no more fallbacks -> error
713-
sleep(SHORT_TIMEOUT * 2 + BUFFER_TIME).await;
714-
let err = stream.next().await.unwrap().unwrap_err();
715-
assert_backend_gone_or_timeout(err);
716-
717-
Ok(())
718-
}
719-
720-
#[tokio::test]
721-
async fn subscription_fails_with_no_fallbacks() -> anyhow::Result<()> {
722-
let (_anvil, provider) = spawn_ws_anvil().await?;
723-
724-
let robust = RobustProviderBuilder::fragile(provider.clone())
725-
.subscription_timeout(SHORT_TIMEOUT)
726-
.build()
727-
.await?;
728-
729-
let subscription = robust.subscribe_blocks().await?;
730-
let mut stream = subscription.into_stream();
731-
732-
provider.anvil_mine(Some(1), None).await?;
733-
assert_next_block!(stream, 1);
734-
735-
// No fallback available - should error after timeout
736-
sleep(SHORT_TIMEOUT + BUFFER_TIME).await;
737-
let err = stream.next().await.unwrap().unwrap_err();
738-
assert_backend_gone_or_timeout(err);
739-
740-
Ok(())
741-
}
742471
}

0 commit comments

Comments
 (0)