Skip to content

Commit cf24945

Browse files
committed
ref: clean up loop logic + internal is fallback fn
1 parent 0134bf3 commit cf24945

File tree

1 file changed

+28
-26
lines changed

1 file changed

+28
-26
lines changed

src/robust_provider/subscription.rs

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -62,36 +62,33 @@ impl<N: Network> RobustSubscription<N> {
6262
/// Returns an error if all providers have been exhausted and failed.
6363
pub async fn recv(&mut self) -> Result<N::HeaderResponse, Error> {
6464
let subscription_timeout = self.robust_provider.subscription_timeout;
65-
loop {
66-
if let Some(subscription) = &mut self.subscription {
67-
let recv_result = timeout(subscription_timeout, subscription.recv()).await;
68-
match recv_result {
69-
Ok(recv_result) => match recv_result {
70-
Ok(header) => {
71-
if self.current_fallback_index.is_some() {
72-
self.try_reconnect_to_primary(false).await;
73-
}
74-
self.consecutive_lags = 0;
75-
return Ok(header);
65+
while let Some(subscription) = &mut self.subscription {
66+
let recv_result = timeout(subscription_timeout, subscription.recv()).await;
67+
match recv_result {
68+
Ok(recv_result) => match recv_result {
69+
Ok(header) => {
70+
if self.is_on_fallback() {
71+
self.try_reconnect_to_primary(false).await;
7672
}
77-
Err(recv_error) => {
78-
self.process_recv_error(recv_error).await?;
79-
}
80-
},
81-
Err(elapsed_err) => {
82-
error!(
83-
timeout_secs = subscription_timeout.as_secs(),
84-
"Subscription timeout - no block received, switching provider"
85-
);
86-
87-
self.switch_to_fallback(elapsed_err.into()).await?;
73+
self.consecutive_lags = 0;
74+
return Ok(header);
75+
}
76+
Err(recv_error) => {
77+
self.process_recv_error(recv_error).await?;
8878
}
79+
},
80+
Err(elapsed_err) => {
81+
error!(
82+
timeout_secs = subscription_timeout.as_secs(),
83+
"Subscription timeout - no block received, switching provider"
84+
);
85+
86+
self.switch_to_fallback(elapsed_err.into()).await?;
8987
}
90-
} else {
91-
// No subscription available
92-
return Err(RpcError::Transport(TransportErrorKind::BackendGone).into());
9388
}
9489
}
90+
// No subscription available
91+
Err(RpcError::Transport(TransportErrorKind::BackendGone).into())
9592
}
9693

9794
/// Process subscription receive errors and handle failover
@@ -163,7 +160,7 @@ impl<N: Network> RobustSubscription<N> {
163160

164161
async fn switch_to_fallback(&mut self, last_error: Error) -> Result<(), Error> {
165162
// If we're on a fallback, try primary first before moving to next fallback
166-
if self.current_fallback_index.is_some() && self.try_reconnect_to_primary(true).await {
163+
if self.is_on_fallback() && self.try_reconnect_to_primary(true).await {
167164
return Ok(());
168165
}
169166

@@ -195,6 +192,11 @@ impl<N: Network> RobustSubscription<N> {
195192
}
196193
}
197194

195+
/// Returns true if currently using a fallback provider
196+
fn is_on_fallback(&self) -> bool {
197+
self.current_fallback_index.is_some()
198+
}
199+
198200
/// Check if the subscription channel is empty (no pending messages)
199201
#[must_use]
200202
pub fn is_empty(&self) -> bool {

0 commit comments

Comments
 (0)