Skip to content

Commit 3ea6a60

Browse files
committed
fix: acquire user_states lock before batch lock
refactor: add duration variable to try_batch_lock_with_timeout
1 parent 764537b commit 3ea6a60

File tree

1 file changed

+17
-11
lines changed

1 file changed

+17
-11
lines changed

crates/batcher/src/lib.rs

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use tokio::sync::{Mutex, MutexGuard, RwLock};
5151

5252
// Message handler lock timeout
5353
const MESSAGE_HANDLER_LOCK_TIMEOUT: Duration = Duration::from_secs(10);
54+
const POLLING_EVENTS_LOCK_TIMEOUT: Duration = Duration::from_secs(300);
5455
use tokio_tungstenite::tungstenite::{Error, Message};
5556
use types::batch_queue::{self, BatchQueueEntry, BatchQueueEntryPriority};
5657
use types::errors::{BatcherError, TransactionSendError};
@@ -445,12 +446,16 @@ impl Batcher {
445446
}
446447
}
447448

448-
/// Helper to apply 15-second timeout to batch lock acquisition with consistent logging and metrics
449-
async fn try_batch_lock_with_timeout<F, T>(&self, lock_future: F) -> Option<T>
449+
/// Helper to apply `duration` timeout to batch lock acquisition with consistent logging and metrics
450+
async fn try_batch_lock_with_timeout<F, T>(
451+
&self,
452+
lock_future: F,
453+
duration: Duration,
454+
) -> Option<T>
450455
where
451456
F: std::future::Future<Output = T>,
452457
{
453-
match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, lock_future).await {
458+
match timeout(duration, lock_future).await {
454459
Ok(result) => Some(result),
455460
Err(_) => {
456461
warn!("Batch lock acquisition timed out");
@@ -647,17 +652,19 @@ impl Batcher {
647652
}
648653

649654
async fn remove_user_proofs_and_reset_state(&self, user_address: Address) {
650-
// Use timeout for batch lock
655+
let mut user_states = self.user_states.write().await;
656+
651657
let mut batch_state_guard = match self
652-
.try_batch_lock_with_timeout(self.batch_state.lock())
658+
.try_batch_lock_with_timeout(self.batch_state.lock(), POLLING_EVENTS_LOCK_TIMEOUT)
653659
.await
654660
{
655661
Some(guard) => guard,
656662
None => {
657-
warn!(
658-
"Failed to acquire batch lock for user {:?}, skipping removal",
663+
error!(
664+
"Failed to acquire batch lock when trying to remove proofs from user {:?}, skipping removal",
659665
user_address
660666
);
667+
// TODO metrics for batch lock timeout during event processing
661668
return;
662669
}
663670
};
@@ -691,8 +698,7 @@ impl Batcher {
691698
);
692699
}
693700

694-
// Remove UserState entry
695-
self.user_states.write().await.remove(&user_address);
701+
user_states.remove(&user_address);
696702
info!(
697703
"Removed UserState entry for user {:?} after processing BalanceUnlocked event",
698704
user_address
@@ -1260,7 +1266,7 @@ impl Batcher {
12601266
// * ---------------------------------------------------------------------*
12611267

12621268
let Some(mut batch_state_lock) = self
1263-
.try_batch_lock_with_timeout(self.batch_state.lock())
1269+
.try_batch_lock_with_timeout(self.batch_state.lock(), MESSAGE_HANDLER_LOCK_TIMEOUT)
12641270
.await
12651271
else {
12661272
send_message(ws_conn_sink.clone(), SubmitProofResponseMessage::ServerBusy).await;
@@ -1430,7 +1436,7 @@ impl Batcher {
14301436
let replacement_max_fee = nonced_verification_data.max_fee;
14311437
let nonce = nonced_verification_data.nonce;
14321438
let Some(mut batch_state_guard) = self
1433-
.try_batch_lock_with_timeout(self.batch_state.lock())
1439+
.try_batch_lock_with_timeout(self.batch_state.lock(), MESSAGE_HANDLER_LOCK_TIMEOUT)
14341440
.await
14351441
else {
14361442
drop(user_state_guard);

0 commit comments

Comments
 (0)