Skip to content

Commit 938903b

Browse files
committed
fix: address comments
1 parent e773606 commit 938903b

File tree

1 file changed

+31
-84
lines changed

1 file changed

+31
-84
lines changed

crates/batcher/src/lib.rs

Lines changed: 31 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -520,33 +520,19 @@ impl Batcher {
520520

521521
async fn process_balance_unlocked_events(&self) -> Result<(), BatcherError> {
522522
// Get current block number using HTTP providers
523-
let current_block = match self.get_current_block_number().await {
524-
Ok(block) => block,
525-
Err(e) => {
526-
warn!("Failed to get current block number: {:?}", e);
527-
return Ok(());
528-
}
529-
};
523+
let current_block = self.get_current_block_number().await
524+
.map_err(|e| BatcherError::EthereumProviderError(format!("Failed to get current block number: {:?}", e)))?;
530525

531526
// Calculate the block range based on polling interval
532527
// Formula: interval / 12 * 2 (assuming 12-second block times, look back 2x the interval)
533528
let block_range = (self.balance_unlock_polling_interval_seconds / 12) * 2;
534529
let from_block = current_block.saturating_sub(U64::from(block_range));
535530

536531
// Query events with retry logic
537-
let events = match self
532+
let events = self
538533
.query_balance_unlocked_events(from_block, current_block)
539534
.await
540-
{
541-
Ok(events) => events,
542-
Err(e) => {
543-
warn!(
544-
"Failed to query BalanceUnlocked events after retries: {:?}",
545-
e
546-
);
547-
return Ok(());
548-
}
549-
};
535+
.map_err(|e| BatcherError::EthereumProviderError(format!("Failed to query BalanceUnlocked events: {:?}", e)))?;
550536

551537
info!(
552538
"Found {} BalanceUnlocked events in blocks {} to {}",
@@ -558,33 +544,24 @@ impl Batcher {
558544
// Process each event
559545
for event in events {
560546
let user_address = event.user;
561-
info!(
547+
debug!(
562548
"Processing BalanceUnlocked event for user: {:?}",
563549
user_address
564550
);
565551

566552
// Check if user has proofs in queue
567-
if self.user_has_proofs_in_queue(user_address).await {
568-
info!(
569-
"User {:?} has proofs in queue, verifying funds are still unlocked",
570-
user_address
571-
);
572-
573-
// Double-check that funds are still unlocked by calling the contract
574-
if self.user_balance_is_unlocked(&user_address).await {
575-
info!("User {:?} funds confirmed unlocked, removing proofs and resetting UserState", user_address);
576-
self.remove_user_proofs_and_reset_state(user_address).await;
577-
} else {
578-
info!(
579-
"User {:?} funds are now locked, ignoring stale unlock event",
580-
user_address
581-
);
582-
}
583-
} else {
553+
// Double-check that funds are still unlocked by calling the contract
554+
// This is necessary because we query events over a block range, and the
555+
// user’s state may have changed (e.g., funds could be locked again) after
556+
// the event was emitted. Verifying on-chain ensures we don’t act on stale data.
557+
if self.user_has_proofs_in_queue(user_address).await
558+
&& self.user_balance_is_unlocked(&user_address).await
559+
{
584560
info!(
585-
"User {:?} has no proofs in queue, ignoring event",
561+
"User {:?} has proofs in queue and funds are unlocked, proceeding to remove proofs and resetting UserState",
586562
user_address
587563
);
564+
self.remove_user_proofs_and_reset_state(user_address).await;
588565
}
589566
}
590567

@@ -594,7 +571,7 @@ impl Batcher {
594571
/// Gets the current block number from Ethereum.
595572
/// Retries on recoverable errors using exponential backoff up to `ETHEREUM_CALL_MAX_RETRIES` times:
596573
/// (0,5 secs - 1 secs - 2 secs - 4 secs - 8 secs).
597-
async fn get_current_block_number(&self) -> Result<U64, BatcherError> {
574+
async fn get_current_block_number(&self) -> Result<U64, RetryError<String>> {
598575
retry_function(
599576
|| {
600577
get_current_block_number_retryable(
@@ -608,10 +585,6 @@ impl Batcher {
608585
ETHEREUM_CALL_MAX_RETRY_DELAY,
609586
)
610587
.await
611-
.map_err(|e| {
612-
error!("Failed to get current block number: {:?}", e);
613-
BatcherError::EthereumProviderError(e.inner())
614-
})
615588
}
616589

617590
/// Queries BalanceUnlocked events from the BatcherPaymentService contract.
@@ -621,8 +594,10 @@ impl Batcher {
621594
&self,
622595
from_block: U64,
623596
to_block: U64,
624-
) -> Result<Vec<aligned_sdk::eth::batcher_payment_service::BalanceUnlockedFilter>, BatcherError>
625-
{
597+
) -> Result<
598+
Vec<aligned_sdk::eth::batcher_payment_service::BalanceUnlockedFilter>,
599+
RetryError<String>,
600+
> {
626601
retry_function(
627602
|| {
628603
query_balance_unlocked_events_retryable(
@@ -638,10 +613,6 @@ impl Batcher {
638613
ETHEREUM_CALL_MAX_RETRY_DELAY,
639614
)
640615
.await
641-
.map_err(|e| {
642-
error!("Failed to query BalanceUnlocked events: {:?}", e);
643-
BatcherError::EthereumProviderError(e.inner())
644-
})
645616
}
646617

647618
async fn user_has_proofs_in_queue(&self, user_address: Address) -> bool {
@@ -661,11 +632,8 @@ impl Batcher {
661632
}
662633

663634
async fn remove_user_proofs_and_reset_state(&self, user_address: Address) {
664-
// Follow locking rules: acquire user_states before batch_state to avoid deadlocks
665-
let user_states = self.user_states.write().await;
666-
667635
// Use timeout for batch lock
668-
let batch_state_guard = match self
636+
let mut batch_state_guard = match self
669637
.try_batch_lock_with_timeout(self.batch_state.lock())
670638
.await
671639
{
@@ -679,23 +647,18 @@ impl Batcher {
679647
}
680648
};
681649

682-
let mut batch_state_guard = batch_state_guard;
683-
684-
// Process all entries for this user
685-
while let Some(entry) = batch_state_guard
650+
let removed_entries = batch_state_guard
686651
.batch_queue
687-
.iter()
688-
.find(|(entry, _)| entry.sender == user_address)
689-
.map(|(entry, _)| entry.clone())
690-
{
691-
// Notify user via websocket before removing the proof
652+
.extract_if(|entry, _| entry.sender == user_address);
653+
654+
// Notify user via websocket before removing the proofs
655+
for (entry, _) in removed_entries {
692656
if let Some(ws_sink) = entry.messaging_sink.as_ref() {
693657
send_message(
694658
ws_sink.clone(),
695659
SubmitProofResponseMessage::UserFundsUnlocked,
696660
)
697661
.await;
698-
699662
// Close websocket connection
700663
let mut sink_guard = ws_sink.write().await;
701664
if let Err(e) = sink_guard.close().await {
@@ -707,34 +670,18 @@ impl Batcher {
707670
info!("Closed websocket connection for user {:?}", user_address);
708671
}
709672
}
710-
711-
// Remove the entry from batch queue
712-
batch_state_guard.batch_queue.remove(&entry);
713673
info!(
714674
"Removed proof with nonce {} for user {:?} from batch queue",
715675
entry.nonced_verification_data.nonce, user_address
716676
);
717677
}
718678

719-
// Reset UserState using timeout
720-
if let Some(user_state) = user_states.get(&user_address) {
721-
if let Some(mut user_state_guard) = self
722-
.try_user_lock_with_timeout(user_address, user_state.lock())
723-
.await
724-
{
725-
let proofs_count = user_state_guard.proofs_in_batch;
726-
user_state_guard.nonce -= U256::from(proofs_count);
727-
user_state_guard.proofs_in_batch = 0;
728-
user_state_guard.total_fees_in_queue = U256::zero();
729-
user_state_guard.last_max_fee_limit = U256::max_value();
730-
info!("Reset UserState for user {:?}", user_address);
731-
} else {
732-
warn!(
733-
"Failed to acquire user lock for {:?}, skipping UserState reset",
734-
user_address
735-
);
736-
}
737-
}
679+
// Remove UserState entry
680+
self.user_states.write().await.remove(&user_address);
681+
info!(
682+
"Removed UserState entry for user {:?} after processing BalanceUnlocked event",
683+
user_address
684+
);
738685
}
739686

740687
pub async fn listen_new_blocks_retryable(

0 commit comments

Comments
 (0)