Skip to content

Commit ac8dc95

Browse files
committed
Add timeouts to lock in handle_messages
1 parent 7f2d1ce commit ac8dc95

File tree

7 files changed

+97
-4
lines changed

7 files changed

+97
-4
lines changed

crates/batcher/src/lib.rs

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,36 @@ impl Batcher {
374374
updated_user_states
375375
}
376376

377+
/// Helper to apply 15-second timeout to user lock acquisition with consistent logging and metrics
378+
async fn try_user_lock_with_timeout<F, T>(&self, addr: Address, lock_future: F) -> Option<T>
379+
where
380+
F: std::future::Future<Output = T>,
381+
{
382+
match timeout(Duration::from_secs(15), lock_future).await {
383+
Ok(result) => Some(result),
384+
Err(_) => {
385+
warn!("User lock acquisition timed out for address {}", addr);
386+
self.metrics.inc_message_handler_user_lock_timeout();
387+
None
388+
}
389+
}
390+
}
391+
392+
/// Helper to apply 15-second timeout to batch lock acquisition with consistent logging and metrics
393+
async fn try_batch_lock_with_timeout<F, T>(&self, lock_future: F) -> Option<T>
394+
where
395+
F: std::future::Future<Output = T>,
396+
{
397+
match timeout(Duration::from_secs(15), lock_future).await {
398+
Ok(result) => Some(result),
399+
Err(_) => {
400+
warn!("Batch lock acquisition timed out");
401+
self.metrics.inc_message_handler_batch_lock_timeout();
402+
None
403+
}
404+
}
405+
}
406+
377407
pub async fn listen_connections(self: Arc<Self>, address: &str) -> Result<(), BatcherError> {
378408
// Create the event loop and TCP listener we'll accept connections on.
379409
let listener = TcpListener::bind(address)
@@ -659,7 +689,14 @@ impl Batcher {
659689
let user_state_ref = self.user_states.get(&address);
660690
match user_state_ref {
661691
Some(user_state_ref) => {
662-
let user_state_guard = user_state_ref.lock().await;
692+
let Some(user_state_guard) = self
693+
.try_user_lock_with_timeout(address, user_state_ref.lock())
694+
.await
695+
else {
696+
send_message(ws_conn_sink.clone(), GetNonceResponseMessage::ServerBusy)
697+
.await;
698+
return Ok(());
699+
};
663700
Some(user_state_guard.nonce)
664701
}
665702
None => None,
@@ -795,7 +832,13 @@ impl Batcher {
795832
};
796833

797834
// We acquire the lock on the user state, now everything will be processed sequentially
798-
let mut user_state_guard = user_state_ref.lock().await;
835+
let Some(mut user_state_guard) = self
836+
.try_user_lock_with_timeout(addr, user_state_ref.lock())
837+
.await
838+
else {
839+
send_message(ws_conn_sink.clone(), SubmitProofResponseMessage::ServerBusy).await;
840+
return Ok(());
841+
};
799842

800843
// If the user state was not present, we need to get the nonce from the Ethereum contract and update the dummy user state
801844
if !is_user_in_state {
@@ -907,7 +950,13 @@ impl Batcher {
907950
// * Perform validation over batcher queue *
908951
// * ---------------------------------------------------------------------*
909952

910-
let mut batch_state_lock = self.batch_state.lock().await;
953+
let Some(mut batch_state_lock) = self
954+
.try_batch_lock_with_timeout(self.batch_state.lock())
955+
.await
956+
else {
957+
send_message(ws_conn_sink.clone(), SubmitProofResponseMessage::ServerBusy).await;
958+
return Ok(());
959+
};
911960
if batch_state_lock.is_queue_full() {
912961
debug!("Batch queue is full. Evaluating if the incoming proof can replace a lower-priority entry.");
913962

@@ -1070,7 +1119,14 @@ impl Batcher {
10701119
) {
10711120
let replacement_max_fee = nonced_verification_data.max_fee;
10721121
let nonce = nonced_verification_data.nonce;
1073-
let mut batch_state_guard = self.batch_state.lock().await; // Second: batch lock
1122+
let Some(mut batch_state_guard) = self
1123+
.try_batch_lock_with_timeout(self.batch_state.lock())
1124+
.await
1125+
else {
1126+
drop(user_state_guard);
1127+
send_message(ws_conn_sink.clone(), SubmitProofResponseMessage::ServerBusy).await;
1128+
return;
1129+
};
10741130
let Some(entry) = batch_state_guard.get_entry(addr, nonce) else {
10751131
drop(batch_state_guard);
10761132
drop(user_state_guard);

crates/batcher/src/metrics.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ pub struct BatcherMetrics {
2727
pub cancel_create_new_task_duration: IntGauge,
2828
pub batcher_gas_cost_create_task_total: GenericCounter<AtomicF64>,
2929
pub batcher_gas_cost_cancel_task_total: GenericCounter<AtomicF64>,
30+
pub message_handler_user_lock_timeouts: IntCounter,
31+
pub message_handler_batch_lock_timeouts: IntCounter,
3032
}
3133

3234
impl BatcherMetrics {
@@ -80,6 +82,16 @@ impl BatcherMetrics {
8082
"Batcher Gas Cost Cancel Task Total"
8183
))?;
8284

85+
let message_handler_user_lock_timeouts = register_int_counter!(opts!(
86+
"message_handler_user_lock_timeouts_count",
87+
"Message Handler User Lock Timeouts"
88+
))?;
89+
90+
let message_handler_batch_lock_timeouts = register_int_counter!(opts!(
91+
"message_handler_batch_lock_timeouts_count",
92+
"Message Handler Batch Lock Timeouts"
93+
))?;
94+
8395
registry.register(Box::new(open_connections.clone()))?;
8496
registry.register(Box::new(received_proofs.clone()))?;
8597
registry.register(Box::new(sent_batches.clone()))?;
@@ -96,6 +108,8 @@ impl BatcherMetrics {
96108
registry.register(Box::new(cancel_create_new_task_duration.clone()))?;
97109
registry.register(Box::new(batcher_gas_cost_create_task_total.clone()))?;
98110
registry.register(Box::new(batcher_gas_cost_cancel_task_total.clone()))?;
111+
registry.register(Box::new(message_handler_user_lock_timeouts.clone()))?;
112+
registry.register(Box::new(message_handler_batch_lock_timeouts.clone()))?;
99113

100114
let metrics_route = warp::path!("metrics")
101115
.and(warp::any().map(move || registry.clone()))
@@ -124,6 +138,8 @@ impl BatcherMetrics {
124138
cancel_create_new_task_duration,
125139
batcher_gas_cost_create_task_total,
126140
batcher_gas_cost_cancel_task_total,
141+
message_handler_user_lock_timeouts,
142+
message_handler_batch_lock_timeouts,
127143
})
128144
}
129145

@@ -158,4 +174,12 @@ impl BatcherMetrics {
158174
self.queue_len.set(queue_len);
159175
self.queue_size_bytes.set(queue_size);
160176
}
177+
178+
pub fn inc_message_handler_user_lock_timeout(&self) {
179+
self.message_handler_user_lock_timeouts.inc();
180+
}
181+
182+
pub fn inc_message_handler_batch_lock_timeout(&self) {
183+
self.message_handler_batch_lock_timeouts.inc();
184+
}
161185
}

crates/cli/src/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,9 @@ async fn main() -> Result<(), AlignedError> {
567567
aligned_sdk::common::errors::GetNonceError::UnexpectedResponse(e) => {
568568
SubmitError::UnexpectedBatcherResponse(e)
569569
}
570+
aligned_sdk::common::errors::GetNonceError::GenericError(e) => {
571+
SubmitError::GenericError(e)
572+
}
570573
})?,
571574
};
572575

crates/sdk/src/common/errors.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ pub enum GetNonceError {
251251
UnexpectedResponse(String),
252252
InvalidRequest(String),
253253
ProtocolMismatch { current: u16, expected: u16 },
254+
GenericError(String),
254255
}
255256

256257
#[derive(Debug)]

crates/sdk/src/common/types.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,13 +401,15 @@ pub enum SubmitProofResponseMessage {
401401
EthRpcError,
402402
InvalidPaymentServiceAddress(Address, Address),
403403
UnderpricedProof,
404+
ServerBusy,
404405
}
405406

406407
#[derive(Debug, Clone, Serialize, Deserialize)]
407408
pub enum GetNonceResponseMessage {
408409
Nonce(U256),
409410
EthRpcError(String),
410411
InvalidRequest(String),
412+
ServerBusy,
411413
}
412414

413415
#[derive(Debug, Clone)]

crates/sdk/src/communication/messaging.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,10 @@ async fn handle_batcher_response(msg: Message) -> Result<BatchInclusionData, Sub
269269
error!("Batcher responded with error: queue limit has been exceeded. Funds have not been spent.");
270270
Err(SubmitError::BatchQueueLimitExceededError)
271271
}
272+
Ok(SubmitProofResponseMessage::ServerBusy) => {
273+
error!("Server is busy processing requests, please retry. Funds have not been spent.");
274+
Err(SubmitError::GenericError("Server is busy processing requests, please retry".to_string()))
275+
}
272276
Err(e) => {
273277
error!(
274278
"Error while deserializing batch inclusion data: {}. Funds have not been spent.",

crates/sdk/src/verification_layer/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,9 @@ pub async fn get_nonce_from_batcher(
584584
Ok(GetNonceResponseMessage::Nonce(nonce)) => Ok(nonce),
585585
Ok(GetNonceResponseMessage::EthRpcError(e)) => Err(GetNonceError::EthRpcError(e)),
586586
Ok(GetNonceResponseMessage::InvalidRequest(e)) => Err(GetNonceError::InvalidRequest(e)),
587+
Ok(GetNonceResponseMessage::ServerBusy) => Err(GetNonceError::GenericError(
588+
"Server is busy processing requests, please retry".to_string(),
589+
)),
587590
Err(_) => Err(GetNonceError::SerializationError(
588591
"Failed to deserialize batcher message".to_string(),
589592
)),

0 commit comments

Comments
 (0)