Skip to content

Commit 8837ea4

Browse files
committed
refactor: remove building_batch_mutex and sync between proof processing and batch building
1 parent a46bf2c commit 8837ea4

File tree

1 file changed

+2
-50
lines changed

1 file changed

+2
-50
lines changed

crates/batcher/src/lib.rs

Lines changed: 2 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -89,15 +89,7 @@ pub struct Batcher {
8989
/// We should consider splitting the user state and the queue into separate mutexes
9090
/// to improve concurrency.
9191
batch_state: Mutex<BatchState>,
92-
/// A mutex that signals an ongoing batch building process.
93-
/// It remains locked until the batch has been fully built and is ready to be submitted.
94-
///
95-
/// When a new proof message arrives, before processing it
96-
/// we check that this mutex isn't locked and if it is we wait until unlocked
97-
///
98-
/// This check covers the case where a new user submits a message while a batch is in construction
99-
/// Used to synchronize the processing of proofs during batch construction.
100-
building_batch_mutex: Mutex<()>,
92+
10193
/// A map of per-user mutexes used to synchronize proof processing.
10294
/// It allows us to mutate the users state atomically,
10395
/// while avoiding the need to lock the entire [`batch_state`] structure.
@@ -295,7 +287,6 @@ impl Batcher {
295287
aggregator_gas_cost: config.batcher.aggregator_gas_cost,
296288
batch_state: Mutex::new(batch_state),
297289
user_proof_processing_mutexes: Mutex::new(HashMap::new()),
298-
building_batch_mutex: Mutex::new(()),
299290
disabled_verifiers: Mutex::new(disabled_verifiers),
300291
metrics,
301292
telemetry,
@@ -632,11 +623,6 @@ impl Batcher {
632623
debug!("Received message with nonce: {msg_nonce:?}");
633624
self.metrics.received_proofs.inc();
634625

635-
// Make sure there are no batches being built before processing the message
636-
debug!("Checking if there is an ongoing batch before processing the message...");
637-
let _ = self.building_batch_mutex.lock().await;
638-
debug!("Batch building mutex acquired. Proceeding with message processing.");
639-
640626
// * ---------------------------------------------------*
641627
// * Perform validations over the message *
642628
// * ---------------------------------------------------*
@@ -754,28 +740,7 @@ impl Batcher {
754740
.or_insert_with(|| Arc::new(Mutex::new(())))
755741
.clone()
756742
};
757-
758-
// This looks very ugly but basically, we are doing the following:
759-
// 1. We try to acquire the `user_mutex`: this can take some time if there is another task with it
760-
// 2. While that time that passes, the batcher might have tried to build a new batch, so we check the `building_batch_mutex`
761-
// 3. If it is taken, then release the lock so the batcher can continue building (as it is waiting for all user mutex to finish)
762-
// 4. If it isn't building then continue with the message
763-
//
764-
// This is done to give the batcher builder process priority
765-
// and prevent a situation where the batcher wants to build a new batch
766-
// but it has to wait for a ton of messages to be processed first
767-
// Leading to a decrease in batch throughput
768-
let _user_mutex = loop {
769-
let _user_mutex = user_mutex.lock().await;
770-
let res = self.building_batch_mutex.try_lock();
771-
if res.is_ok() {
772-
break _user_mutex;
773-
} else {
774-
drop(_user_mutex);
775-
// tell the runtime to we are done for now and continue with another task
776-
tokio::task::yield_now().await;
777-
}
778-
};
743+
let _user_mutex = user_mutex.lock().await;
779744
debug!("User mutex for {:?} acquired...", addr_in_msg);
780745

781746
let msg_max_fee = nonced_verification_data.max_fee;
@@ -1316,19 +1281,6 @@ impl Batcher {
13161281
return None;
13171282
}
13181283

1319-
info!("Batch building: started, acquiring lock to stop processing new messages...");
1320-
let _building_batch_mutex = self.building_batch_mutex.lock().await;
1321-
1322-
info!("Batch building: waiting until all the user messages and proofs get processed");
1323-
let mutexes: Vec<Arc<Mutex<()>>> = {
1324-
let user_proofs_lock = self.user_proof_processing_mutexes.lock().await;
1325-
user_proofs_lock.values().cloned().collect()
1326-
};
1327-
for user_mutex in mutexes {
1328-
let _ = user_mutex.lock().await;
1329-
}
1330-
info!("Batch building: all user locks acquired, proceeding to build batch");
1331-
13321284
let batch_queue_copy = {
13331285
let batch_state_lock = self.batch_state.lock().await;
13341286
batch_state_lock.batch_queue.clone()

0 commit comments

Comments
 (0)