Skip to content
61 changes: 35 additions & 26 deletions crates/batcher/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use types::batch_state::BatchState;
use types::user_state::UserState;

use batch_queue::calculate_batch_size;
use dashmap::DashMap;
use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
Expand Down Expand Up @@ -109,9 +108,9 @@ pub struct Batcher {
/// Flag to indicate when recovery is in progress
/// When true, message handlers will return ServerBusy responses
/// It's used a way to "lock" all the user_states at the same time
/// If one needed is taken in the handle message it will timeout
/// If one needed is taken in the handle message it will time out
is_recovering_from_submission_failure: RwLock<bool>,
user_states: DashMap<Address, Arc<Mutex<UserState>>>,
user_states: Arc<RwLock<HashMap<Address, Arc<Mutex<UserState>>>>>,

last_uploaded_batch_block: Mutex<u64>,

Expand Down Expand Up @@ -181,7 +180,7 @@ impl Batcher {
let deployment_output =
ContractDeploymentOutput::new(config.aligned_layer_deployment_config_file_path);

log::info!(
info!(
"Starting metrics server on port {}",
config.batcher.metrics_port
);
Expand Down Expand Up @@ -262,7 +261,7 @@ impl Batcher {
.await
.expect("Failed to get fallback Service Manager contract");

let user_states = DashMap::new();
let user_states = Arc::new(RwLock::new(HashMap::new()));
let batch_state = BatchState::new(config.batcher.max_queue_size);
let non_paying_config = if let Some(non_paying_config) = config.batcher.non_paying {
warn!("Non-paying address configuration detected. Will replace non-paying address {} with configured address.",
Expand All @@ -276,7 +275,7 @@ impl Batcher {
.expect("Could not get non-paying nonce from Ethereum");

let non_paying_user_state = UserState::new(nonpaying_nonce);
user_states.insert(
user_states.write().await.insert(
non_paying_config.replacement.address(),
Arc::new(Mutex::new(non_paying_user_state)),
);
Expand Down Expand Up @@ -335,7 +334,7 @@ impl Batcher {
}
}

fn update_evicted_user_state_with_lock(
async fn update_evicted_user_state_with_lock(
&self,
removed_entry: &types::batch_queue::BatchQueueEntry,
batch_queue: &types::batch_queue::BatchQueue,
Expand All @@ -350,7 +349,7 @@ impl Batcher {
{
Some((last_entry, _)) => last_entry.nonced_verification_data.max_fee,
None => {
self.user_states.remove(&addr);
self.user_states.write().await.remove(&addr);
return;
}
};
Expand All @@ -376,12 +375,12 @@ impl Batcher {
{
Some((last_entry, _)) => last_entry.nonced_verification_data.max_fee,
None => {
self.user_states.remove(&addr);
self.user_states.write().await.remove(&addr);
return Some(());
}
};

let user_state = self.user_states.get(&addr)?;
let user_state = self.user_states.read().await.get(&addr)?.clone();
let mut user_state_guard = user_state.lock().await;
user_state_guard.proofs_in_batch -= 1;
user_state_guard.nonce -= U256::one();
Expand All @@ -392,7 +391,7 @@ impl Batcher {

fn calculate_new_user_states_data(
&self,
batch_queue: &types::batch_queue::BatchQueue,
batch_queue: &batch_queue::BatchQueue,
) -> HashMap<Address, (usize, U256, U256)> {
let mut updated_user_states = HashMap::new();
for (entry, _) in batch_queue.iter() {
Expand Down Expand Up @@ -735,7 +734,7 @@ impl Batcher {
}

let cached_user_nonce = {
let user_state_ref = self.user_states.get(&address);
let user_state_ref = self.user_states.read().await.get(&address).cloned();
match user_state_ref {
Some(user_state_ref) => {
let Some(user_state_guard) = self
Expand Down Expand Up @@ -875,16 +874,20 @@ impl Batcher {
// If it was not present, then the user nonce is queried to the Aligned contract.
// Lastly, we get a lock of the batch state again and insert the user state if it was still missing.

let is_user_in_state = self.user_states.contains_key(&addr);
let is_user_in_state = self.user_states.read().await.contains_key(&addr);

if !is_user_in_state {
debug!("User state for address {addr:?} not found, creating a new one");
// We add a dummy user state to grab a lock on the user state
let dummy_user_state = UserState::new(U256::zero());
self.user_states
.write()
.await
.insert(addr, Arc::new(Mutex::new(dummy_user_state)));
debug!("Dummy user state for address {addr:?} created");
}

let Some(user_state_ref) = self.user_states.get(&addr) else {
let Some(user_state_ref) = self.user_states.read().await.get(&addr).cloned() else {
error!("This should never happen, user state has previously been inserted if it didn't exist");
send_message(
ws_conn_sink.clone(),
Expand Down Expand Up @@ -1042,7 +1045,9 @@ impl Batcher {

// Try to find any candidate whose lock we can acquire and immediately process them
for candidate_addr in eviction_candidates {
if let Some(user_state_arc) = self.user_states.get(&candidate_addr) {
if let Some(user_state_arc) =
self.user_states.read().await.get(&candidate_addr).cloned()
{
if let Ok(mut user_guard) = user_state_arc.try_lock() {
// Found someone whose lock we can get - now find and remove their entry
let entries_to_check: Vec<_> = batch_state_lock
Expand Down Expand Up @@ -1076,7 +1081,8 @@ impl Batcher {
&removed,
&batch_state_lock.batch_queue,
&mut user_guard,
);
)
.await;

if let Some(ref removed_entry_ws) = removed.messaging_sink {
let ws_sink = removed_entry_ws.clone();
Expand Down Expand Up @@ -1145,8 +1151,6 @@ impl Batcher {
user_state_guard.last_max_fee_limit = max_fee;
user_state_guard.proofs_in_batch += 1;
user_state_guard.total_fees_in_queue += max_fee;

info!("Verification data message handled");
Ok(())
}

Expand Down Expand Up @@ -1530,7 +1534,7 @@ impl Batcher {
) -> Result<(), BatcherError> {
// Update each user's state with proper lock ordering
for addr in affected_users {
if let Some(user_state) = self.user_states.get(&addr) {
if let Some(user_state) = self.user_states.read().await.get(&addr).cloned() {
let mut user_state_guard = user_state.lock().await; // First: user lock
let batch_state_lock = self.batch_state.lock().await; // Second: batch lock

Expand Down Expand Up @@ -1565,7 +1569,10 @@ impl Batcher {
/// Cleans up user states after successful batch submission.
/// Resets last_max_fee_limit to U256::MAX for users who had proofs in the submitted batch
/// but now have no proofs left in the queue.
fn cleanup_user_states_after_successful_submission(&self, finalized_batch: &[BatchQueueEntry]) {
async fn cleanup_user_states_after_successful_submission(
&self,
finalized_batch: &[BatchQueueEntry],
) {
use std::collections::HashSet;

// Get unique users from the submitted batch
Expand All @@ -1589,7 +1596,8 @@ impl Batcher {
for user_addr in users_in_batch {
if !current_user_states.contains_key(&user_addr) {
// User has no proofs left in queue - reset their max_fee_limit
if let Some(user_state_ref) = self.user_states.get(&user_addr) {
if let Some(user_state_ref) = self.user_states.read().await.get(&user_addr).cloned()
{
if let Ok(mut user_state_guard) = user_state_ref.try_lock() {
user_state_guard.last_max_fee_limit = U256::max_value();
}
Expand Down Expand Up @@ -1810,7 +1818,8 @@ impl Batcher {
}

// Clean up user states for users who had proofs in this batch but now have no proofs left
self.cleanup_user_states_after_successful_submission(finalized_batch);
self.cleanup_user_states_after_successful_submission(finalized_batch)
.await;

connection::send_batch_inclusion_data_responses(finalized_batch, &batch_merkle_tree).await
}
Expand All @@ -1828,7 +1837,7 @@ impl Batcher {

let Some(nonpaying_replacement_addr) = self.get_nonpaying_replacement_addr() else {
batch_state_lock.batch_queue.clear();
self.user_states.clear();
self.user_states.write().await.clear();
return;
};

Expand All @@ -1840,13 +1849,13 @@ impl Batcher {
.await
else {
batch_state_lock.batch_queue.clear();
self.user_states.clear();
self.user_states.write().await.clear();
return;
};
batch_state_lock.batch_queue.clear();
self.user_states.clear();
self.user_states.write().await.clear();
let nonpaying_user_state = UserState::new(nonpaying_replacement_addr_nonce);
self.user_states.insert(
self.user_states.write().await.insert(
nonpaying_replacement_addr,
Arc::new(Mutex::new(nonpaying_user_state)),
);
Expand Down
Loading