Skip to content

Commit e81b0c2

Browse files
committed
add retry logic
set nonce correctly
1 parent 1916d8e commit e81b0c2

File tree

2 files changed

+108
-21
lines changed

2 files changed

+108
-21
lines changed

crates/batcher/src/lib.rs

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ use ethers::signers::Signer;
99
use retry::batcher_retryables::{
1010
cancel_create_new_task_retryable, create_new_task_retryable, get_user_balance_retryable,
1111
get_user_nonce_from_ethereum_retryable, simulate_create_new_task_retryable,
12-
user_balance_is_unlocked_retryable,
12+
user_balance_is_unlocked_retryable, get_current_block_number_retryable,
13+
query_balance_unlocked_events_retryable,
1314
};
1415
use retry::{retry_function, RetryError};
1516
use tokio::time::{timeout, Instant};
@@ -528,17 +529,11 @@ impl Batcher {
528529
let block_range = (self.balance_unlock_polling_interval_seconds / 12) * 2;
529530
let from_block = current_block.saturating_sub(U64::from(block_range));
530531

531-
// Create filter for BalanceUnlocked events
532-
let filter = self.payment_service
533-
.balance_unlocked_filter()
534-
.from_block(from_block)
535-
.to_block(current_block);
536-
537-
// Query events
538-
let events = match filter.query().await {
532+
// Query events with retry logic
533+
let events = match self.query_balance_unlocked_events(from_block, current_block).await {
539534
Ok(events) => events,
540535
Err(e) => {
541-
warn!("Failed to query BalanceUnlocked events: {:?}", e);
536+
warn!("Failed to query BalanceUnlocked events after retries: {:?}", e);
542537
return Ok(());
543538
}
544539
};
@@ -553,8 +548,15 @@ impl Batcher {
553548

554549
// Check if user has proofs in queue
555550
if self.user_has_proofs_in_queue(user_address).await {
556-
info!("User {:?} has proofs in queue, removing them and resetting UserState", user_address);
557-
self.remove_user_proofs_and_reset_state(user_address).await;
551+
info!("User {:?} has proofs in queue, verifying funds are still unlocked", user_address);
552+
553+
// Double-check that funds are still unlocked by calling the contract
554+
if self.user_balance_is_unlocked(&user_address).await {
555+
info!("User {:?} funds confirmed unlocked, removing proofs and resetting UserState", user_address);
556+
self.remove_user_proofs_and_reset_state(user_address).await;
557+
} else {
558+
info!("User {:?} funds are now locked, ignoring stale unlock event", user_address);
559+
}
558560
} else {
559561
info!("User {:?} has no proofs in queue, ignoring event", user_address);
560562
}
@@ -563,16 +565,52 @@ impl Batcher {
563565
Ok(())
564566
}
565567

568+
/// Gets the current block number from Ethereum.
569+
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
570+
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
566571
async fn get_current_block_number(&self) -> Result<U64, BatcherError> {
567-
// Try primary provider first
568-
match self.eth_http_provider.get_block_number().await {
569-
Ok(block) => Ok(block),
570-
Err(_) => {
571-
// Fallback to secondary provider
572-
self.eth_http_provider_fallback.get_block_number().await
573-
.map_err(|e| BatcherError::EthereumProviderError(e.to_string()))
574-
}
575-
}
572+
retry_function(
573+
|| {
574+
get_current_block_number_retryable(
575+
&self.eth_http_provider,
576+
&self.eth_http_provider_fallback,
577+
)
578+
},
579+
ETHEREUM_CALL_MIN_RETRY_DELAY,
580+
ETHEREUM_CALL_BACKOFF_FACTOR,
581+
ETHEREUM_CALL_MAX_RETRIES,
582+
ETHEREUM_CALL_MAX_RETRY_DELAY,
583+
)
584+
.await
585+
.map_err(|e| {
586+
error!("Failed to get current block number: {:?}", e);
587+
BatcherError::EthereumProviderError(e.inner())
588+
})
589+
}
590+
591+
/// Queries BalanceUnlocked events from the BatcherPaymentService contract.
592+
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
593+
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
594+
async fn query_balance_unlocked_events(&self, from_block: U64, to_block: U64) -> Result<Vec<aligned_sdk::eth::batcher_payment_service::BalanceUnlockedFilter>, BatcherError> {
595+
retry_function(
596+
|| {
597+
query_balance_unlocked_events_retryable(
598+
&self.payment_service,
599+
&self.payment_service_fallback,
600+
from_block,
601+
to_block,
602+
)
603+
},
604+
ETHEREUM_CALL_MIN_RETRY_DELAY,
605+
ETHEREUM_CALL_BACKOFF_FACTOR,
606+
ETHEREUM_CALL_MAX_RETRIES,
607+
ETHEREUM_CALL_MAX_RETRY_DELAY,
608+
)
609+
.await
610+
.map_err(|e| {
611+
error!("Failed to query BalanceUnlocked events: {:?}", e);
612+
BatcherError::EthereumProviderError(e.inner())
613+
})
576614
}
577615

578616
async fn user_has_proofs_in_queue(&self, user_address: Address) -> bool {
@@ -632,6 +670,7 @@ impl Batcher {
632670
// Reset UserState using timeout
633671
if let Some(user_state) = user_states.get(&user_address) {
634672
if let Some(mut user_state_guard) = self.try_user_lock_with_timeout(user_address, user_state.lock()).await {
673+
user_state_guard.nonce -= U256::from(user_state_guard.proofs_in_batch);
635674
user_state_guard.proofs_in_batch = 0;
636675
user_state_guard.total_fees_in_queue = U256::zero();
637676
user_state_guard.last_max_fee_limit = U256::max_value();

crates/batcher/src/retry/batcher_retryables.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::time::Duration;
22

33
use ethers::prelude::*;
4+
use ethers::providers::Http;
45
use log::{info, warn};
56
use tokio::time::timeout;
67

@@ -285,3 +286,50 @@ pub async fn cancel_create_new_task_retryable(
285286
"Receipt not found".to_string(),
286287
)))
287288
}
289+
290+
pub async fn get_current_block_number_retryable(
291+
eth_http_provider: &Provider<Http>,
292+
eth_http_provider_fallback: &Provider<Http>,
293+
) -> Result<U64, RetryError<String>> {
294+
if let Ok(block_number) = eth_http_provider.get_block_number().await {
295+
return Ok(block_number);
296+
}
297+
298+
eth_http_provider_fallback
299+
.get_block_number()
300+
.await
301+
.map_err(|e| {
302+
warn!("Failed to get current block number: {e}");
303+
RetryError::Transient(e.to_string())
304+
})
305+
}
306+
307+
pub async fn query_balance_unlocked_events_retryable(
308+
payment_service: &BatcherPaymentService,
309+
payment_service_fallback: &BatcherPaymentService,
310+
from_block: U64,
311+
to_block: U64,
312+
) -> Result<Vec<aligned_sdk::eth::batcher_payment_service::BalanceUnlockedFilter>, RetryError<String>>
313+
{
314+
let filter = payment_service
315+
.balance_unlocked_filter()
316+
.from_block(from_block)
317+
.to_block(to_block);
318+
319+
if let Ok(events) = filter.query().await {
320+
return Ok(events);
321+
}
322+
323+
let filter_fallback = payment_service_fallback
324+
.balance_unlocked_filter()
325+
.from_block(from_block)
326+
.to_block(to_block);
327+
328+
filter_fallback
329+
.query()
330+
.await
331+
.map_err(|e| {
332+
warn!("Failed to query BalanceUnlocked events: {e}");
333+
RetryError::Transient(e.to_string())
334+
})
335+
}

0 commit comments

Comments
 (0)