Skip to content

Commit b6caa39

Browse files
committed
refactor: use arc<rwlock<hashmap<...>>> instead of dashmap
1 parent c7a58ce commit b6caa39

File tree

1 file changed

+22
-23
lines changed

1 file changed

+22
-23
lines changed

crates/batcher/src/lib.rs

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ use types::batch_state::BatchState;
1717
use types::user_state::UserState;
1818

1919
use batch_queue::calculate_batch_size;
20-
use dashmap::DashMap;
2120
use std::collections::HashMap;
2221
use std::env;
2322
use std::net::SocketAddr;
@@ -111,7 +110,7 @@ pub struct Batcher {
111110
/// It's used a way to "lock" all the user_states at the same time
112111
/// If one needed is taken in the handle message it will timeout
113112
is_recovering_from_submission_failure: RwLock<bool>,
114-
user_states: DashMap<Address, Arc<Mutex<UserState>>>,
113+
user_states: Arc<tokio::sync::RwLock<HashMap<Address, Arc<Mutex<UserState>>>>>,
115114

116115
last_uploaded_batch_block: Mutex<u64>,
117116

@@ -262,7 +261,7 @@ impl Batcher {
262261
.await
263262
.expect("Failed to get fallback Service Manager contract");
264263

265-
let user_states = DashMap::new();
264+
let user_states = Arc::new(tokio::sync::RwLock::new(HashMap::new()));
266265
let batch_state = BatchState::new(config.batcher.max_queue_size);
267266
let non_paying_config = if let Some(non_paying_config) = config.batcher.non_paying {
268267
warn!("Non-paying address configuration detected. Will replace non-paying address {} with configured address.",
@@ -276,7 +275,7 @@ impl Batcher {
276275
.expect("Could not get non-paying nonce from Ethereum");
277276

278277
let non_paying_user_state = UserState::new(nonpaying_nonce);
279-
user_states.insert(
278+
user_states.write().await.insert(
280279
non_paying_config.replacement.address(),
281280
Arc::new(Mutex::new(non_paying_user_state)),
282281
);
@@ -335,7 +334,7 @@ impl Batcher {
335334
}
336335
}
337336

338-
fn update_evicted_user_state_with_lock(
337+
async fn update_evicted_user_state_with_lock(
339338
&self,
340339
removed_entry: &types::batch_queue::BatchQueueEntry,
341340
batch_queue: &types::batch_queue::BatchQueue,
@@ -350,7 +349,7 @@ impl Batcher {
350349
{
351350
Some((last_entry, _)) => last_entry.nonced_verification_data.max_fee,
352351
None => {
353-
self.user_states.remove(&addr);
352+
self.user_states.write().await.remove(&addr);
354353
return;
355354
}
356355
};
@@ -376,12 +375,12 @@ impl Batcher {
376375
{
377376
Some((last_entry, _)) => last_entry.nonced_verification_data.max_fee,
378377
None => {
379-
self.user_states.remove(&addr);
378+
self.user_states.write().await.remove(&addr);
380379
return Some(());
381380
}
382381
};
383382

384-
let user_state = self.user_states.get(&addr)?;
383+
let user_state = self.user_states.read().await.get(&addr)?.clone();
385384
let mut user_state_guard = user_state.lock().await;
386385
user_state_guard.proofs_in_batch -= 1;
387386
user_state_guard.nonce -= U256::one();
@@ -432,7 +431,7 @@ impl Batcher {
432431
where
433432
F: std::future::Future<Output = T>,
434433
{
435-
match timeout(Duration::from_secs(15), lock_future).await {
434+
match timeout(Duration::from_secs(150), lock_future).await {
436435
Ok(result) => Some(result),
437436
Err(_) => {
438437
warn!("Batch lock acquisition timed out");
@@ -735,7 +734,7 @@ impl Batcher {
735734
}
736735

737736
let cached_user_nonce = {
738-
let user_state_ref = self.user_states.get(&address);
737+
let user_state_ref = self.user_states.read().await.get(&address).cloned();
739738
match user_state_ref {
740739
Some(user_state_ref) => {
741740
let Some(user_state_guard) = self
@@ -881,18 +880,18 @@ impl Batcher {
881880
// If it was not present, then the user nonce is queried to the Aligned contract.
882881
// Lastly, we get a lock of the batch state again and insert the user state if it was still missing.
883882

884-
let is_user_in_state = self.user_states.contains_key(&addr);
883+
let is_user_in_state = self.user_states.read().await.contains_key(&addr);
885884

886885
if !is_user_in_state {
887886
warn!("User state for address {addr:?} not found, creating a new one");
888887
// We add a dummy user state to grab a lock on the user state
889888
let dummy_user_state = UserState::new(U256::zero());
890889
self.user_states
891-
.insert(addr, Arc::new(Mutex::new(dummy_user_state)));
890+
.write().await.insert(addr, Arc::new(Mutex::new(dummy_user_state)));
892891
warn!("Dummy user state for address {addr:?} created");
893892
}
894893

895-
let Some(user_state_ref) = self.user_states.get(&addr) else {
894+
let Some(user_state_ref) = self.user_states.read().await.get(&addr).cloned() else {
896895
error!("This should never happen, user state has previously been inserted if it didn't exist");
897896
send_message(
898897
ws_conn_sink.clone(),
@@ -1060,7 +1059,7 @@ impl Batcher {
10601059

10611060
// Try to find any candidate whose lock we can acquire and immediately process them
10621061
for candidate_addr in eviction_candidates {
1063-
if let Some(user_state_arc) = self.user_states.get(&candidate_addr) {
1062+
if let Some(user_state_arc) = self.user_states.read().await.get(&candidate_addr).cloned() {
10641063
if let Ok(mut user_guard) = user_state_arc.try_lock() {
10651064
// Found someone whose lock we can get - now find and remove their entry
10661065
let entries_to_check: Vec<_> = batch_state_lock
@@ -1094,7 +1093,7 @@ impl Batcher {
10941093
&removed,
10951094
&batch_state_lock.batch_queue,
10961095
&mut user_guard,
1097-
);
1096+
).await;
10981097

10991098
if let Some(ref removed_entry_ws) = removed.messaging_sink {
11001099
let ws_sink = removed_entry_ws.clone();
@@ -1553,7 +1552,7 @@ impl Batcher {
15531552
) -> Result<(), BatcherError> {
15541553
// Update each user's state with proper lock ordering
15551554
for addr in affected_users {
1556-
if let Some(user_state) = self.user_states.get(&addr) {
1555+
if let Some(user_state) = self.user_states.read().await.get(&addr).cloned() {
15571556
let mut user_state_guard = user_state.lock().await; // First: user lock
15581557
let batch_state_lock = self.batch_state.lock().await; // Second: batch lock
15591558

@@ -1588,7 +1587,7 @@ impl Batcher {
15881587
/// Cleans up user states after successful batch submission.
15891588
/// Resets last_max_fee_limit to U256::MAX for users who had proofs in the submitted batch
15901589
/// but now have no proofs left in the queue.
1591-
fn cleanup_user_states_after_successful_submission(&self, finalized_batch: &[BatchQueueEntry]) {
1590+
async fn cleanup_user_states_after_successful_submission(&self, finalized_batch: &[BatchQueueEntry]) {
15921591
use std::collections::HashSet;
15931592

15941593
// Get unique users from the submitted batch
@@ -1612,7 +1611,7 @@ impl Batcher {
16121611
for user_addr in users_in_batch {
16131612
if !current_user_states.contains_key(&user_addr) {
16141613
// User has no proofs left in queue - reset their max_fee_limit
1615-
if let Some(user_state_ref) = self.user_states.get(&user_addr) {
1614+
if let Some(user_state_ref) = self.user_states.read().await.get(&user_addr).cloned() {
16161615
if let Ok(mut user_state_guard) = user_state_ref.try_lock() {
16171616
user_state_guard.last_max_fee_limit = U256::max_value();
16181617
}
@@ -1833,7 +1832,7 @@ impl Batcher {
18331832
}
18341833

18351834
// Clean up user states for users who had proofs in this batch but now have no proofs left
1836-
self.cleanup_user_states_after_successful_submission(finalized_batch);
1835+
self.cleanup_user_states_after_successful_submission(finalized_batch).await;
18371836

18381837
connection::send_batch_inclusion_data_responses(finalized_batch, &batch_merkle_tree).await
18391838
}
@@ -1851,7 +1850,7 @@ impl Batcher {
18511850

18521851
let Some(nonpaying_replacement_addr) = self.get_nonpaying_replacement_addr() else {
18531852
batch_state_lock.batch_queue.clear();
1854-
self.user_states.clear();
1853+
self.user_states.write().await.clear();
18551854
return;
18561855
};
18571856

@@ -1863,13 +1862,13 @@ impl Batcher {
18631862
.await
18641863
else {
18651864
batch_state_lock.batch_queue.clear();
1866-
self.user_states.clear();
1865+
self.user_states.write().await.clear();
18671866
return;
18681867
};
18691868
batch_state_lock.batch_queue.clear();
1870-
self.user_states.clear();
1869+
self.user_states.write().await.clear();
18711870
let nonpaying_user_state = UserState::new(nonpaying_replacement_addr_nonce);
1872-
self.user_states.insert(
1871+
self.user_states.write().await.insert(
18731872
nonpaying_replacement_addr,
18741873
Arc::new(Mutex::new(nonpaying_user_state)),
18751874
);

0 commit comments

Comments
 (0)