@@ -24,8 +24,8 @@ use aligned_sdk::core::constants::{
2424 BUMP_MAX_RETRIES , BUMP_MAX_RETRY_DELAY , BUMP_MIN_RETRY_DELAY , CONSTANT_GAS_COST ,
2525 DEFAULT_AGGREGATOR_FEE_PERCENTAGE_MULTIPLIER , DEFAULT_MAX_FEE_PER_PROOF ,
2626 ETHEREUM_CALL_BACKOFF_FACTOR , ETHEREUM_CALL_MAX_RETRIES , ETHEREUM_CALL_MAX_RETRY_DELAY ,
27- ETHEREUM_CALL_MIN_RETRY_DELAY , GAS_PRICE_PERCENTAGE_MULTIPLIER , MIN_FEE_PER_PROOF ,
28- PERCENTAGE_DIVIDER , RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER ,
27+ ETHEREUM_CALL_MIN_RETRY_DELAY , GAS_PRICE_PERCENTAGE_MULTIPLIER , PERCENTAGE_DIVIDER ,
28+ RESPOND_TO_TASK_FEE_LIMIT_PERCENTAGE_MULTIPLIER ,
2929} ;
3030use aligned_sdk:: core:: types:: {
3131 ClientMessage , GetNonceResponseMessage , NoncedVerificationData , ProofInvalidReason ,
@@ -521,7 +521,7 @@ impl Batcher {
521521 )
522522 . await ;
523523 self . metrics
524- . user_error ( & [ "invalid_paument_service_address " , "" ] ) ;
524+ . user_error ( & [ "invalid_payment_service_address " , "" ] ) ;
525525 return Ok ( ( ) ) ;
526526 }
527527
@@ -632,10 +632,10 @@ impl Batcher {
632632 ) ;
633633 send_message (
634634 ws_conn_sink. clone ( ) ,
635- SubmitProofResponseMessage :: InvalidNonce ,
635+ SubmitProofResponseMessage :: EthRpcError ,
636636 )
637637 . await ;
638- self . metrics . user_error ( & [ "invalid_nonce " , "" ] ) ;
638+ self . metrics . user_error ( & [ "eth_rpc_error " , "" ] ) ;
639639 return Ok ( ( ) ) ;
640640 }
641641 } ;
@@ -667,19 +667,34 @@ impl Batcher {
667667 // finally add the proof to the batch queue.
668668
669669 let batch_state_lock = self . batch_state . lock ( ) . await ;
670- let Some ( proofs_in_batch) = batch_state_lock. get_user_proof_count ( & addr) . await else {
671- error ! ( "Failed to get user proof count: User not found in user states, but it should have been already inserted" ) ;
670+
671+ let msg_max_fee = nonced_verification_data. max_fee ;
672+ let Some ( user_last_max_fee_limit) =
673+ batch_state_lock. get_user_last_max_fee_limit ( & addr) . await
674+ else {
672675 std:: mem:: drop ( batch_state_lock) ;
673676 send_message (
674677 ws_conn_sink. clone ( ) ,
675- SubmitProofResponseMessage :: InvalidNonce ,
678+ SubmitProofResponseMessage :: AddToBatchError ,
676679 )
677680 . await ;
678- self . metrics . user_error ( & [ "invalid_nonce" , "" ] ) ;
681+ self . metrics . user_error ( & [ "batcher_state_error" , "" ] ) ;
682+ return Ok ( ( ) ) ;
683+ } ;
684+
685+ let Some ( user_accumulated_fee) = batch_state_lock. get_user_total_fees_in_queue ( & addr) . await
686+ else {
687+ std:: mem:: drop ( batch_state_lock) ;
688+ send_message (
689+ ws_conn_sink. clone ( ) ,
690+ SubmitProofResponseMessage :: AddToBatchError ,
691+ )
692+ . await ;
693+ self . metrics . user_error ( & [ "batcher_state_error" , "" ] ) ;
679694 return Ok ( ( ) ) ;
680695 } ;
681696
682- if !self . check_min_balance ( proofs_in_batch + 1 , user_balance ) {
697+ if !self . verify_user_has_enough_balance ( user_balance , user_accumulated_fee , msg_max_fee ) {
683698 std:: mem:: drop ( batch_state_lock) ;
684699 send_message (
685700 ws_conn_sink. clone ( ) ,
@@ -696,10 +711,10 @@ impl Batcher {
696711 std:: mem:: drop ( batch_state_lock) ;
697712 send_message (
698713 ws_conn_sink. clone ( ) ,
699- SubmitProofResponseMessage :: InvalidNonce ,
714+ SubmitProofResponseMessage :: AddToBatchError ,
700715 )
701716 . await ;
702- self . metrics . user_error ( & [ "invalid_nonce " , "" ] ) ;
717+ self . metrics . user_error ( & [ "batcher_state_error " , "" ] ) ;
703718 return Ok ( ( ) ) ;
704719 } ;
705720
@@ -731,21 +746,9 @@ impl Batcher {
731746 return Ok ( ( ) ) ;
732747 }
733748
734- let msg_max_fee = nonced_verification_data. max_fee ;
735- let Some ( user_min_fee) = batch_state_lock. get_user_min_fee ( & addr) . await else {
736- std:: mem:: drop ( batch_state_lock) ;
737- send_message (
738- ws_conn_sink. clone ( ) ,
739- SubmitProofResponseMessage :: InvalidNonce ,
740- )
741- . await ;
742- self . metrics . user_error ( & [ "invalid_nonce" , "" ] ) ;
743- return Ok ( ( ) ) ;
744- } ;
745-
746- if msg_max_fee > user_min_fee {
749+ if msg_max_fee > user_last_max_fee_limit {
747750 std:: mem:: drop ( batch_state_lock) ;
748- warn ! ( "Invalid max fee for address {addr}, had fee {user_min_fee :?} < {msg_max_fee:?}" ) ;
751+ warn ! ( "Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit :?}, sent {msg_max_fee:?}" ) ;
749752 send_message (
750753 ws_conn_sink. clone ( ) ,
751754 SubmitProofResponseMessage :: InvalidMaxFee ,
@@ -784,10 +787,15 @@ impl Batcher {
784787 zk_utils:: is_verifier_disabled ( * disabled_verifiers, verifier)
785788 }
786789
787- // Checks user has sufficient balance for paying all its the proofs in the current batch.
788- fn check_min_balance ( & self , user_proofs_in_batch : usize , user_balance : U256 ) -> bool {
789- let min_balance = U256 :: from ( user_proofs_in_batch) * U256 :: from ( MIN_FEE_PER_PROOF ) ;
790- user_balance >= min_balance
790+ // Verifies user has enough balance for paying all his proofs in the current batch.
791+ fn verify_user_has_enough_balance (
792+ & self ,
793+ user_balance : U256 ,
794+ user_accumulated_fee : U256 ,
795+ new_msg_max_fee : U256 ,
796+ ) -> bool {
797+ let required_balance: U256 = user_accumulated_fee + new_msg_max_fee;
798+ user_balance >= required_balance
791799 }
792800
793801 /// Handles a replacement message
@@ -822,7 +830,7 @@ impl Batcher {
822830 let original_max_fee = entry. nonced_verification_data . max_fee ;
823831 if original_max_fee > replacement_max_fee {
824832 std:: mem:: drop ( batch_state_lock) ;
825- warn ! ( "Invalid replacement message for address {addr}, had fee {original_max_fee:?} < {replacement_max_fee:?}" ) ;
833+ warn ! ( "Invalid replacement message for address {addr}, had max fee: {original_max_fee:?}, received fee: {replacement_max_fee:?}" ) ;
826834 send_message (
827835 ws_conn_sink. clone ( ) ,
828836 SubmitProofResponseMessage :: InvalidReplacementMessage ,
@@ -888,13 +896,38 @@ impl Batcher {
888896 BatchQueueEntryPriority :: new ( replacement_max_fee, nonce) ,
889897 ) ;
890898
891- let updated_min_fee_in_batch = batch_state_lock. get_user_min_fee_in_batch ( & addr) ;
899+ // update max_fee_limit
900+ let updated_max_fee_limit_in_batch = batch_state_lock. get_user_min_fee_in_batch ( & addr) ;
892901 if batch_state_lock
893- . update_user_min_fee ( & addr, updated_min_fee_in_batch )
902+ . update_user_max_fee_limit ( & addr, updated_max_fee_limit_in_batch )
894903 . is_none ( )
895904 {
896905 std:: mem:: drop ( batch_state_lock) ;
897906 warn ! ( "User state for address {addr:?} was not present in batcher user states, but it should be" ) ;
907+ send_message (
908+ ws_conn_sink. clone ( ) ,
909+ SubmitProofResponseMessage :: AddToBatchError ,
910+ )
911+ . await ;
912+ return ;
913+ } ;
914+
915+ // update total_fees_in_queue
916+ if batch_state_lock
917+ . update_user_total_fees_in_queue_of_replacement_message (
918+ & addr,
919+ original_max_fee,
920+ replacement_max_fee,
921+ )
922+ . is_none ( )
923+ {
924+ std:: mem:: drop ( batch_state_lock) ;
925+ warn ! ( "User state for address {addr:?} was not present in batcher user states, but it should be" ) ;
926+ send_message (
927+ ws_conn_sink. clone ( ) ,
928+ SubmitProofResponseMessage :: AddToBatchError ,
929+ )
930+ . await ;
898931 } ;
899932 }
900933
@@ -986,13 +1019,25 @@ impl Batcher {
9861019 ) ) ;
9871020 } ;
9881021
1022+ let Some ( current_total_fees_in_queue) = batch_state_lock
1023+ . get_user_total_fees_in_queue ( & proof_submitter_addr)
1024+ . await
1025+ else {
1026+ error ! ( "User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present" ) ;
1027+ std:: mem:: drop ( batch_state_lock) ;
1028+ return Err ( BatcherError :: AddressNotFoundInUserStates (
1029+ proof_submitter_addr,
1030+ ) ) ;
1031+ } ;
1032+
9891033 // User state is updated
9901034 if batch_state_lock
9911035 . update_user_state (
9921036 & proof_submitter_addr,
9931037 nonce + U256 :: one ( ) ,
9941038 max_fee,
9951039 user_proof_count + 1 ,
1040+ current_total_fees_in_queue + max_fee,
9961041 )
9971042 . is_none ( )
9981043 {
@@ -1031,7 +1076,7 @@ impl Batcher {
10311076
10321077 if current_batch_len < 2 {
10331078 info ! (
1034- "Current batch has {} proof . Waiting for more proofs..." ,
1079+ "Current batch has {} proofs . Waiting for more proofs..." ,
10351080 current_batch_len
10361081 ) ;
10371082 return None ;
@@ -1078,14 +1123,14 @@ impl Batcher {
10781123 . ok ( ) ?;
10791124
10801125 batch_state_lock. batch_queue = resulting_batch_queue;
1081- let updated_user_proof_count_and_min_fee =
1082- batch_state_lock. get_user_proofs_in_batch_and_min_fee ( ) ;
1126+ let new_user_states = // proofs, max_fee_limit, total_fees_in_queue
1127+ batch_state_lock. calculate_new_user_states_data ( ) ;
10831128
10841129 let user_addresses: Vec < Address > = batch_state_lock. user_states . keys ( ) . cloned ( ) . collect ( ) ;
1130+ let default_value = ( 0 , U256 :: MAX , U256 :: zero ( ) ) ;
10851131 for addr in user_addresses. iter ( ) {
1086- let ( proof_count, min_fee) = updated_user_proof_count_and_min_fee
1087- . get ( addr)
1088- . unwrap_or ( & ( 0 , U256 :: MAX ) ) ;
1132+ let ( proof_count, max_fee_limit, total_fees_in_queue) =
1133+ new_user_states. get ( addr) . unwrap_or ( & default_value) ;
10891134
10901135 // FIXME: The case where a the update functions return `None` can only happen when the user was not found
10911136 // in the `user_states` map should not really happen here, but doing this check so that we don't unwrap.
@@ -1094,7 +1139,8 @@ impl Batcher {
10941139
10951140 // Now we update the user states related to the batch (proof count in batch and min fee in batch)
10961141 batch_state_lock. update_user_proof_count ( addr, * proof_count) ?;
1097- batch_state_lock. update_user_min_fee ( addr, * min_fee) ?;
1142+ batch_state_lock. update_user_max_fee_limit ( addr, * max_fee_limit) ?;
1143+ batch_state_lock. update_user_total_fees_in_queue ( addr, * total_fees_in_queue) ?;
10981144 }
10991145
11001146 Some ( finalized_batch)
0 commit comments