Skip to content

Commit 3c52213

Browse files
committed
Add timeouts
1 parent b947b82 commit 3c52213

File tree

2 files changed

+46
-5
lines changed

2 files changed

+46
-5
lines changed

crates/batcher/src/lib.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ use lambdaworks_crypto::merkle_tree::traits::IsMerkleTreeBackend;
4747
use log::{debug, error, info, warn};
4848
use tokio::net::{TcpListener, TcpStream};
4949
use tokio::sync::{Mutex, MutexGuard, RwLock};
50+
51+
// Message handler lock timeout
52+
const MESSAGE_HANDLER_LOCK_TIMEOUT: Duration = Duration::from_secs(10);
5053
use tokio_tungstenite::tungstenite::{Error, Message};
5154
use types::batch_queue::{self, BatchQueueEntry, BatchQueueEntryPriority};
5255
use types::errors::{BatcherError, TransactionSendError};
@@ -417,7 +420,7 @@ impl Batcher {
417420
where
418421
F: std::future::Future<Output = T>,
419422
{
420-
match timeout(Duration::from_secs(15), lock_future).await {
423+
match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, lock_future).await {
421424
Ok(result) => Some(result),
422425
Err(_) => {
423426
warn!("User lock acquisition timed out for address {}", addr);
@@ -432,7 +435,7 @@ impl Batcher {
432435
where
433436
F: std::future::Future<Output = T>,
434437
{
435-
match timeout(Duration::from_secs(15), lock_future).await {
438+
match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, lock_future).await {
436439
Ok(result) => Some(result),
437440
Err(_) => {
438441
warn!("Batch lock acquisition timed out");
@@ -724,7 +727,16 @@ impl Batcher {
724727
}
725728

726729
let cached_user_nonce = {
727-
let user_state_ref = self.user_states.read().await.get(&address).cloned();
730+
let user_states_guard = match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, self.user_states.read()).await {
731+
Ok(guard) => guard,
732+
Err(_) => {
733+
warn!("User states read lock acquisition timed out");
734+
self.metrics.inc_message_handler_user_states_lock_timeouts();
735+
send_message(ws_conn_sink, GetNonceResponseMessage::ServerBusy).await;
736+
return Ok(());
737+
}
738+
};
739+
let user_state_ref = user_states_guard.get(&address).cloned();
728740
match user_state_ref {
729741
Some(user_state_ref) => {
730742
let Some(user_state_guard) = self
@@ -849,7 +861,15 @@ impl Batcher {
849861
// If it was not present, then the user nonce is queried to the Aligned contract.
850862
// Lastly, we get a lock of the batch state again and insert the user state if it was still missing.
851863

852-
let is_user_in_state = self.user_states.read().await.contains_key(&addr);
864+
let is_user_in_state = match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, self.user_states.read()).await {
865+
Ok(user_states_guard) => user_states_guard.contains_key(&addr),
866+
Err(_) => {
867+
warn!("User states read lock acquisition timed out");
868+
self.metrics.inc_message_handler_user_states_lock_timeouts();
869+
send_message(ws_conn_sink, SubmitProofResponseMessage::ServerBusy).await;
870+
return Ok(());
871+
}
872+
};
853873

854874
if !is_user_in_state {
855875
// If the user state was not present, we need to get the nonce from the Ethereum contract
@@ -878,7 +898,16 @@ impl Batcher {
878898
debug!("Dummy user state for address {addr:?} created");
879899
}
880900

881-
let Some(user_state_ref) = self.user_states.read().await.get(&addr).cloned() else {
901+
let user_state_ref = match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, self.user_states.read()).await {
902+
Ok(user_states_guard) => user_states_guard.get(&addr).cloned(),
903+
Err(_) => {
904+
warn!("User states read lock acquisition timed out");
905+
self.metrics.inc_message_handler_user_states_lock_timeouts();
906+
send_message(ws_conn_sink, SubmitProofResponseMessage::ServerBusy).await;
907+
return Ok(());
908+
}
909+
};
910+
let Some(user_state_ref) = user_state_ref else {
882911
error!("This should never happen, user state has previously been inserted if it didn't exist");
883912
send_message(
884913
ws_conn_sink.clone(),

crates/batcher/src/metrics.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub struct BatcherMetrics {
2929
pub batcher_gas_cost_cancel_task_total: GenericCounter<AtomicF64>,
3030
pub message_handler_user_lock_timeouts: IntCounter,
3131
pub message_handler_batch_lock_timeouts: IntCounter,
32+
pub message_handler_user_states_lock_timeouts: IntCounter,
3233
pub available_data_services: IntGauge,
3334
}
3435

@@ -97,6 +98,11 @@ impl BatcherMetrics {
9798
"Message Handler Batch Lock Timeouts"
9899
))?;
99100

101+
let message_handler_user_states_lock_timeouts = register_int_counter!(opts!(
102+
"message_handler_user_states_lock_timeouts_count",
103+
"Message Handler User States Lock Timeouts"
104+
))?;
105+
100106
registry.register(Box::new(open_connections.clone()))?;
101107
registry.register(Box::new(received_proofs.clone()))?;
102108
registry.register(Box::new(sent_batches.clone()))?;
@@ -115,6 +121,7 @@ impl BatcherMetrics {
115121
registry.register(Box::new(batcher_gas_cost_cancel_task_total.clone()))?;
116122
registry.register(Box::new(message_handler_user_lock_timeouts.clone()))?;
117123
registry.register(Box::new(message_handler_batch_lock_timeouts.clone()))?;
124+
registry.register(Box::new(message_handler_user_states_lock_timeouts.clone()))?;
118125
registry.register(Box::new(available_data_services.clone()))?;
119126

120127
let metrics_route = warp::path!("metrics")
@@ -146,6 +153,7 @@ impl BatcherMetrics {
146153
batcher_gas_cost_cancel_task_total,
147154
message_handler_user_lock_timeouts,
148155
message_handler_batch_lock_timeouts,
156+
message_handler_user_states_lock_timeouts,
149157
available_data_services,
150158
})
151159
}
@@ -188,4 +196,8 @@ impl BatcherMetrics {
188196
pub fn inc_message_handler_batch_lock_timeout(&self) {
189197
self.message_handler_batch_lock_timeouts.inc();
190198
}
199+
200+
pub fn inc_message_handler_user_states_lock_timeouts(&self) {
201+
self.message_handler_user_states_lock_timeouts.inc();
202+
}
191203
}

0 commit comments

Comments
 (0)