Skip to content

Commit 173d4c8

Browse files
committed
Remove all functions that takes and drops lock
1 parent fc6826d commit 173d4c8

File tree

1 file changed

+112
-163
lines changed

1 file changed

+112
-163
lines changed

crates/batcher/src/lib.rs

Lines changed: 112 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -282,89 +282,6 @@ impl Batcher {
282282
}
283283
}
284284

285-
// Helper methods for user_states operations with per-address locking
286-
287-
async fn get_user_nonce(&self, addr: &Address) -> Option<U256> {
288-
let user_state = self.user_states.get(addr)?;
289-
let user_state_guard = user_state.lock().await;
290-
Some(user_state_guard.nonce)
291-
}
292-
293-
async fn get_user_last_max_fee_limit(&self, addr: &Address) -> Option<U256> {
294-
let user_state = self.user_states.get(addr)?;
295-
let user_state_guard = user_state.lock().await;
296-
Some(user_state_guard.last_max_fee_limit)
297-
}
298-
299-
async fn get_user_total_fees_in_queue(&self, addr: &Address) -> Option<U256> {
300-
let user_state = self.user_states.get(addr)?;
301-
let user_state_guard = user_state.lock().await;
302-
Some(user_state_guard.total_fees_in_queue)
303-
}
304-
305-
async fn get_user_proof_count(&self, addr: &Address) -> Option<usize> {
306-
let user_state = self.user_states.get(addr)?;
307-
let user_state_guard = user_state.lock().await;
308-
Some(user_state_guard.proofs_in_batch)
309-
}
310-
311-
async fn update_user_nonce(&self, addr: &Address, new_nonce: U256) -> Option<U256> {
312-
let user_state = self.user_states.get(addr)?;
313-
let mut user_state_guard = user_state.lock().await;
314-
user_state_guard.nonce = new_nonce;
315-
Some(new_nonce)
316-
}
317-
318-
async fn update_user_max_fee_limit(&self, addr: &Address, new_max_fee_limit: U256) -> Option<U256> {
319-
let user_state = self.user_states.get(addr)?;
320-
let mut user_state_guard = user_state.lock().await;
321-
user_state_guard.last_max_fee_limit = new_max_fee_limit;
322-
Some(new_max_fee_limit)
323-
}
324-
325-
async fn update_user_proof_count(&self, addr: &Address, new_proof_count: usize) -> Option<usize> {
326-
let user_state = self.user_states.get(addr)?;
327-
let mut user_state_guard = user_state.lock().await;
328-
user_state_guard.proofs_in_batch = new_proof_count;
329-
Some(new_proof_count)
330-
}
331-
332-
async fn update_user_total_fees_in_queue(&self, addr: &Address, new_total_fees_in_queue: U256) -> Option<U256> {
333-
let user_state = self.user_states.get(addr)?;
334-
let mut user_state_guard = user_state.lock().await;
335-
user_state_guard.total_fees_in_queue = new_total_fees_in_queue;
336-
Some(new_total_fees_in_queue)
337-
}
338-
339-
async fn update_user_total_fees_in_queue_of_replacement_message(
340-
&self,
341-
addr: &Address,
342-
original_max_fee: U256,
343-
new_max_fee: U256,
344-
) -> Option<U256> {
345-
let user_state = self.user_states.get(addr)?;
346-
let mut user_state_guard = user_state.lock().await;
347-
let fee_difference = new_max_fee - original_max_fee;
348-
user_state_guard.total_fees_in_queue += fee_difference;
349-
Some(user_state_guard.total_fees_in_queue)
350-
}
351-
352-
async fn update_user_state(
353-
&self,
354-
addr: &Address,
355-
new_nonce: U256,
356-
new_max_fee_limit: U256,
357-
new_proof_count: usize,
358-
new_total_fees_in_queue: U256,
359-
) -> Option<(U256, U256, usize, U256)> {
360-
let user_state = self.user_states.get(addr)?;
361-
let mut user_state_guard = user_state.lock().await;
362-
user_state_guard.nonce = new_nonce;
363-
user_state_guard.last_max_fee_limit = new_max_fee_limit;
364-
user_state_guard.proofs_in_batch = new_proof_count;
365-
user_state_guard.total_fees_in_queue = new_total_fees_in_queue;
366-
Some((new_nonce, new_max_fee_limit, new_proof_count, new_total_fees_in_queue))
367-
}
368285

