@@ -73,6 +73,7 @@ mod zk_utils;
7373pub const LISTEN_NEW_BLOCKS_MAX_TIMES : usize = usize:: MAX ;
7474
7575pub struct Batcher {
76+ // Configuration parameters
7677 s3_client : S3Client ,
7778 s3_bucket_name : String ,
7879 download_endpoint : String ,
@@ -85,20 +86,39 @@ pub struct Batcher {
8586 payment_service_fallback : BatcherPaymentService ,
8687 service_manager : ServiceManager ,
8788 service_manager_fallback : ServiceManager ,
88- batch_state : Mutex < BatchState > ,
89- user_states : DashMap < Address , Arc < Mutex < UserState > > > ,
9089 min_block_interval : u64 ,
9190 transaction_wait_timeout : u64 ,
9291 max_proof_size : usize ,
9392 max_batch_byte_size : usize ,
9493 max_batch_proof_qty : usize ,
95- last_uploaded_batch_block : Mutex < u64 > ,
9694 pre_verification_is_enabled : bool ,
9795 non_paying_config : Option < NonPayingConfig > ,
98- posting_batch : Mutex < bool > ,
99- disabled_verifiers : Mutex < U256 > ,
10096 aggregator_fee_percentage_multiplier : u128 ,
10197 aggregator_gas_cost : u128 ,
98+
99+ // Shared state (Mutex)
100+ /// The general business rule is:
101+ /// - User processing can be done in parallel unless a batch creation is happening
102+ /// - Batch creation needs to be able to change all the states, so all processing
103+ /// needs to be stopped, and all user_states locks need to be taken
104+ batch_state : Mutex < BatchState > ,
105+ user_states : DashMap < Address , Arc < Mutex < UserState > > > ,
106+ /// When posting a task, this is taken as a write to stop new threads to update
107+ /// user_states, ideally we would want a bigger mutex on the whole user_states, but this can't be done
108+ batch_processing_lock : RwLock < ( ) > ,
109+
110+ last_uploaded_batch_block : Mutex < u64 > ,
111+
112+ /// This is used to avoid multiple batches being submitted at the same time
113+ /// It could be removed in the future by changing how we spawn
114+ /// the batch creation task
115+ posting_batch : Mutex < bool > ,
116+
117+
118+ disabled_verifiers : Mutex < U256 > ,
119+
120+
121+ // Observability and monitoring
102122 pub metrics : metrics:: BatcherMetrics ,
103123 pub telemetry : TelemetrySender ,
104124}
@@ -274,6 +294,7 @@ impl Batcher {
274294 . aggregator_fee_percentage_multiplier ,
275295 aggregator_gas_cost : config. batcher . aggregator_gas_cost ,
276296 posting_batch : Mutex :: new ( false ) ,
297+ batch_processing_lock : RwLock :: new ( ( ) ) ,
277298 batch_state : Mutex :: new ( batch_state) ,
278299 user_states,
279300 disabled_verifiers : Mutex :: new ( disabled_verifiers) ,
@@ -667,6 +688,9 @@ impl Batcher {
667688 client_msg : Box < SubmitProofMessage > ,
668689 ws_conn_sink : WsMessageSink ,
669690 ) -> Result < ( ) , Error > {
691+ // Acquire read lock to allow concurrent user processing but block during batch creation
692+ let _batch_processing_guard = self . batch_processing_lock . read ( ) . await ;
693+
670694 let msg_nonce = client_msg. verification_data . nonce ;
671695 debug ! ( "Received message with nonce: {msg_nonce:?}" ) ;
672696 self . metrics . received_proofs . inc ( ) ;
@@ -1365,6 +1389,9 @@ impl Batcher {
13651389 finalized_batch : Vec < BatchQueueEntry > ,
13661390 gas_price : U256 ,
13671391 ) -> Result < ( ) , BatcherError > {
1392+ // Acquire write lock to ensure exclusive access during batch creation (blocks all user processing)
1393+ let _batch_processing_guard = self . batch_processing_lock . write ( ) . await ;
1394+
13681395 let nonced_batch_verifcation_data: Vec < NoncedVerificationData > = finalized_batch
13691396 . clone ( )
13701397 . into_iter ( )
@@ -1528,6 +1555,8 @@ impl Batcher {
15281555 / U256 :: from ( PERCENTAGE_DIVIDER ) ;
15291556
15301557 if let Some ( finalized_batch) = self . is_batch_ready ( block_number, modified_gas_price) . await {
1558+ // TODO (Mauro): There is a race condition here,
1559+
15311560 let batch_finalization_result = self
15321561 . finalize_batch ( block_number, finalized_batch, modified_gas_price)
15331562 . await ;
0 commit comments