@@ -210,7 +210,7 @@ impl Batcher {
210210 . expect ( "Failed to get fallback Service Manager contract" ) ;
211211
212212 let mut user_states = HashMap :: new ( ) ;
213- let mut batch_state = BatchState :: new ( ) ;
213+ let mut batch_state = BatchState :: new ( config . batcher . max_queue_size ) ;
214214 let non_paying_config = if let Some ( non_paying_config) = config. batcher . non_paying {
215215 warn ! ( "Non-paying address configuration detected. Will replace non-paying address {} with configured address." ,
216216 non_paying_config. address) ;
@@ -228,7 +228,8 @@ impl Batcher {
228228 non_paying_user_state,
229229 ) ;
230230
231- batch_state = BatchState :: new_with_user_states ( user_states) ;
231+ batch_state =
232+ BatchState :: new_with_user_states ( user_states, config. batcher . max_queue_size ) ;
232233 Some ( non_paying_config)
233234 } else {
234235 None
@@ -702,7 +703,7 @@ impl Batcher {
702703 // This is needed because we need to query the user state to make validations and
703704 // finally add the proof to the batch queue.
704705
705- let batch_state_lock = self . batch_state . lock ( ) . await ;
706+ let mut batch_state_lock = self . batch_state . lock ( ) . await ;
706707
707708 let msg_max_fee = nonced_verification_data. max_fee ;
708709 let Some ( user_last_max_fee_limit) =
@@ -782,6 +783,8 @@ impl Batcher {
782783 return Ok ( ( ) ) ;
783784 }
784785
786+ // We check this after replacement logic because if user wants to replace a proof, their
787+ // new_max_fee must be greater or equal than old_max_fee
785788 if msg_max_fee > user_last_max_fee_limit {
786789 std:: mem:: drop ( batch_state_lock) ;
787790 warn ! ( "Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}" ) ;
@@ -794,6 +797,67 @@ impl Batcher {
794797 return Ok ( ( ) ) ;
795798 }
796799
800+ // * ---------------------------------------------------------------------*
801+ // * Perform validation over batcher queue *
802+ // * ---------------------------------------------------------------------*
803+
804+ if batch_state_lock. is_queue_full ( ) {
805+ debug ! ( "Batch queue is full. Evaluating if the incoming proof can replace a lower-priority entry." ) ;
806+
807+ // This cannot panic, if the batch queue is full it has at least one item
808+ let ( lowest_priority_entry, _) = batch_state_lock
809+ . batch_queue
810+ . peek ( )
811+ . expect ( "Batch queue was expected to be full, but somehow no item was inside" ) ;
812+
813+ let lowest_fee_in_queue = lowest_priority_entry. nonced_verification_data . max_fee ;
814+
815+ let new_proof_fee = nonced_verification_data. max_fee ;
816+
817+ // We will keep the proof with the highest fee
818+ // Note: we previously checked that if it's a new proof from the same user the fee is the same or lower
819+ // So this will never eject a proof of the same user with a lower nonce
820+ // which is the expected behaviour
821+ if new_proof_fee > lowest_fee_in_queue {
822+ // This cannot panic, if the batch queue is full it has at least one item
823+ let ( removed_entry, _) = batch_state_lock
824+ . batch_queue
825+ . pop ( )
826+ . expect ( "Batch queue was expected to be full, but somehow no item was inside" ) ;
827+
828+ info ! (
829+ "Incoming proof (nonce: {}, fee: {}) has higher fee. Replacing lowest fee proof from sender {} with nonce {}." ,
830+ nonced_verification_data. nonce,
831+ nonced_verification_data. max_fee,
832+ removed_entry. sender,
833+ removed_entry. nonced_verification_data. nonce
834+ ) ;
835+
836+ batch_state_lock. update_user_state_on_entry_removal ( & removed_entry) ;
837+
838+ if let Some ( removed_entry_ws) = removed_entry. messaging_sink {
839+ send_message (
840+ removed_entry_ws,
841+ SubmitProofResponseMessage :: UnderpricedProof ,
842+ )
843+ . await ;
844+ } ;
845+ } else {
846+ info ! (
847+ "Incoming proof (nonce: {}, fee: {}) has lower priority than all entries in the full queue. Rejecting submission." ,
848+ nonced_verification_data. nonce,
849+ nonced_verification_data. max_fee
850+ ) ;
851+ std:: mem:: drop ( batch_state_lock) ;
852+ send_message (
853+ ws_conn_sink. clone ( ) ,
854+ SubmitProofResponseMessage :: UnderpricedProof ,
855+ )
856+ . await ;
857+ return Ok ( ( ) ) ;
858+ }
859+ }
860+
797861 // * ---------------------------------------------------------------------*
798862 // * Add message data into the queue and update user state *
799863 // * ---------------------------------------------------------------------*
@@ -1727,6 +1791,16 @@ impl Batcher {
17271791
17281792 let batch_state_lock = self . batch_state . lock ( ) . await ;
17291793
1794+ if batch_state_lock. is_queue_full ( ) {
1795+ error ! ( "Can't add new entry, the batcher queue is full" ) ;
1796+ send_message (
1797+ ws_sink. clone ( ) ,
1798+ SubmitProofResponseMessage :: UnderpricedProof ,
1799+ )
1800+ . await ;
1801+ return Ok ( ( ) ) ;
1802+ }
1803+
17301804 let nonced_verification_data = NoncedVerificationData :: new (
17311805 client_msg. verification_data . verification_data . clone ( ) ,
17321806 client_msg. verification_data . nonce ,
0 commit comments