369286
fn get_user_min_fee_in_batch(&self, addr: &Address, batch_queue: &types::batch_queue::BatchQueue) -> U256 {
370287
batch_queue
@@ -699,7 +616,16 @@ impl Batcher {
699616
address = replacement_addr;
700617
}
701618

702-
let cached_user_nonce = self.get_user_nonce(&address).await;
619+
let cached_user_nonce = {
620+
let user_state = self.user_states.get(&address);
621+
match user_state {
622+
Some(user_state) => {
623+
let user_state_guard = user_state.lock().await;
624+
Some(user_state_guard.nonce)
625+
}
626+
None => None,
627+
}
628+
};
703629

704630
let user_nonce = if let Some(user_nonce) = cached_user_nonce {
705631
user_nonce
@@ -887,7 +813,17 @@ impl Batcher {
887813
};
888814

889815
let msg_max_fee = nonced_verification_data.max_fee;
890-
let Some(user_last_max_fee_limit) = self.get_user_last_max_fee_limit(&addr).await else {
816+
let user_last_max_fee_limit = {
817+
let user_state = self.user_states.get(&addr);
818+
match user_state {
819+
Some(user_state) => {
820+
let user_state_guard = user_state.lock().await;
821+
Some(user_state_guard.last_max_fee_limit)
822+
}
823+
None => None,
824+
}
825+
};
826+
let Some(user_last_max_fee_limit) = user_last_max_fee_limit else {
891827
send_message(
892828
ws_conn_sink.clone(),
893829
SubmitProofResponseMessage::AddToBatchError,
@@ -897,7 +833,17 @@ impl Batcher {
897833
return Ok(());
898834
};
899835

900-
let Some(user_accumulated_fee) = self.get_user_total_fees_in_queue(&addr).await else {
836+
let user_accumulated_fee = {
837+
let user_state = self.user_states.get(&addr);
838+
match user_state {
839+
Some(user_state) => {
840+
let user_state_guard = user_state.lock().await;
841+
Some(user_state_guard.total_fees_in_queue)
842+
}
843+
None => None,
844+
}
845+
};
846+
let Some(user_accumulated_fee) = user_accumulated_fee else {
901847
send_message(
902848
ws_conn_sink.clone(),
903849
SubmitProofResponseMessage::AddToBatchError,
@@ -917,7 +863,16 @@ impl Batcher {
917863
return Ok(());
918864
}
919865

920-
let cached_user_nonce = self.get_user_nonce(&addr).await;
866+
let cached_user_nonce = {
867+
let user_state = self.user_states.get(&addr);
868+
match user_state {
869+
Some(user_state) => {
870+
let user_state_guard = user_state.lock().await;
871+
Some(user_state_guard.nonce)
872+
}
873+
None => None,
874+
}
875+
};
921876

922877
let Some(expected_nonce) = cached_user_nonce else {
923878
error!("Failed to get cached user nonce: User not found in user states, but it should have been already inserted");
@@ -1171,39 +1126,46 @@ impl Batcher {
11711126

11721127
// update max_fee_limit
11731128
let updated_max_fee_limit_in_batch = self.get_user_min_fee_in_batch(&addr, &batch_state_lock.batch_queue);
1174-
if self
1175-
.update_user_max_fee_limit(&addr, updated_max_fee_limit_in_batch)
1176-
.await
1177-
.is_none()
11781129
{
1179-
std::mem::drop(batch_state_lock);
1180-
warn!("User state for address {addr:?} was not present in batcher user states, but it should be");
1181-
send_message(
1182-
ws_conn_sink.clone(),
1183-
SubmitProofResponseMessage::AddToBatchError,
1184-
)
1185-
.await;
1186-
return;
1187-
};
1130+
let user_state = self.user_states.get(&addr);
1131+
match user_state {
1132+
Some(user_state) => {
1133+
let mut user_state_guard = user_state.lock().await;
1134+
user_state_guard.last_max_fee_limit = updated_max_fee_limit_in_batch;
1135+
}
1136+
None => {
1137+
std::mem::drop(batch_state_lock);
1138+
warn!("User state for address {addr:?} was not present in batcher user states, but it should be");
1139+
send_message(
1140+
ws_conn_sink.clone(),
1141+
SubmitProofResponseMessage::AddToBatchError,
1142+
)
1143+
.await;
1144+
return;
1145+
}
1146+
}
1147+
}
11881148

11891149
// update total_fees_in_queue
1190-
if self
1191-
.update_user_total_fees_in_queue_of_replacement_message(
1192-
&addr,
1193-
original_max_fee,
1194-
replacement_max_fee,
1195-
)
1196-
.await
1197-
.is_none()
11981150
{
1199-
std::mem::drop(batch_state_lock);
1200-
warn!("User state for address {addr:?} was not present in batcher user states, but it should be");
1201-
send_message(
1202-
ws_conn_sink.clone(),
1203-
SubmitProofResponseMessage::AddToBatchError,
1204-
)
1205-
.await;
1206-
};
1151+
let user_state = self.user_states.get(&addr);
1152+
match user_state {
1153+
Some(user_state) => {
1154+
let mut user_state_guard = user_state.lock().await;
1155+
let fee_difference = replacement_max_fee - original_max_fee;
1156+
user_state_guard.total_fees_in_queue += fee_difference;
1157+
}
1158+
None => {
1159+
std::mem::drop(batch_state_lock);
1160+
warn!("User state for address {addr:?} was not present in batcher user states, but it should be");
1161+
send_message(
1162+
ws_conn_sink.clone(),
1163+
SubmitProofResponseMessage::AddToBatchError,
1164+
)
1165+
.await;
1166+
}
1167+
}
1168+
}
12071169
}
12081170

12091171
async fn disabled_verifiers(&self) -> Result<U256, ContractError<SignerMiddlewareT>> {
@@ -1275,40 +1237,26 @@ impl Batcher {
12751237

12761238
info!("Current batch queue length: {}", queue_len);
12771239

1278-
let Some(user_proof_count) = self.get_user_proof_count(&proof_submitter_addr).await else {
1279-
error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present");
1280-
std::mem::drop(batch_state_lock);
1281-
return Err(BatcherError::AddressNotFoundInUserStates(
1282-
proof_submitter_addr,
1283-
));
1284-
};
1285-
1286-
let Some(current_total_fees_in_queue) = self.get_user_total_fees_in_queue(&proof_submitter_addr).await else {
1287-
error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present");
1288-
std::mem::drop(batch_state_lock);
1289-
return Err(BatcherError::AddressNotFoundInUserStates(
1290-
proof_submitter_addr,
1291-
));
1292-
};
1293-
12941240
// User state is updated
1295-
if self
1296-
.update_user_state(
1297-
&proof_submitter_addr,
1298-
nonce + U256::one(),
1299-
max_fee,
1300-
user_proof_count + 1,
1301-
current_total_fees_in_queue + max_fee,
1302-
)
1303-
.await
1304-
.is_none()
13051241
{
1306-
error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present");
1307-
std::mem::drop(batch_state_lock);
1308-
return Err(BatcherError::AddressNotFoundInUserStates(
1309-
proof_submitter_addr,
1310-
));
1311-
};
1242+
let user_state = self.user_states.get(&proof_submitter_addr);
1243+
match user_state {
1244+
Some(user_state) => {
1245+
let mut user_state_guard = user_state.lock().await;
1246+
user_state_guard.nonce = nonce + U256::one();
1247+
user_state_guard.last_max_fee_limit = max_fee;
1248+
user_state_guard.proofs_in_batch += 1;
1249+
user_state_guard.total_fees_in_queue += max_fee;
1250+
}
1251+
None => {
1252+
error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present");
1253+
std::mem::drop(batch_state_lock);
1254+
return Err(BatcherError::AddressNotFoundInUserStates(
1255+
proof_submitter_addr,
1256+
));
1257+
}
1258+
}
1259+
}
13121260

13131261
Ok(())
13141262
}
@@ -1420,21 +1368,22 @@ impl Batcher {
14201368
// informative error.
14211369

14221370
// Now we update the user states related to the batch (proof count in batch and min fee in batch)
1423-
self.update_user_proof_count(addr, *proof_count)
1424-
.await
1425-
.ok_or(BatcherError::QueueRemoveError(
1426-
"Could not update_user_proof_count".into(),
1427-
))?;
1428-
self.update_user_max_fee_limit(addr, *max_fee_limit)
1429-
.await
1430-
.ok_or(BatcherError::QueueRemoveError(
1431-
"Could not update_user_max_fee_limit".into(),
1432-
))?;
1433-
self.update_user_total_fees_in_queue(addr, *total_fees_in_queue)
1434-
.await
1435-
.ok_or(BatcherError::QueueRemoveError(
1436-
"Could not update_user_total_fees_in_queue".into(),
1437-
))?;
1371+
{
1372+
let user_state = self.user_states.get(addr);
1373+
match user_state {
1374+
Some(user_state) => {
1375+
let mut user_state_guard = user_state.lock().await;
1376+
user_state_guard.proofs_in_batch = *proof_count;
1377+
user_state_guard.last_max_fee_limit = *max_fee_limit;
1378+
user_state_guard.total_fees_in_queue = *total_fees_in_queue;
1379+
}
1380+
None => {
1381+
return Err(BatcherError::QueueRemoveError(
1382+
"Could not update user state".into(),
1383+
));
1384+
}
1385+
}
1386+
}
14381387
}
14391388

14401389
// Update metrics

0 commit comments

Comments
 (0)