Skip to content

Commit 44c36e6

Browse files
committed
feat: batch building mutex
1 parent f48157d commit 44c36e6

File tree

1 file changed

+11
-3
lines changed

1 file changed

+11
-3
lines changed

crates/batcher/src/lib.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ pub struct Batcher {
8686
service_manager: ServiceManager,
8787
service_manager_fallback: ServiceManager,
8888
batch_state: Mutex<BatchState>,
89+
batch_building_mutex: Mutex<()>,
8990
user_mutexes: Mutex<HashMap<Address, Arc<Mutex<()>>>>,
9091
min_block_interval: u64,
9192
transaction_wait_timeout: u64,
@@ -278,6 +279,7 @@ impl Batcher {
278279
posting_batch: Mutex::new(false),
279280
batch_state: Mutex::new(batch_state),
280281
user_mutexes: Mutex::new(HashMap::new()),
282+
batch_building_mutex: Mutex::new(()),
281283
disabled_verifiers: Mutex::new(disabled_verifiers),
282284
metrics,
283285
telemetry,
@@ -616,8 +618,9 @@ impl Batcher {
616618
debug!("Received message with nonce: {msg_nonce:?}");
617619
self.metrics.received_proofs.inc();
618620

619-
// TODO: check if the user is already being attended
620-
// TODO: check if a batch is being built
621+
// if this is locked, then it means that the a batch is being built
622+
// so we need to stop the processing
623+
self.batch_building_mutex.lock().await;
621624

622625
// * ---------------------------------------------------*
623626
// * Perform validations over the message *
@@ -908,7 +911,7 @@ impl Batcher {
908911
// * Add message data into the queue and update user state *
909912
// * ---------------------------------------------------------------------*
910913

911-
let mut batch_state_lock = self.batch_state.lock().await;
914+
let batch_state_lock = self.batch_state.lock().await;
912915
if let Err(e) = self
913916
.add_to_batch(
914917
batch_state_lock,
@@ -1228,7 +1231,12 @@ impl Batcher {
12281231
block_number: u64,
12291232
gas_price: U256,
12301233
) -> Option<Vec<BatchQueueEntry>> {
1234+
let batch_building_mutex = self.batch_building_mutex.lock().await;
12311235
let batch_state_lock = self.batch_state.lock().await;
1236+
// acquire all the user locks to make sure all the ongoing message have been processed
1237+
for user_mutex in self.user_mutexes.lock().await.values() {
1238+
let _ = user_mutex.lock().await;
1239+
}
12321240
let current_batch_len = batch_state_lock.batch_queue.len();
12331241
let last_uploaded_batch_block_lock = self.last_uploaded_batch_block.lock().await;
12341242

0 commit comments

Comments
 (0)