Skip to content

Commit 6ec1dc6

Browse files
fix(batcher): stop listening to blocks when one of the rpcs disconnects (#1961)
1 parent ef6a851 commit 6ec1dc6

File tree

2 files changed

+74
-33
lines changed

2 files changed

+74
-33
lines changed

batcher/aligned-batcher/src/lib.rs

Lines changed: 73 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ use aws_sdk_s3::client::Client as S3Client;
4141
use eth::payment_service::{BatcherPaymentService, CreateNewTaskFeeParams, SignerMiddlewareT};
4242
use ethers::prelude::{Middleware, Provider};
4343
use ethers::types::{Address, Signature, TransactionReceipt, U256};
44-
use futures_util::{future, SinkExt, StreamExt, TryStreamExt};
44+
use futures_util::{future, join, SinkExt, StreamExt, TryStreamExt};
4545
use lambdaworks_crypto::merkle_tree::merkle::MerkleTree;
4646
use lambdaworks_crypto::merkle_tree::traits::IsMerkleTreeBackend;
4747
use log::{debug, error, info, warn};
@@ -322,39 +322,80 @@ impl Batcher {
322322
pub async fn listen_new_blocks_retryable(
323323
self: Arc<Self>,
324324
) -> Result<(), RetryError<BatcherError>> {
325-
let eth_ws_provider = Provider::connect(&self.eth_ws_url).await.map_err(|e| {
326-
warn!("Failed to instantiate Ethereum websocket provider");
327-
RetryError::Transient(BatcherError::EthereumSubscriptionError(e.to_string()))
328-
})?;
325+
// Try to connect at least to one of the nodes (main or fallback)
326+
let eth_ws_provider = Provider::connect(&self.eth_ws_url).await.ok();
327+
let eth_ws_provider_fallback = Provider::connect(&self.eth_ws_url_fallback).await.ok();
328+
if eth_ws_provider.is_none() {
329+
warn!("Failed to instantiate Ethereum main websocket provider");
330+
}
331+
if eth_ws_provider_fallback.is_none() {
332+
warn!("Failed to instantiate fallback Ethereum websocket provider");
333+
}
334+
if eth_ws_provider.is_none() && eth_ws_provider_fallback.is_none() {
335+
return Err(RetryError::Transient(
336+
BatcherError::EthereumSubscriptionError(
337+
"Both Ethereum websocket providers failed to connect".to_string(),
338+
),
339+
));
340+
}
329341

330-
let eth_ws_provider_fallback =
331-
Provider::connect(&self.eth_ws_url_fallback)
332-
.await
333-
.map_err(|e| {
334-
warn!("Failed to instantiate fallback Ethereum websocket provider");
335-
RetryError::Transient(BatcherError::EthereumSubscriptionError(e.to_string()))
336-
})?;
337-
338-
let mut stream = eth_ws_provider.subscribe_blocks().await.map_err(|e| {
339-
warn!("Error subscribing to blocks.");
340-
RetryError::Transient(BatcherError::EthereumSubscriptionError(e.to_string()))
341-
})?;
342-
343-
let mut stream_fallback =
344-
eth_ws_provider_fallback
345-
.subscribe_blocks()
346-
.await
347-
.map_err(|e| {
348-
warn!("Error subscribing to blocks.");
349-
RetryError::Transient(BatcherError::EthereumSubscriptionError(e.to_string()))
350-
})?;
342+
// Try to connect to one stream (main or fallback)
343+
let mut stream = match &eth_ws_provider {
344+
Some(provider) => match provider.subscribe_blocks().await {
345+
Ok(s) => Some(s),
346+
Err(e) => {
347+
warn!("Error subscribing to blocks on primary provider: {:?}", e);
348+
None
349+
}
350+
},
351+
None => None,
352+
};
353+
let mut stream_fallback = match &eth_ws_provider_fallback {
354+
Some(provider) => match provider.subscribe_blocks().await {
355+
Ok(s) => Some(s),
356+
Err(e) => {
357+
warn!("Error subscribing to blocks on fallback provider: {:?}", e);
358+
None
359+
}
360+
},
361+
None => None,
362+
};
363+
if stream.is_none() && stream_fallback.is_none() {
364+
return Err(RetryError::Transient(
365+
BatcherError::EthereumSubscriptionError(
366+
"Both Ethereum block subscriptions failed".to_string(),
367+
),
368+
));
369+
}
351370

352371
let last_seen_block = Mutex::<u64>::new(0);
353372

354-
while let Some(block) = tokio::select! {
355-
block = stream.next() => block,
356-
block = stream_fallback.next() => block,
357-
} {
373+
loop {
374+
// Wait for both responses
375+
let (block_main, block_fallback) = join!(
376+
async {
377+
match stream.as_mut() {
378+
Some(s) => s.next().await,
379+
None => None,
380+
}
381+
},
382+
async {
383+
match stream_fallback.as_mut() {
384+
Some(s) => s.next().await,
385+
None => None,
386+
}
387+
}
388+
);
389+
390+
let block = if let Some(block) = block_main {
391+
block
392+
} else if let Some(block) = block_fallback {
393+
block
394+
} else {
395+
// Both rpc failed to respond, break and try to reconnect
396+
break;
397+
};
398+
358399
let batcher = self.clone();
359400
let block_number = block.number.unwrap_or_default();
360401
let block_number = u64::try_from(block_number).unwrap_or_default();
@@ -371,10 +412,10 @@ impl Batcher {
371412
tokio::spawn(async move {
372413
if let Err(e) = batcher.handle_new_block(block_number).await {
373414
error!("Error when handling new block: {:?}", e);
374-
};
415+
}
375416
});
376417
}
377-
error!("Failed to fetch blocks");
418+
error!("Both main and fallback Ethereum WS clients subscriptions have disconnected, will try to reconnect...");
378419

379420
Err(RetryError::Transient(
380421
BatcherError::EthereumSubscriptionError("Could not get new blocks".to_string()),

batcher/aligned-sdk/src/common/constants.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub const DEFAULT_MAX_FEE_BATCH_SIZE: usize = 10;
4141
pub const ETHEREUM_CALL_MIN_RETRY_DELAY: u64 = 500; // milliseconds
4242
pub const ETHEREUM_CALL_MAX_RETRIES: usize = 5;
4343
pub const ETHEREUM_CALL_BACKOFF_FACTOR: f32 = 2.0;
44-
pub const ETHEREUM_CALL_MAX_RETRY_DELAY: u64 = 3600; // seconds
44+
pub const ETHEREUM_CALL_MAX_RETRY_DELAY: u64 = 60; // seconds
4545

4646
/// Ethereum transaction retry constants
4747
pub const BUMP_MIN_RETRY_DELAY: u64 = 500; // milliseconds

0 commit comments

Comments
 (0)