Skip to content

Commit 6cdf54e

Browse files
committed
feat: priority for batch building process when acquiring user locks
1 parent ae55834 commit 6cdf54e

File tree

1 file changed

+28
-2
lines changed

1 file changed

+28
-2
lines changed

crates/batcher/src/lib.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,6 @@ impl Batcher {
683683
// If it was not present, then the user nonce is queried to the Aligned contract.
684684
// Lastly, we get a lock of the batch state again and insert the user state if it was still missing.
685685

686-
// Step 1: Get or insert the per-address mutex under lock
687686
let is_user_in_state: bool = {
688687
let batch_state_lock = self.batch_state.lock().await;
689688
batch_state_lock.user_states.contains_key(&addr)
@@ -739,7 +738,28 @@ impl Batcher {
739738
.or_insert_with(|| Arc::new(Mutex::new(())))
740739
.clone()
741740
};
742-
let _user_mutex = user_mutex.lock().await;
741+
742+
// This looks very ugly but basically, we are doing the following:
743+
// 1. We try to acquire the `user_mutex`: this can take some time if there is another task with it
744+
// 2. During the time that passes, the batcher might have tried to build a new batch, so we check the `batch_building_mutex`
745+
// 3. If it is taken, then release the user lock so the batcher can continue building (as it is waiting for all user mutexes to be released)
746+
// 4. If it isn't building then continue with the message
747+
//
748+
// This is done to give the batcher builder process priority
749+
// and prevent a situation where the batcher wants to build a new batch
750+
// but it has to wait for a ton of messages to be processed first
751+
// leading to decreased batch throughput
752+
let _user_mutex = loop {
753+
let _user_mutex = user_mutex.lock().await;
754+
let res = self.batch_building_mutex.try_lock();
755+
if res.is_ok() {
756+
break _user_mutex;
757+
} else {
758+
drop(_user_mutex);
759+
// tell the runtime we are done for now so it can continue with another task
760+
tokio::task::yield_now().await;
761+
}
762+
};
743763
debug!("User mutex for {:?} acquired...", addr_in_msg);
744764

745765
let msg_max_fee = nonced_verification_data.max_fee;
@@ -815,6 +835,11 @@ impl Batcher {
815835

816836
// We check this after replacement logic because if user wants to replace a proof, their
817837
// new_max_fee must be greater or equal than old_max_fee
838+
//
839+
// Note: we don't do this before the handle_replacement_message as this operation can block for some time
840+
// this is run again in the handle_replacement_message
841+
// by enforcing stricter rules in replacements (a min bump + min fee) we can be sure this is run on a valid message that the user will actually pay for
842+
// and so running the pre-verification isn't free
818843
if msg_max_fee > user_last_max_fee_limit {
819844
warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}");
820845
send_message(
@@ -826,6 +851,7 @@ impl Batcher {
826851
return Ok(());
827852
}
828853

854+
// When pre-verification is enabled, batcher will verify proofs for faster feedback with clients
829855
if let Err(e) = self.verify_proof(&nonced_verification_data).await {
830856
send_message(
831857
ws_conn_sink.clone(),

0 commit comments

Comments
 (0)