Skip to content

Commit 653283c

Browse files
committed
fix: handle block don't spawn a new task for it
instead wait until it finishes and update the last block number with the receipt
1 parent d25bef1 commit 653283c

File tree

1 file changed

+71
-74
lines changed

1 file changed

+71
-74
lines changed

crates/batcher/src/lib.rs

Lines changed: 71 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -415,11 +415,9 @@ impl Batcher {
415415
}
416416

417417
info!("Received new block: {}", block_number);
418-
tokio::spawn(async move {
419-
if let Err(e) = batcher.handle_new_block(block_number).await {
420-
error!("Error when handling new block: {:?}", e);
421-
}
422-
});
418+
if let Err(e) = batcher.handle_new_block(block_number).await {
419+
error!("Error when handling new block: {:?}", e);
420+
}
423421
}
424422
error!("Both main and fallback Ethereum WS clients subscriptions have disconnected, will try to reconnect...");
425423

@@ -619,7 +617,9 @@ impl Batcher {
619617

620618
// if this is locked, then it means that the a batch is being built
621619
// so we need to stop the processing
620+
debug!("Checking if a batch is being built before proceeding with the message");
622621
let _ = self.batch_building_mutex.lock().await;
622+
debug!("Batch building has finished or did't started, proceeding with the message");
623623

624624
// * ---------------------------------------------------*
625625
// * Perform validations over the message *
@@ -732,45 +732,36 @@ impl Batcher {
732732
// This is needed because we need to query the user state to make validations and
733733
// finally add the proof to the batch queue.
734734

735+
debug!("Trying to acquire user mutex for {:?}...", addr_in_msg);
735736
let user_mutex = {
736737
let mut map = self.user_mutexes.lock().await;
737738
map.entry(addr)
738739
.or_insert_with(|| Arc::new(Mutex::new(())))
739740
.clone()
740741
};
741-
let _ = user_mutex.lock().await;
742+
let _user_mutex = user_mutex.lock().await;
743+
debug!("User mutex for {:?} acquired...", addr_in_msg);
742744

743745
let msg_max_fee = nonced_verification_data.max_fee;
744-
let Some(user_last_max_fee_limit) = self
745-
.batch_state
746-
.lock()
747-
.await
748-
.get_user_last_max_fee_limit(&addr)
749-
.await
750-
else {
751-
send_message(
752-
ws_conn_sink.clone(),
753-
SubmitProofResponseMessage::AddToBatchError,
754-
)
755-
.await;
756-
self.metrics.user_error(&["batcher_state_error", ""]);
757-
return Ok(());
758-
};
746+
let (user_last_max_fee_limit, user_accumulated_fee) = {
747+
let batch_state_lock = self.batch_state.lock().await;
748+
let last_max_fee = batch_state_lock.get_user_last_max_fee_limit(&addr).await;
749+
let accumulated_fee = batch_state_lock.get_user_total_fees_in_queue(&addr).await;
750+
drop(batch_state_lock);
759751

760-
let Some(user_accumulated_fee) = self
761-
.batch_state
762-
.lock()
763-
.await
764-
.get_user_total_fees_in_queue(&addr)
765-
.await
766-
else {
767-
send_message(
768-
ws_conn_sink.clone(),
769-
SubmitProofResponseMessage::AddToBatchError,
770-
)
771-
.await;
772-
self.metrics.user_error(&["batcher_state_error", ""]);
773-
return Ok(());
752+
match (last_max_fee, accumulated_fee) {
753+
(Some(last_max), Some(accumulated)) => (last_max, accumulated),
754+
_ => {
755+
send_message(
756+
ws_conn_sink.clone(),
757+
SubmitProofResponseMessage::AddToBatchError,
758+
)
759+
.await;
760+
761+
self.metrics.user_error(&["batcher_state_error", ""]);
762+
return Ok(());
763+
}
764+
}
774765
};
775766

776767
if !self.verify_user_has_enough_balance(user_balance, user_accumulated_fee, msg_max_fee) {
@@ -1203,15 +1194,18 @@ impl Batcher {
12031194
block_number: u64,
12041195
gas_price: U256,
12051196
) -> Option<Vec<BatchQueueEntry>> {
1197+
info!("Batch building: started, acquiring lock to stop processing new messages...");
12061198
let _ = self.batch_building_mutex.lock().await;
1207-
let batch_state_lock = self.batch_state.lock().await;
1199+
1200+
info!("Batch building: waiting until all the ongoing messages finish");
12081201
// acquire all the user locks to make sure all the ongoing message have been processed
12091202
for user_mutex in self.user_mutexes.lock().await.values() {
12101203
let _ = user_mutex.lock().await;
12111204
}
1212-
let current_batch_len = batch_state_lock.batch_queue.len();
1213-
let last_uploaded_batch_block_lock = self.last_uploaded_batch_block.lock().await;
1205+
info!("Batch building: all locks acquired, proceeding to build batch");
12141206

1207+
let batch_state_lock = self.batch_state.lock().await;
1208+
let current_batch_len = batch_state_lock.batch_queue.len();
12151209
if current_batch_len < 1 {
12161210
info!(
12171211
"Current batch has {} proofs. Waiting for more proofs...",
@@ -1220,6 +1214,7 @@ impl Batcher {
12201214
return None;
12211215
}
12221216

1217+
let last_uploaded_batch_block_lock = self.last_uploaded_batch_block.lock().await;
12231218
if block_number < *last_uploaded_batch_block_lock + self.min_block_interval {
12241219
info!(
12251220
"Current batch not ready to be posted. Minimium amount of {} blocks have not passed. Block passed: {}", self.min_block_interval,
@@ -1332,7 +1327,6 @@ impl Batcher {
13321327
/// The last uploaded batch block is updated once the task is created in Aligned.
13331328
async fn finalize_batch(
13341329
&self,
1335-
block_number: u64,
13361330
finalized_batch: Vec<BatchQueueEntry>,
13371331
gas_price: U256,
13381332
) -> Result<(), BatcherError> {
@@ -1364,16 +1358,6 @@ impl Batcher {
13641358
)
13651359
})?;
13661360

1367-
{
1368-
let mut last_uploaded_batch_block = self.last_uploaded_batch_block.lock().await;
1369-
// update last uploaded batch block
1370-
*last_uploaded_batch_block = block_number;
1371-
info!(
1372-
"Batch Finalizer: Last uploaded batch block updated to: {}. Lock unlocked",
1373-
block_number
1374-
);
1375-
}
1376-
13771361
let leaves: Vec<[u8; 32]> = batch_data_comm
13781362
.iter()
13791363
.map(VerificationCommitmentBatch::hash_data)
@@ -1388,7 +1372,7 @@ impl Batcher {
13881372
}
13891373

13901374
// Here we submit the batch on-chain
1391-
if let Err(e) = self
1375+
match self
13921376
.submit_batch(
13931377
&batch_bytes,
13941378
&batch_merkle_tree.root,
@@ -1398,30 +1382,41 @@ impl Batcher {
13981382
)
13991383
.await
14001384
{
1401-
let reason = format!("{:?}", e);
1402-
if let Err(e) = self
1403-
.telemetry
1404-
.task_creation_failed(&hex::encode(batch_merkle_tree.root), &reason)
1405-
.await
1406-
{
1407-
warn!("Failed to send task status to telemetry: {:?}", e);
1385+
Ok(block_number) => {
1386+
let mut last_uploaded_batch_block = self.last_uploaded_batch_block.lock().await;
1387+
// update last uploaded batch block
1388+
*last_uploaded_batch_block = block_number;
1389+
info!(
1390+
"Batch Finalizer: Last uploaded batch block updated to: {}. Lock unlocked",
1391+
block_number
1392+
);
14081393
}
1409-
1410-
// decide if i want to flush the queue:
1411-
match e {
1412-
BatcherError::TransactionSendError(
1413-
TransactionSendError::SubmissionInsufficientBalance,
1414-
) => {
1415-
// TODO calling remove_proofs_from_queue here is a better solution, flushing only the failed batch
1416-
// this would also need a message sent to the clients
1417-
self.flush_queue_and_clear_nonce_cache().await;
1394+
Err(e) => {
1395+
let reason = format!("{:?}", e);
1396+
if let Err(e) = self
1397+
.telemetry
1398+
.task_creation_failed(&hex::encode(batch_merkle_tree.root), &reason)
1399+
.await
1400+
{
1401+
warn!("Failed to send task status to telemetry: {:?}", e);
14181402
}
1419-
_ => {
1420-
// Add more cases here if we want in the future
1403+
1404+
// decide if i want to flush the queue:
1405+
match e {
1406+
BatcherError::TransactionSendError(
1407+
TransactionSendError::SubmissionInsufficientBalance,
1408+
) => {
1409+
// TODO calling remove_proofs_from_queue here is a better solution, flushing only the failed batch
1410+
// this would also need a message sent to the clients
1411+
self.flush_queue_and_clear_nonce_cache().await;
1412+
}
1413+
_ => {
1414+
// Add more cases here if we want in the future
1415+
}
14211416
}
1422-
}
14231417

1424-
return Err(e);
1418+
return Err(e);
1419+
}
14251420
};
14261421

14271422
// Once the submit is succesfull, we remove the submitted proofs from the queue
@@ -1499,7 +1494,7 @@ impl Batcher {
14991494

15001495
if let Some(finalized_batch) = self.is_batch_ready(block_number, modified_gas_price).await {
15011496
let batch_finalization_result = self
1502-
.finalize_batch(block_number, finalized_batch, modified_gas_price)
1497+
.finalize_batch(finalized_batch, modified_gas_price)
15031498
.await;
15041499

15051500
// Resetting this here to avoid doing it on every return path of `finalize_batch` function
@@ -1520,7 +1515,7 @@ impl Batcher {
15201515
leaves: Vec<[u8; 32]>,
15211516
finalized_batch: &[BatchQueueEntry],
15221517
gas_price: U256,
1523-
) -> Result<(), BatcherError> {
1518+
) -> Result<u64, BatcherError> {
15241519
let batch_merkle_root_hex = hex::encode(batch_merkle_root);
15251520
info!("Batch merkle root: 0x{}", batch_merkle_root_hex);
15261521
let file_name = batch_merkle_root_hex.clone() + ".json";
@@ -1592,10 +1587,10 @@ impl Batcher {
15921587
)
15931588
.await
15941589
{
1595-
Ok(_) => {
1590+
Ok(res) => {
15961591
info!("Batch verification task created on Aligned contract");
15971592
self.metrics.sent_batches.inc();
1598-
Ok(())
1593+
Ok(res.block_number.map(|e| e.as_u64()).unwrap_or_default())
15991594
}
16001595
Err(e) => {
16011596
error!("Failed to send batch to contract: {:?}", e);
@@ -2028,6 +2023,8 @@ impl Batcher {
20282023
&self,
20292024
nonced_verification_data: &NoncedVerificationData,
20302025
) -> Result<(), ProofInvalidReason> {
2026+
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
2027+
20312028
if !self.pre_verification_is_enabled {
20322029
return Ok(());
20332030
}

0 commit comments

Comments
 (0)