Skip to content

Commit 87173c1

Browse files
committed
Merge branch 'staging' into fix/tree-openzeppelin-compliant
2 parents bb1e223 + f8db023 commit 87173c1

File tree

9 files changed

+137
-8
lines changed

9 files changed

+137
-8
lines changed

batcher/aligned-batcher/src/config/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ pub struct BatcherConfigFromYaml {
4646
pub max_proof_size: usize,
4747
pub max_batch_byte_size: usize,
4848
pub max_batch_proof_qty: usize,
49+
pub max_queue_size: usize,
4950
pub pre_verification_is_enabled: bool,
5051
pub metrics_port: u16,
5152
pub telemetry_ip_port_address: String,

batcher/aligned-batcher/src/lib.rs

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ impl Batcher {
210210
.expect("Failed to get fallback Service Manager contract");
211211

212212
let mut user_states = HashMap::new();
213-
let mut batch_state = BatchState::new();
213+
let mut batch_state = BatchState::new(config.batcher.max_queue_size);
214214
let non_paying_config = if let Some(non_paying_config) = config.batcher.non_paying {
215215
warn!("Non-paying address configuration detected. Will replace non-paying address {} with configured address.",
216216
non_paying_config.address);
@@ -228,7 +228,8 @@ impl Batcher {
228228
non_paying_user_state,
229229
);
230230

231-
batch_state = BatchState::new_with_user_states(user_states);
231+
batch_state =
232+
BatchState::new_with_user_states(user_states, config.batcher.max_queue_size);
232233
Some(non_paying_config)
233234
} else {
234235
None
@@ -702,7 +703,7 @@ impl Batcher {
702703
// This is needed because we need to query the user state to make validations and
703704
// finally add the proof to the batch queue.
704705

705-
let batch_state_lock = self.batch_state.lock().await;
706+
let mut batch_state_lock = self.batch_state.lock().await;
706707

707708
let msg_max_fee = nonced_verification_data.max_fee;
708709
let Some(user_last_max_fee_limit) =
@@ -782,6 +783,8 @@ impl Batcher {
782783
return Ok(());
783784
}
784785

786+
// We check this after replacement logic because if user wants to replace a proof, their
787+
// new_max_fee must be greater or equal than old_max_fee
785788
if msg_max_fee > user_last_max_fee_limit {
786789
std::mem::drop(batch_state_lock);
787790
warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}");
@@ -794,6 +797,67 @@ impl Batcher {
794797
return Ok(());
795798
}
796799

800+
// * ---------------------------------------------------------------------*
801+
// * Perform validation over batcher queue *
802+
// * ---------------------------------------------------------------------*
803+
804+
if batch_state_lock.is_queue_full() {
805+
debug!("Batch queue is full. Evaluating if the incoming proof can replace a lower-priority entry.");
806+
807+
// This cannot panic, if the batch queue is full it has at least one item
808+
let (lowest_priority_entry, _) = batch_state_lock
809+
.batch_queue
810+
.peek()
811+
.expect("Batch queue was expected to be full, but somehow no item was inside");
812+
813+
let lowest_fee_in_queue = lowest_priority_entry.nonced_verification_data.max_fee;
814+
815+
let new_proof_fee = nonced_verification_data.max_fee;
816+
817+
// We will keep the proof with the highest fee
818+
// Note: we previously checked that if it's a new proof from the same user the fee is the same or lower
819+
// So this will never eject a proof of the same user with a lower nonce
820+
// which is the expected behaviour
821+
if new_proof_fee > lowest_fee_in_queue {
822+
// This cannot panic, if the batch queue is full it has at least one item
823+
let (removed_entry, _) = batch_state_lock
824+
.batch_queue
825+
.pop()
826+
.expect("Batch queue was expected to be full, but somehow no item was inside");
827+
828+
info!(
829+
"Incoming proof (nonce: {}, fee: {}) has higher fee. Replacing lowest fee proof from sender {} with nonce {}.",
830+
nonced_verification_data.nonce,
831+
nonced_verification_data.max_fee,
832+
removed_entry.sender,
833+
removed_entry.nonced_verification_data.nonce
834+
);
835+
836+
batch_state_lock.update_user_state_on_entry_removal(&removed_entry);
837+
838+
if let Some(removed_entry_ws) = removed_entry.messaging_sink {
839+
send_message(
840+
removed_entry_ws,
841+
SubmitProofResponseMessage::UnderpricedProof,
842+
)
843+
.await;
844+
};
845+
} else {
846+
info!(
847+
"Incoming proof (nonce: {}, fee: {}) has lower priority than all entries in the full queue. Rejecting submission.",
848+
nonced_verification_data.nonce,
849+
nonced_verification_data.max_fee
850+
);
851+
std::mem::drop(batch_state_lock);
852+
send_message(
853+
ws_conn_sink.clone(),
854+
SubmitProofResponseMessage::UnderpricedProof,
855+
)
856+
.await;
857+
return Ok(());
858+
}
859+
}
860+
797861
// * ---------------------------------------------------------------------*
798862
// * Add message data into the queue and update user state *
799863
// * ---------------------------------------------------------------------*
@@ -1727,6 +1791,16 @@ impl Batcher {
17271791

17281792
let batch_state_lock = self.batch_state.lock().await;
17291793

1794+
if batch_state_lock.is_queue_full() {
1795+
error!("Can't add new entry, the batcher queue is full");
1796+
send_message(
1797+
ws_sink.clone(),
1798+
SubmitProofResponseMessage::UnderpricedProof,
1799+
)
1800+
.await;
1801+
return Ok(());
1802+
}
1803+
17301804
let nonced_verification_data = NoncedVerificationData::new(
17311805
client_msg.verification_data.verification_data.clone(),
17321806
client_msg.verification_data.nonce,

batcher/aligned-batcher/src/types/batch_state.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,22 +10,28 @@ use log::debug;
1010
pub(crate) struct BatchState {
1111
pub(crate) batch_queue: BatchQueue,
1212
pub(crate) user_states: HashMap<Address, UserState>,
13+
pub(crate) max_size: usize,
1314
}
1415

1516
impl BatchState {
1617
// CONSTRUCTORS:
1718

18-
pub(crate) fn new() -> Self {
19+
pub(crate) fn new(max_size: usize) -> Self {
1920
Self {
2021
batch_queue: BatchQueue::new(),
2122
user_states: HashMap::new(),
23+
max_size,
2224
}
2325
}
2426

25-
pub(crate) fn new_with_user_states(user_states: HashMap<Address, UserState>) -> Self {
27+
pub(crate) fn new_with_user_states(
28+
user_states: HashMap<Address, UserState>,
29+
max_size: usize,
30+
) -> Self {
2631
Self {
2732
batch_queue: BatchQueue::new(),
2833
user_states,
34+
max_size,
2935
}
3036
}
3137

@@ -214,4 +220,39 @@ impl BatchState {
214220
&& entry.nonced_verification_data.max_fee < replacement_max_fee
215221
})
216222
}
223+
224+
/// Updates or removes a user's state when their latest proof entry is removed from the batch queue.
225+
///
226+
/// If the user has no other proofs remaining in the queue, their state is removed entirely.
227+
/// Otherwise, the user's state is updated to reflect the next most recent entry in the queue.
228+
///
229+
/// Note: The given `removed_entry` must be the most recent (latest or highest nonce) entry for the user in the queue.
230+
pub(crate) fn update_user_state_on_entry_removal(&mut self, removed_entry: &BatchQueueEntry) {
231+
let addr = removed_entry.sender;
232+
233+
let new_last_max_fee_limit = match self
234+
.batch_queue
235+
.iter()
236+
.filter(|(e, _)| e.sender == addr)
237+
.next_back()
238+
{
239+
Some((last_entry, _)) => last_entry.nonced_verification_data.max_fee,
240+
None => {
241+
self.user_states.remove(&addr);
242+
return;
243+
}
244+
};
245+
246+
if let Entry::Occupied(mut user_state) = self.user_states.entry(addr) {
247+
user_state.get_mut().proofs_in_batch -= 1;
248+
user_state.get_mut().nonce -= U256::one();
249+
user_state.get_mut().total_fees_in_queue -=
250+
removed_entry.nonced_verification_data.max_fee;
251+
user_state.get_mut().last_max_fee_limit = new_last_max_fee_limit;
252+
}
253+
}
254+
255+
pub(crate) fn is_queue_full(&self) -> bool {
256+
self.batch_queue.len() >= self.max_size
257+
}
217258
}

batcher/aligned-sdk/src/communication/messaging.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,10 @@ async fn handle_batcher_response(msg: Message) -> Result<BatchInclusionData, Sub
265265
);
266266
Err(SubmitError::GenericError(e))
267267
}
268+
Ok(SubmitProofResponseMessage::UnderpricedProof) => {
269+
error!("Batcher responded with error: queue limit has been exceeded. Funds have not been spent.");
270+
Err(SubmitError::BatchQueueLimitExceededError)
271+
}
268272
Err(e) => {
269273
error!(
270274
"Error while deserializing batch inclusion data: {}. Funds have not been spent.",

batcher/aligned-sdk/src/core/errors.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ pub enum SubmitError {
9696
AddToBatchError,
9797
InvalidProofInclusionData,
9898
GetNonceError(String),
99+
BatchQueueLimitExceededError,
99100
GenericError(String),
100101
}
101102

@@ -210,6 +211,10 @@ impl fmt::Display for SubmitError {
210211
SubmitError::InvalidProofInclusionData => {
211212
write!(f, "Batcher responded with invalid batch inclusion data. Can't verify your proof was correctly included in the batch.")
212213
}
214+
SubmitError::BatchQueueLimitExceededError => {
215+
write!(f, "Error while adding entry to batch, queue limit exeeded.")
216+
}
217+
213218
SubmitError::GetNonceError(e) => write!(f, "Error while getting nonce {}", e),
214219
}
215220
}

batcher/aligned-sdk/src/core/types.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,7 @@ pub enum SubmitProofResponseMessage {
399399
AddToBatchError,
400400
EthRpcError,
401401
InvalidPaymentServiceAddress(Address, Address),
402+
UnderpricedProof,
402403
}
403404

404405
#[derive(Debug, Clone, Serialize, Deserialize)]

config-files/config-batcher-docker.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ batcher:
2121
block_interval: 3
2222
batch_size_interval: 10
2323
transaction_wait_timeout: 96000 # 8 blocks
24-
max_proof_size: 67108864 # 64 MiB
24+
max_proof_size: 4194304 # 4 MiB
2525
max_batch_byte_size: 268435456 # 256 MiB
2626
max_batch_proof_qty: 3000 # 3000 proofs in a batch
27+
max_queue_size: 10000
2728
pre_verification_is_enabled: true
2829
metrics_port: 9093
2930
telemetry_ip_port_address: localhost:4001

config-files/config-batcher-ethereum-package.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ batcher:
1919
block_interval: 3
2020
batch_size_interval: 10
2121
transaction_wait_timeout: 36000 # 3 blocks
22-
max_proof_size: 67108864 # 64 MiB
22+
max_proof_size: 4194304 # 4 MiB
2323
max_batch_byte_size: 268435456 # 256 MiB
2424
max_batch_proof_qty: 3000 # 3000 proofs in a batch
25+
max_queue_size: 10000
2526
pre_verification_is_enabled: true
2627
metrics_port: 9093
2728
telemetry_ip_port_address: localhost:4001

config-files/config-batcher.yaml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ batcher:
2121
block_interval: 3
2222
batch_size_interval: 10
2323
transaction_wait_timeout: 96000 # 8 blocks
24-
max_proof_size: 67108864 # 64 MiB
24+
max_proof_size: 4194304 # 4 MiB
2525
max_batch_byte_size: 268435456 # 256 MiB
2626
max_batch_proof_qty: 3000 # 3000 proofs in a batch
27+
max_queue_size: 10000
2728
pre_verification_is_enabled: true
2829
metrics_port: 9093
2930
telemetry_ip_port_address: localhost:4001

0 commit comments

Comments
 (0)