Skip to content

Commit 94fcec9

Browse files
uri-99Julian Venturaavilagaston9MarcosNicolau
authored
fix: flush batcher queue only when needed (#1512)
Co-authored-by: Julian Ventura <[email protected]> Co-authored-by: avilagaston9 <[email protected]> Co-authored-by: Marcos Nicolau <[email protected]> Co-authored-by: Marcos Nicolau <[email protected]> Co-authored-by: Avila Gastón <[email protected]>
1 parent 256bb3b commit 94fcec9

File tree

3 files changed

+107
-68
lines changed

3 files changed

+107
-68
lines changed

batcher/aligned-batcher/src/lib.rs

Lines changed: 66 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use tokio::net::{TcpListener, TcpStream};
4848
use tokio::sync::{Mutex, MutexGuard, RwLock};
4949
use tokio_tungstenite::tungstenite::{Error, Message};
5050
use types::batch_queue::{self, BatchQueueEntry, BatchQueueEntryPriority};
51-
use types::errors::BatcherError;
51+
use types::errors::{BatcherError, TransactionSendError};
5252

5353
use crate::config::{ConfigFromYaml, ContractDeploymentOutput};
5454
use crate::telemetry::sender::TelemetrySender;
@@ -1116,12 +1116,13 @@ impl Batcher {
11161116
/// an empty batch, even if the block interval has been reached.
11171117
/// Once the batch meets the conditions for submission, the finalized batch is then passed to the
11181118
/// `finalize_batch` function.
1119+
/// This function doesn't remove the proofs from the queue.
11191120
async fn is_batch_ready(
11201121
&self,
11211122
block_number: u64,
11221123
gas_price: U256,
11231124
) -> Option<Vec<BatchQueueEntry>> {
1124-
let mut batch_state_lock = self.batch_state.lock().await;
1125+
let batch_state_lock = self.batch_state.lock().await;
11251126
let current_batch_len = batch_state_lock.batch_queue.len();
11261127
let last_uploaded_batch_block_lock = self.last_uploaded_batch_block.lock().await;
11271128

@@ -1153,7 +1154,7 @@ impl Batcher {
11531154
// Set the batch posting flag to true
11541155
*batch_posting = true;
11551156
let batch_queue_copy = batch_state_lock.batch_queue.clone();
1156-
let (resulting_batch_queue, finalized_batch) = batch_queue::try_build_batch(
1157+
let finalized_batch = batch_queue::try_build_batch(
11571158
batch_queue_copy,
11581159
gas_price,
11591160
self.max_batch_byte_size,
@@ -1173,7 +1174,26 @@ impl Batcher {
11731174
})
11741175
.ok()?;
11751176

1176-
batch_state_lock.batch_queue = resulting_batch_queue;
1177+
Some(finalized_batch)
1178+
}
1179+
1180+
/// Takes the submitted proofs and removes them from the queue.
1181+
/// This function should be called only AFTER the submission was confirmed onchain
1182+
async fn remove_proofs_from_queue(
1183+
&self,
1184+
finalized_batch: Vec<BatchQueueEntry>,
1185+
) -> Result<(), BatcherError> {
1186+
info!("Removing proofs from queue...");
1187+
let mut batch_state_lock = self.batch_state.lock().await;
1188+
1189+
finalized_batch.iter().for_each(|entry| {
1190+
if batch_state_lock.batch_queue.remove(entry).is_none() {
1191+
// If this happens, we have a bug in our code
1192+
error!("Some proofs were not found in the queue. This should not happen.");
1193+
}
1194+
});
1195+
1196+
// now we calculate the new user_states
11771197
let new_user_states = // proofs, max_fee_limit, total_fees_in_queue
11781198
batch_state_lock.calculate_new_user_states_data();
11791199

@@ -1189,17 +1209,33 @@ impl Batcher {
11891209
// informative error.
11901210

11911211
// Now we update the user states related to the batch (proof count in batch and min fee in batch)
1192-
batch_state_lock.update_user_proof_count(addr, *proof_count)?;
1193-
batch_state_lock.update_user_max_fee_limit(addr, *max_fee_limit)?;
1194-
batch_state_lock.update_user_total_fees_in_queue(addr, *total_fees_in_queue)?;
1212+
batch_state_lock
1213+
.update_user_proof_count(addr, *proof_count)
1214+
.ok_or(BatcherError::QueueRemoveError(
1215+
"Could not update_user_proof_count".into(),
1216+
))?;
1217+
batch_state_lock
1218+
.update_user_max_fee_limit(addr, *max_fee_limit)
1219+
.ok_or(BatcherError::QueueRemoveError(
1220+
"Could not update_user_max_fee_limit".into(),
1221+
))?;
1222+
batch_state_lock
1223+
.update_user_total_fees_in_queue(addr, *total_fees_in_queue)
1224+
.ok_or(BatcherError::QueueRemoveError(
1225+
"Could not update_user_total_fees_in_queue".into(),
1226+
))?;
11951227
}
11961228

1197-
Some(finalized_batch)
1229+
Ok(())
11981230
}
11991231

1200-
/// Takes the finalized batch as input and builds the merkle tree, posts verification data batch
1201-
/// to s3, creates new task in Aligned contract and sends responses to all clients that added proofs
1202-
/// to the batch. The last uploaded batch block is updated once the task is created in Aligned.
1232+
/// Takes the finalized batch as input and:
1233+
/// builds the merkle tree
1234+
/// posts verification data batch to s3
1235+
/// creates new task in Aligned contract
1236+
/// removes the proofs from the queue, once they are succesfully submitted on-chain
1237+
/// sends responses to all clients that added proofs to the batch.
1238+
/// The last uploaded batch block is updated once the task is created in Aligned.
12031239
async fn finalize_batch(
12041240
&self,
12051241
block_number: u64,
@@ -1257,6 +1293,7 @@ impl Batcher {
12571293
warn!("Failed to initialize task trace on telemetry: {:?}", e);
12581294
}
12591295

1296+
// Here we submit the batch on-chain
12601297
if let Err(e) = self
12611298
.submit_batch(
12621299
&batch_bytes,
@@ -1275,27 +1312,30 @@ impl Batcher {
12751312
{
12761313
warn!("Failed to send task status to telemetry: {:?}", e);
12771314
}
1278-
for entry in finalized_batch.into_iter() {
1279-
if let Some(ws_sink) = entry.messaging_sink {
1280-
let merkle_root = hex::encode(batch_merkle_tree.root);
1281-
send_message(
1282-
ws_sink.clone(),
1283-
SubmitProofResponseMessage::CreateNewTaskError(
1284-
merkle_root,
1285-
format!("{:?}", e),
1286-
),
1287-
)
1288-
.await
1289-
} else {
1290-
warn!("Websocket sink was found empty. This should only happen in tests");
1315+
1316+
// decide if i want to flush the queue:
1317+
match e {
1318+
BatcherError::TransactionSendError(
1319+
TransactionSendError::SubmissionInsufficientBalance,
1320+
) => {
1321+
// TODO calling remove_proofs_from_queue here is a better solution, flushing only the failed batch
1322+
// this would also need a message sent to the clients
1323+
self.flush_queue_and_clear_nonce_cache().await;
1324+
}
1325+
_ => {
1326+
// Add more cases here if we want in the future
12911327
}
12921328
}
12931329

1294-
self.flush_queue_and_clear_nonce_cache().await;
1295-
12961330
return Err(e);
12971331
};
12981332

1333+
// Once the submit is succesfull, we remove the submitted proofs from the queue
1334+
// TODO handle error case:
1335+
if let Err(e) = self.remove_proofs_from_queue(finalized_batch.clone()).await {
1336+
error!("Unexpected error while updating queue: {:?}", e);
1337+
}
1338+
12991339
connection::send_batch_inclusion_data_responses(finalized_batch, &batch_merkle_tree).await
13001340
}
13011341

batcher/aligned-batcher/src/types/batch_queue.rs

Lines changed: 34 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -134,27 +134,25 @@ pub(crate) fn calculate_batch_size(batch_queue: &BatchQueue) -> Result<usize, Ba
134134
}
135135

136136
/// This function tries to build a batch to be submitted to Aligned.
137-
/// Given a copy of the current batch queue, , and applyies an algorithm to find the biggest batch
137+
/// Given the current batch queue applies the following algorithm to find the biggest batch
138138
/// of proofs from users that are willing to pay for it:
139139
/// 1. Traverse each batch priority queue, starting from the one with minimum max fee.
140140
/// 2. Calculate the `fee_per_proof` for the whole batch and compare with the `max_fee` of the entry.
141141
/// 3. If `fee_per_proof` is less than the `max_fee` of the current entry, submit the batch. If not, pop this entry
142-
/// from the queue and push it to `resulting_priority_queue`, then repeat step 1.
142+
/// from the queue. then repeat step 1.
143143
///
144-
/// `resulting_priority_queue` will be the batch queue composed of all entries that were not willing to pay for the batch.
145-
/// This is outputted in along with the finalized batch.
144+
/// Returns the finalized batch.
146145
pub(crate) fn try_build_batch(
147146
batch_queue: BatchQueue,
148147
gas_price: U256,
149148
max_batch_byte_size: usize,
150149
max_batch_proof_qty: usize,
151-
) -> Result<(BatchQueue, Vec<BatchQueueEntry>), BatcherError> {
152-
let mut batch_queue = batch_queue;
153-
let mut batch_size = calculate_batch_size(&batch_queue)?;
154-
let mut resulting_priority_queue = BatchQueue::new();
150+
) -> Result<Vec<BatchQueueEntry>, BatcherError> {
151+
let mut finalized_batch = batch_queue;
152+
let mut batch_size = calculate_batch_size(&finalized_batch)?;
155153

156-
while let Some((entry, _)) = batch_queue.peek() {
157-
let batch_len = batch_queue.len();
154+
while let Some((entry, _)) = finalized_batch.peek() {
155+
let batch_len = finalized_batch.len();
158156
let fee_per_proof = calculate_fee_per_proof(batch_len, gas_price);
159157

160158
if batch_size > max_batch_byte_size
@@ -173,8 +171,7 @@ pub(crate) fn try_build_batch(
173171
.len();
174172
batch_size -= verification_data_size;
175173

176-
let (not_working_entry, not_working_priority) = batch_queue.pop().unwrap();
177-
resulting_priority_queue.push(not_working_entry, not_working_priority);
174+
finalized_batch.pop();
178175

179176
continue;
180177
}
@@ -183,16 +180,13 @@ pub(crate) fn try_build_batch(
183180
break;
184181
}
185182

186-
// If `batch_queue_copy` is empty, this means that all the batch queue was traversed and we didn't find
183+
// If `finalized_batch` is empty, this means that all the batch queue was traversed and we didn't find
187184
// any user willing to pay fot the fee per proof.
188-
if batch_queue.is_empty() {
185+
if finalized_batch.is_empty() {
189186
return Err(BatcherError::BatchCostTooHigh);
190187
}
191188

192-
Ok((
193-
resulting_priority_queue,
194-
batch_queue.clone().into_sorted_vec(),
195-
))
189+
Ok(finalized_batch.clone().into_sorted_vec())
196190
}
197191

198192
fn calculate_fee_per_proof(batch_len: usize, gas_price: U256) -> U256 {
@@ -301,14 +295,20 @@ mod test {
301295
batch_queue.push(entry_3, batch_priority_3);
302296

303297
let gas_price = U256::from(1);
304-
let (resulting_batch_queue, batch) =
305-
try_build_batch(batch_queue, gas_price, 5000000, 50).unwrap();
298+
let finalized_batch = try_build_batch(batch_queue, gas_price, 5000000, 50).unwrap();
306299

307-
assert!(resulting_batch_queue.is_empty());
308-
309-
assert_eq!(batch[0].nonced_verification_data.max_fee, max_fee_3);
310-
assert_eq!(batch[1].nonced_verification_data.max_fee, max_fee_2);
311-
assert_eq!(batch[2].nonced_verification_data.max_fee, max_fee_1);
300+
assert_eq!(
301+
finalized_batch[0].nonced_verification_data.max_fee,
302+
max_fee_3
303+
);
304+
assert_eq!(
305+
finalized_batch[1].nonced_verification_data.max_fee,
306+
max_fee_2
307+
);
308+
assert_eq!(
309+
finalized_batch[2].nonced_verification_data.max_fee,
310+
max_fee_1
311+
);
312312
}
313313

314314
#[test]
@@ -404,13 +404,11 @@ mod test {
404404
batch_queue.push(entry_3, batch_priority_3);
405405

406406
let gas_price = U256::from(1);
407-
let (resulting_batch_queue, finalized_batch) =
408-
try_build_batch(batch_queue, gas_price, 5000000, 50).unwrap();
407+
let finalized_batch = try_build_batch(batch_queue.clone(), gas_price, 5000000, 50).unwrap();
409408

410-
// The resulting batch queue (entries from the old batch queue that were not willing to pay
411-
// in this batch), should be empty and hence, all entries from the batch queue should be in
409+
// All entries from the batch queue should be in
412410
// the finalized batch.
413-
assert!(resulting_batch_queue.is_empty());
411+
assert_eq!(batch_queue.len(), 3);
414412
assert_eq!(finalized_batch.len(), 3);
415413
assert_eq!(
416414
finalized_batch[0].nonced_verification_data.max_fee,
@@ -515,14 +513,10 @@ mod test {
515513
batch_queue.push(entry_3, batch_priority_3);
516514

517515
let gas_price = U256::from(1);
518-
let (resulting_batch_queue, finalized_batch) =
519-
try_build_batch(batch_queue, gas_price, 5000000, 50).unwrap();
520-
521-
// The resulting batch queue (entries from the old batch queue that were not willing to pay
522-
// in this batch), should be empty and hence, all entries from the batch queue should be in
523-
// the finalized batch.
516+
let finalized_batch = try_build_batch(batch_queue.clone(), gas_price, 5000000, 50).unwrap();
524517

525-
assert_eq!(resulting_batch_queue.len(), 1);
518+
// All but one entries from the batch queue should be in the finalized batch.
519+
assert_eq!(batch_queue.len(), 3);
526520
assert_eq!(finalized_batch.len(), 2);
527521
assert_eq!(
528522
finalized_batch[0].nonced_verification_data.max_fee,
@@ -628,10 +622,10 @@ mod test {
628622
// The max batch len is 2, so the algorithm should stop at the second entry.
629623
let max_batch_proof_qty = 2;
630624

631-
let (resulting_batch_queue, finalized_batch) =
632-
try_build_batch(batch_queue, gas_price, 5000000, max_batch_proof_qty).unwrap();
625+
let finalized_batch =
626+
try_build_batch(batch_queue.clone(), gas_price, 5000000, max_batch_proof_qty).unwrap();
633627

634-
assert_eq!(resulting_batch_queue.len(), 1);
628+
assert_eq!(batch_queue.len(), 3);
635629
assert_eq!(finalized_batch.len(), 2);
636630
assert_eq!(
637631
finalized_batch[0].nonced_verification_data.max_fee,

batcher/aligned-batcher/src/types/errors.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@ impl From<Bytes> for TransactionSendError {
2727
"0xa3a8658a" => TransactionSendError::NoFeePerProof, // can't happen, don't flush
2828
"0x7899ec71" => TransactionSendError::InsufficientFeeForAggregator, // shouldn't happen, don't flush
2929
// returning the proofs and retrying later may help
30-
"0x4f779ceb" => TransactionSendError::SubmissionInsufficientBalance, // shouldn't happen,
31-
// flush can help if something went wrong
3230
"0x3102f10c" => TransactionSendError::BatchAlreadySubmitted, // can happen, don't flush
3331
"0x5c54305e" => TransactionSendError::InsufficientFunds, // shouldn't happen, don't flush
3432
"0x152bc288" => TransactionSendError::OnlyBatcherAllowed, // won't happen, don't flush
33+
"0x4f779ceb" => TransactionSendError::SubmissionInsufficientBalance, // shouldn't happen,
34+
// flush can help if something went wrong
3535
_ => {
36+
// flush because unkown error
3637
TransactionSendError::Generic(format!("Unknown bytestring error: {}", byte_string))
3738
}
3839
}
@@ -56,6 +57,7 @@ pub enum BatcherError {
5657
BatchCostTooHigh,
5758
WsSinkEmpty,
5859
AddressNotFoundInUserStates(Address),
60+
QueueRemoveError(String),
5961
}
6062

6163
impl From<tungstenite::Error> for BatcherError {
@@ -134,6 +136,9 @@ impl fmt::Debug for BatcherError {
134136
reason
135137
)
136138
}
139+
BatcherError::QueueRemoveError(e) => {
140+
write!(f, "Error while removing entry from queue: {}", e)
141+
}
137142
}
138143
}
139144
}

0 commit comments

Comments
 (0)