Skip to content

Commit 8787958

Browse files
committed
fix: move batch queue full condition to add_to_batch
1 parent 96def6c commit 8787958

File tree

1 file changed

+59
-70
lines changed

1 file changed

+59
-70
lines changed

crates/batcher/src/lib.rs

Lines changed: 59 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -852,69 +852,6 @@ impl Batcher {
852852
return Ok(());
853853
};
854854

855-
// * ---------------------------------------------------------------------*
856-
// * Perform validation over batcher queue *
857-
// * ---------------------------------------------------------------------*
858-
859-
if self.batch_state.lock().await.is_queue_full() {
860-
let mut batch_state_lock = self.batch_state.lock().await;
861-
debug!("Batch queue is full. Evaluating if the incoming proof can replace a lower-priority entry.");
862-
863-
// This cannot panic, if the batch queue is full it has at least one item
864-
let (lowest_priority_entry, _) = batch_state_lock
865-
.batch_queue
866-
.peek()
867-
.expect("Batch queue was expected to be full, but somehow no item was inside");
868-
869-
let lowest_fee_in_queue = lowest_priority_entry.nonced_verification_data.max_fee;
870-
871-
let new_proof_fee = nonced_verification_data.max_fee;
872-
873-
// We will keep the proof with the highest fee
874-
// Note: we previously checked that if it's a new proof from the same user the fee is the same or lower
875-
// So this will never eject a proof of the same user with a lower nonce
876-
// which is the expected behaviour
877-
if new_proof_fee > lowest_fee_in_queue {
878-
// This cannot panic, if the batch queue is full it has at least one item
879-
let (removed_entry, _) = batch_state_lock
880-
.batch_queue
881-
.pop()
882-
.expect("Batch queue was expected to be full, but somehow no item was inside");
883-
884-
info!(
885-
"Incoming proof (nonce: {}, fee: {}) has higher fee. Replacing lowest fee proof from sender {} with nonce {}.",
886-
nonced_verification_data.nonce,
887-
nonced_verification_data.max_fee,
888-
removed_entry.sender,
889-
removed_entry.nonced_verification_data.nonce
890-
);
891-
892-
batch_state_lock.update_user_state_on_entry_removal(&removed_entry);
893-
894-
if let Some(removed_entry_ws) = removed_entry.messaging_sink {
895-
std::mem::drop(batch_state_lock);
896-
send_message(
897-
removed_entry_ws,
898-
SubmitProofResponseMessage::UnderpricedProof,
899-
)
900-
.await;
901-
};
902-
} else {
903-
info!(
904-
"Incoming proof (nonce: {}, fee: {}) has lower priority than all entries in the full queue. Rejecting submission.",
905-
nonced_verification_data.nonce,
906-
nonced_verification_data.max_fee
907-
);
908-
std::mem::drop(batch_state_lock);
909-
send_message(
910-
ws_conn_sink.clone(),
911-
SubmitProofResponseMessage::UnderpricedProof,
912-
)
913-
.await;
914-
return Ok(());
915-
}
916-
}
917-
918855
// * ---------------------------------------------------------------------*
919856
// * Add message data into the queue and update user state *
920857
// * ---------------------------------------------------------------------*
@@ -1168,13 +1105,68 @@ impl Batcher {
11681105
proof_submitter_sig: Signature,
11691106
proof_submitter_addr: Address,
11701107
) -> Result<(), BatcherError> {
1108+
let mut batch_state_lock = self.batch_state.lock().await;
1109+
let max_fee = verification_data.max_fee;
1110+
let nonce = verification_data.nonce;
1111+
1112+
if batch_state_lock.is_queue_full() {
1113+
debug!("Batch queue is full. Evaluating if the incoming proof can replace a lower-priority entry.");
1114+
1115+
// This cannot panic, if the batch queue is full it has at least one item
1116+
let (lowest_priority_entry, _) = batch_state_lock
1117+
.batch_queue
1118+
.peek()
1119+
.expect("Batch queue was expected to be full, but somehow no item was inside");
1120+
1121+
let lowest_fee_in_queue = lowest_priority_entry.nonced_verification_data.max_fee;
1122+
1123+
// We will keep the proof with the highest fee
1124+
// Note: we previously checked that if it's a new proof from the same user the fee is the same or lower
1125+
// So this will never eject a proof of the same user with a lower nonce
1126+
// which is the expected behaviour
1127+
if max_fee > lowest_fee_in_queue {
1128+
// This cannot panic, if the batch queue is full it has at least one item
1129+
let (removed_entry, _) = batch_state_lock
1130+
.batch_queue
1131+
.pop()
1132+
.expect("Batch queue was expected to be full, but somehow no item was inside");
1133+
1134+
info!(
1135+
"Incoming proof (nonce: {}, fee: {}) has higher fee. Replacing lowest fee proof from sender {} with nonce {}.",
1136+
nonce,
1137+
max_fee,
1138+
removed_entry.sender,
1139+
removed_entry.nonced_verification_data.nonce
1140+
);
1141+
1142+
batch_state_lock.update_user_state_on_entry_removal(&removed_entry);
1143+
1144+
if let Some(removed_entry_ws) = removed_entry.messaging_sink {
1145+
// we spawn a task here so that we don't have to await the message with the batch state lock held
1146+
tokio::spawn(send_message(
1147+
removed_entry_ws,
1148+
SubmitProofResponseMessage::UnderpricedProof,
1149+
));
1150+
}
1151+
} else {
1152+
info!(
1153+
"Incoming proof (nonce: {}, fee: {}) has lower priority than all entries in the full queue. Rejecting submission.",
1154+
nonce,
1155+
max_fee
1156+
);
1157+
std::mem::drop(batch_state_lock);
1158+
send_message(
1159+
ws_conn_sink.clone(),
1160+
SubmitProofResponseMessage::UnderpricedProof,
1161+
)
1162+
.await;
1163+
return Ok(());
1164+
}
1165+
}
1166+
11711167
info!("Calculating verification data commitments...");
11721168
let verification_data_comm = verification_data.clone().into();
11731169
info!("Adding verification data to batch...");
1174-
1175-
let max_fee = verification_data.max_fee;
1176-
let nonce = verification_data.nonce;
1177-
let mut batch_state_lock = self.batch_state.lock().await;
11781170
batch_state_lock.batch_queue.push(
11791171
BatchQueueEntry::new(
11801172
verification_data,
@@ -1199,7 +1191,6 @@ impl Batcher {
11991191
.await
12001192
else {
12011193
error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present");
1202-
std::mem::drop(batch_state_lock);
12031194
return Err(BatcherError::AddressNotFoundInUserStates(
12041195
proof_submitter_addr,
12051196
));
@@ -1210,7 +1201,6 @@ impl Batcher {
12101201
.await
12111202
else {
12121203
error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present");
1213-
std::mem::drop(batch_state_lock);
12141204
return Err(BatcherError::AddressNotFoundInUserStates(
12151205
proof_submitter_addr,
12161206
));
@@ -1228,7 +1218,6 @@ impl Batcher {
12281218
.is_none()
12291219
{
12301220
error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present");
1231-
std::mem::drop(batch_state_lock);
12321221
return Err(BatcherError::AddressNotFoundInUserStates(
12331222
proof_submitter_addr,
12341223
));

0 commit comments

Comments
 (0)