Skip to content

Commit 3492e7e

Browse files
JuArceMauroToscano
andauthored
fix(batcher): remove is_recovering_from_submission_failure (#2056)
Co-authored-by: MauroFab <[email protected]>
1 parent 8adf57f commit 3492e7e

File tree

4 files changed

+96
-52
lines changed

4 files changed

+96
-52
lines changed

crates/batcher/src/lib.rs

Lines changed: 56 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ use lambdaworks_crypto::merkle_tree::traits::IsMerkleTreeBackend;
4747
use log::{debug, error, info, warn};
4848
use tokio::net::{TcpListener, TcpStream};
4949
use tokio::sync::{Mutex, MutexGuard, RwLock};
50+
51+
// Message handler lock timeout
52+
const MESSAGE_HANDLER_LOCK_TIMEOUT: Duration = Duration::from_secs(10);
5053
use tokio_tungstenite::tungstenite::{Error, Message};
5154
use types::batch_queue::{self, BatchQueueEntry, BatchQueueEntryPriority};
5255
use types::errors::{BatcherError, TransactionSendError};
@@ -112,11 +115,6 @@ pub struct Batcher {
112115
// Because of this exception, user message handling uses lock acquisition with timeouts.
113116
batch_state: Mutex<BatchState>,
114117

115-
/// Flag to indicate when recovery is in progress
116-
/// When true, message handlers will return ServerBusy responses
117-
/// It's used a way to "lock" all the user_states at the same time
118-
/// If one needed is taken in the handle message it will time out
119-
is_recovering_from_submission_failure: RwLock<bool>,
120118
user_states: Arc<RwLock<HashMap<Address, Arc<Mutex<UserState>>>>>,
121119

122120
last_uploaded_batch_block: Mutex<u64>,
@@ -335,7 +333,6 @@ impl Batcher {
335333
batch_state: Mutex::new(batch_state),
336334
user_states,
337335
disabled_verifiers: Mutex::new(disabled_verifiers),
338-
is_recovering_from_submission_failure: RwLock::new(false),
339336
metrics,
340337
telemetry,
341338
}
@@ -423,7 +420,7 @@ impl Batcher {
423420
where
424421
F: std::future::Future<Output = T>,
425422
{
426-
match timeout(Duration::from_secs(15), lock_future).await {
423+
match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, lock_future).await {
427424
Ok(result) => Some(result),
428425
Err(_) => {
429426
warn!("User lock acquisition timed out for address {}", addr);
@@ -438,7 +435,7 @@ impl Batcher {
438435
where
439436
F: std::future::Future<Output = T>,
440437
{
441-
match timeout(Duration::from_secs(15), lock_future).await {
438+
match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, lock_future).await {
442439
Ok(result) => Some(result),
443440
Err(_) => {
444441
warn!("Batch lock acquisition timed out");
@@ -709,17 +706,6 @@ impl Batcher {
709706
mut address: Address,
710707
ws_conn_sink: WsMessageSink,
711708
) -> Result<(), Error> {
712-
// Check if restoration is in progress
713-
if *self.is_recovering_from_submission_failure.read().await {
714-
warn!(
715-
"Rejecting nonce request from {} during restoration",
716-
address
717-
);
718-
let response = GetNonceResponseMessage::ServerBusy;
719-
send_message(ws_conn_sink, response).await;
720-
return Ok(());
721-
}
722-
723709
// If the address is not paying, we will return the nonce of the aligned_payment_address
724710
if !self.has_to_pay(&address) {
725711
info!("Handling nonpaying message");
@@ -741,7 +727,21 @@ impl Batcher {
741727
}
742728

743729
let cached_user_nonce = {
744-
let user_state_ref = self.user_states.read().await.get(&address).cloned();
730+
let user_states_guard = match timeout(
731+
MESSAGE_HANDLER_LOCK_TIMEOUT,
732+
self.user_states.read(),
733+
)
734+
.await
735+
{
736+
Ok(guard) => guard,
737+
Err(_) => {
738+
warn!("User states read lock acquisition timed out in handle_get_nonce_for_address_msg");
739+
self.metrics.inc_message_handler_user_states_lock_timeouts();
740+
send_message(ws_conn_sink, GetNonceResponseMessage::ServerBusy).await;
741+
return Ok(());
742+
}
743+
};
744+
let user_state_ref = user_states_guard.get(&address).cloned();
745745
match user_state_ref {
746746
Some(user_state_ref) => {
747747
let Some(user_state_guard) = self
@@ -804,21 +804,6 @@ impl Batcher {
804804
debug!("Received message with nonce: {msg_nonce:?}");
805805
self.metrics.received_proofs.inc();
806806

807-
// Check if restoration is in progress
808-
if *self.is_recovering_from_submission_failure.read().await {
809-
warn!(
810-
"Rejecting proof submission from {} during restoration (nonce: {})",
811-
client_msg
812-
.verification_data
813-
.verification_data
814-
.proof_generator_addr,
815-
msg_nonce
816-
);
817-
let response = SubmitProofResponseMessage::ServerBusy;
818-
send_message(ws_conn_sink, response).await;
819-
return Ok(());
820-
}
821-
822807
// * ---------------------------------------------------*
823808
// * Perform validations over the message *
824809
// * ---------------------------------------------------*
@@ -881,7 +866,17 @@ impl Batcher {
881866
// If it was not present, then the user nonce is queried to the Aligned contract.
882867
// Lastly, we get a lock of the batch state again and insert the user state if it was still missing.
883868

884-
let is_user_in_state = self.user_states.read().await.contains_key(&addr);
869+
let is_user_in_state = match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, self.user_states.read())
870+
.await
871+
{
872+
Ok(user_states_guard) => user_states_guard.contains_key(&addr),
873+
Err(_) => {
874+
warn!("User states read lock acquisition timed out in handle_submit_proof_msg (user check)");
875+
self.metrics.inc_message_handler_user_states_lock_timeouts();
876+
send_message(ws_conn_sink, SubmitProofResponseMessage::ServerBusy).await;
877+
return Ok(());
878+
}
879+
};
885880

886881
if !is_user_in_state {
887882
// If the user state was not present, we need to get the nonce from the Ethereum contract
@@ -903,14 +898,32 @@ impl Batcher {
903898
debug!("User state for address {addr:?} not found, creating a new one");
904899
// We add a dummy user state to grab a lock on the user state
905900
let dummy_user_state = UserState::new(ethereum_user_nonce);
906-
self.user_states
907-
.write()
908-
.await
909-
.insert(addr, Arc::new(Mutex::new(dummy_user_state)));
901+
match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, self.user_states.write()).await {
902+
Ok(mut user_states_guard) => {
903+
user_states_guard.insert(addr, Arc::new(Mutex::new(dummy_user_state)));
904+
}
905+
Err(_) => {
906+
warn!("User states write lock acquisition timed out in handle_submit_proof_msg (user creation)");
907+
self.metrics.inc_message_handler_user_states_lock_timeouts();
908+
send_message(ws_conn_sink, SubmitProofResponseMessage::ServerBusy).await;
909+
return Ok(());
910+
}
911+
};
910912
debug!("Dummy user state for address {addr:?} created");
911913
}
912914

913-
let Some(user_state_ref) = self.user_states.read().await.get(&addr).cloned() else {
915+
let user_state_ref = match timeout(MESSAGE_HANDLER_LOCK_TIMEOUT, self.user_states.read())
916+
.await
917+
{
918+
Ok(user_states_guard) => user_states_guard.get(&addr).cloned(),
919+
Err(_) => {
920+
warn!("User states read lock acquisition timed out in handle_submit_proof_msg (user retrieval)");
921+
self.metrics.inc_message_handler_user_states_lock_timeouts();
922+
send_message(ws_conn_sink, SubmitProofResponseMessage::ServerBusy).await;
923+
return Ok(());
924+
}
925+
};
926+
let Some(user_state_ref) = user_state_ref else {
914927
error!("This should never happen, user state has previously been inserted if it didn't exist");
915928
send_message(
916929
ws_conn_sink.clone(),
@@ -1621,9 +1634,7 @@ impl Batcher {
16211634
failed_batch.len()
16221635
);
16231636

1624-
// Set restoration flag to stop handling new user messages
1625-
*self.is_recovering_from_submission_failure.write().await = true;
1626-
1637+
let user_states_lock = self.user_states.write().await;
16271638
let mut batch_state_lock = self.batch_state.lock().await;
16281639
let mut restored_entries = Vec::new();
16291640

@@ -1689,8 +1700,8 @@ impl Batcher {
16891700
// Only auxiliary user data (max_min_fee) can be "inconsistent"
16901701
// but we can keep updating it without locking the queue
16911702
info!("Queue recovered from submission failure, resuming user processing and updating user states metadata");
1703+
std::mem::drop(user_states_lock);
16921704
std::mem::drop(batch_state_lock);
1693-
*self.is_recovering_from_submission_failure.write().await = false;
16941705

16951706
info!("Updating user states after proof restoration...");
16961707
if let Err(e) = self

crates/batcher/src/metrics.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub struct BatcherMetrics {
2929
pub batcher_gas_cost_cancel_task_total: GenericCounter<AtomicF64>,
3030
pub message_handler_user_lock_timeouts: IntCounter,
3131
pub message_handler_batch_lock_timeouts: IntCounter,
32+
pub message_handler_user_states_lock_timeouts: IntCounter,
3233
pub available_data_services: IntGauge,
3334
}
3435

@@ -97,6 +98,11 @@ impl BatcherMetrics {
9798
"Message Handler Batch Lock Timeouts"
9899
))?;
99100

101+
let message_handler_user_states_lock_timeouts = register_int_counter!(opts!(
102+
"message_handler_user_states_lock_timeouts_count",
103+
"Message Handler User States Lock Timeouts"
104+
))?;
105+
100106
registry.register(Box::new(open_connections.clone()))?;
101107
registry.register(Box::new(received_proofs.clone()))?;
102108
registry.register(Box::new(sent_batches.clone()))?;
@@ -115,6 +121,7 @@ impl BatcherMetrics {
115121
registry.register(Box::new(batcher_gas_cost_cancel_task_total.clone()))?;
116122
registry.register(Box::new(message_handler_user_lock_timeouts.clone()))?;
117123
registry.register(Box::new(message_handler_batch_lock_timeouts.clone()))?;
124+
registry.register(Box::new(message_handler_user_states_lock_timeouts.clone()))?;
118125
registry.register(Box::new(available_data_services.clone()))?;
119126

120127
let metrics_route = warp::path!("metrics")
@@ -146,6 +153,7 @@ impl BatcherMetrics {
146153
batcher_gas_cost_cancel_task_total,
147154
message_handler_user_lock_timeouts,
148155
message_handler_batch_lock_timeouts,
156+
message_handler_user_states_lock_timeouts,
149157
available_data_services,
150158
})
151159
}
@@ -188,4 +196,8 @@ impl BatcherMetrics {
188196
pub fn inc_message_handler_batch_lock_timeout(&self) {
189197
self.message_handler_batch_lock_timeouts.inc();
190198
}
199+
200+
pub fn inc_message_handler_user_states_lock_timeouts(&self) {
201+
self.message_handler_user_states_lock_timeouts.inc();
202+
}
191203
}

crates/batcher/src/types/batch_queue.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,9 @@ pub(crate) fn extract_batch_directly(
170170
let (rejected_entry, rejected_priority) = batch_queue.pop().unwrap();
171171

172172
// Update batch size
173-
let verification_data_size = rejected_entry.nonced_verification_data.cbor_size_upper_bound();
173+
let verification_data_size = rejected_entry
174+
.nonced_verification_data
175+
.cbor_size_upper_bound();
174176
batch_size -= verification_data_size;
175177

176178
rejected_entries.push((rejected_entry, rejected_priority));

grafana/provisioning/dashboards/aligned/aggregator_batcher.json

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"editable": true,
1919
"fiscalYearStartMonth": 0,
2020
"graphTooltip": 0,
21-
"id": 4,
21+
"id": 2,
2222
"links": [],
2323
"liveNow": false,
2424
"panels": [
@@ -4018,9 +4018,26 @@
40184018
"range": true,
40194019
"refId": "B",
40204020
"useBackend": false
4021+
},
4022+
{
4023+
"datasource": {
4024+
"type": "prometheus",
4025+
"uid": "prometheus"
4026+
},
4027+
"disableTextWrap": false,
4028+
"editorMode": "builder",
4029+
"expr": " floor(increase(message_handler_user_states_lock_timeouts_count{job=\"aligned-batcher\"}[$__range]))",
4030+
"fullMetaSearch": false,
4031+
"hide": false,
4032+
"includeNullMetadata": true,
4033+
"instant": false,
4034+
"legendFormat": "__auto",
4035+
"range": true,
4036+
"refId": "C",
4037+
"useBackend": false
40214038
}
40224039
],
4023-
"title": "Message Handler - (Batch | User) Lock Timeout",
4040+
"title": "Message Handler - (Batch | User | User State Map) Lock Timeout",
40244041
"type": "stat"
40254042
},
40264043
{
@@ -4287,7 +4304,8 @@
42874304
"mode": "absolute",
42884305
"steps": [
42894306
{
4290-
"color": "green"
4307+
"color": "green",
4308+
"value": null
42914309
},
42924310
{
42934311
"color": "red",
@@ -4418,7 +4436,8 @@
44184436
"mode": "absolute",
44194437
"steps": [
44204438
{
4421-
"color": "green"
4439+
"color": "green",
4440+
"value": null
44224441
},
44234442
{
44244443
"color": "red",
@@ -4483,6 +4502,6 @@
44834502
"timezone": "browser",
44844503
"title": "System Data",
44854504
"uid": "aggregator",
4486-
"version": 20,
4505+
"version": 9,
44874506
"weekStart": ""
4488-
}
4507+
}

0 commit comments

Comments
 (0)