Skip to content

Commit b947b82

Browse files
committed
Merge branch 'rework_batcher_concurrency' into remove_is_recovering_from_submission_failure
2 parents 0a50a2e + 662bdd7 commit b947b82

File tree

1 file changed

+29
-27
lines changed

1 file changed

+29
-27
lines changed

crates/batcher/src/lib.rs

Lines changed: 29 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -98,11 +98,18 @@ pub struct Batcher {
9898
aggregator_fee_percentage_multiplier: u128,
9999
aggregator_gas_cost: u128,
100100

101-
// Shared state (Mutex)
102-
/// The general business rule is:
103-
/// - User processing can be done in parallel unless a batch creation is happening
104-
/// - Batch creation needs to be able to change all the states, so all processing
105-
/// needs to be stopped, and all user_states locks need to be taken
101+
// Shared state access:
102+
// Two kinds of threads interact with the shared state:
103+
// 1. User message processing threads (run in parallel)
104+
// 2. Batch creation thread (runs sequentially, includes failure recovery)
105+
//
106+
// Locking rules:
107+
// - To avoid deadlocks, always acquire `user_states` before `batch_state`.
108+
// - During failure recovery, restoring a valid state may require breaking this rule:
109+
// additional user locks might be acquired *after* the batch lock.
110+
// (See the `restore` algorithm in the `batch_queue` module.)
111+
//
112+
// Because of this exception, user message handling uses lock acquisition with timeouts.
106113
batch_state: Mutex<BatchState>,
107114

108115
user_states: Arc<RwLock<HashMap<Address, Arc<Mutex<UserState>>>>>,
@@ -845,9 +852,25 @@ impl Batcher {
845852
let is_user_in_state = self.user_states.read().await.contains_key(&addr);
846853

847854
if !is_user_in_state {
855+
// If the user state was not present, we need to get the nonce from the Ethereum contract
856+
let ethereum_user_nonce = match self.get_user_nonce_from_ethereum(addr).await {
857+
Ok(ethereum_user_nonce) => ethereum_user_nonce,
858+
Err(e) => {
859+
error!(
860+
"Failed to get user nonce from Ethereum for address {addr:?}. Error: {e:?}"
861+
);
862+
send_message(
863+
ws_conn_sink.clone(),
864+
SubmitProofResponseMessage::EthRpcError,
865+
)
866+
.await;
867+
self.metrics.user_error(&["eth_rpc_error", ""]);
868+
return Ok(());
869+
}
870+
};
848871
debug!("User state for address {addr:?} not found, creating a new one");
849872
// We add a dummy user state to grab a lock on the user state
850-
let dummy_user_state = UserState::new(U256::zero());
873+
let dummy_user_state = UserState::new(ethereum_user_nonce);
851874
self.user_states
852875
.write()
853876
.await
@@ -875,27 +898,6 @@ impl Batcher {
875898
return Ok(());
876899
};
877900

878-
// If the user state was not present, we need to get the nonce from the Ethereum contract and update the dummy user state
879-
if !is_user_in_state {
880-
let ethereum_user_nonce = match self.get_user_nonce_from_ethereum(addr).await {
881-
Ok(ethereum_user_nonce) => ethereum_user_nonce,
882-
Err(e) => {
883-
error!(
884-
"Failed to get user nonce from Ethereum for address {addr:?}. Error: {e:?}"
885-
);
886-
send_message(
887-
ws_conn_sink.clone(),
888-
SubmitProofResponseMessage::EthRpcError,
889-
)
890-
.await;
891-
self.metrics.user_error(&["eth_rpc_error", ""]);
892-
return Ok(());
893-
}
894-
};
895-
// Update the dummy user state with the correct nonce
896-
user_state_guard.nonce = ethereum_user_nonce;
897-
}
898-
899901
// * ---------------------------------------------------*
900902
// * Perform validations over user state *
901903
// * ---------------------------------------------------*

0 commit comments

Comments
 (0)