Skip to content

Commit 07e6f34

Browse files
authored
refactor(batcher): batch building algorithm (#1008)
1 parent a8d8ace commit 07e6f34

File tree

5 files changed

+511
-326
lines changed

5 files changed

+511
-326
lines changed

batcher/aligned-batcher/src/lib.rs

Lines changed: 50 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ use config::NonPayingConfig;
66
use dotenv::dotenv;
77
use ethers::contract::ContractError;
88
use ethers::signers::Signer;
9-
use priority_queue::PriorityQueue;
109
use serde::Serialize;
1110

1211
use std::collections::hash_map::Entry;
@@ -35,7 +34,7 @@ use tokio::net::{TcpListener, TcpStream};
3534
use tokio::sync::{Mutex, RwLock};
3635
use tokio_tungstenite::tungstenite::{Error, Message};
3736
use tokio_tungstenite::WebSocketStream;
38-
use types::batch_queue::{BatchQueue, BatchQueueEntry, BatchQueueEntryPriority};
37+
use types::batch_queue::{self, BatchQueue, BatchQueueEntry, BatchQueueEntryPriority};
3938
use types::errors::{BatcherError, BatcherSendError};
4039

4140
use crate::config::{ConfigFromYaml, ContractDeploymentOutput};
@@ -52,10 +51,11 @@ mod zk_utils;
5251

5352
const AGGREGATOR_GAS_COST: u128 = 400_000;
5453
const BATCHER_SUBMISSION_BASE_GAS_COST: u128 = 125_000;
55-
const ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF: u128 = 13_000;
56-
const CONSTANT_GAS_COST: u128 = ((AGGREGATOR_GAS_COST * DEFAULT_AGGREGATOR_FEE_MULTIPLIER)
57-
/ DEFAULT_AGGREGATOR_FEE_DIVIDER)
58-
+ BATCHER_SUBMISSION_BASE_GAS_COST;
54+
pub(crate) const ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF: u128 = 13_000;
55+
pub(crate) const CONSTANT_GAS_COST: u128 =
56+
((AGGREGATOR_GAS_COST * DEFAULT_AGGREGATOR_FEE_MULTIPLIER) / DEFAULT_AGGREGATOR_FEE_DIVIDER)
57+
+ BATCHER_SUBMISSION_BASE_GAS_COST;
58+
5959
const DEFAULT_MAX_FEE_PER_PROOF: u128 = ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF * 100_000_000_000; // gas_price = 100 Gwei = 0.0000001 ether (high gas price)
6060
const MIN_FEE_PER_PROOF: u128 = ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF * 100_000_000; // gas_price = 0.1 Gwei = 0.0000000001 ether (low gas price)
6161
const RESPOND_TO_TASK_FEE_LIMIT_MULTIPLIER: u128 = 5; // to set the respondToTaskFeeLimit variable higher than fee_for_aggregator
@@ -715,14 +715,20 @@ impl Batcher {
715715

716716
// close old sink and replace with new one
717717
{
718-
let mut old_sink = replacement_entry.messaging_sink.write().await;
719-
if let Err(e) = old_sink.close().await {
720-
// we dont want to exit here, just log the error
721-
warn!("Error closing sink: {:?}", e);
722-
}
718+
if let Some(messaging_sink) = replacement_entry.messaging_sink {
719+
let mut old_sink = messaging_sink.write().await;
720+
if let Err(e) = old_sink.close().await {
721+
// we dont want to exit here, just log the error
722+
warn!("Error closing sink: {:?}", e);
723+
}
724+
} else {
725+
warn!(
726+
"Old websocket sink was empty. This should only happen in testing environments"
727+
)
728+
};
723729
}
724-
replacement_entry.messaging_sink = ws_conn_sink.clone();
725730

731+
replacement_entry.messaging_sink = Some(ws_conn_sink.clone());
726732
if let Some(msg) = batch_state.validate_and_increment_max_fee(replacement_entry) {
727733
warn!("Invalid max fee");
728734
send_message(ws_conn_sink.clone(), msg).await;
@@ -825,97 +831,27 @@ impl Batcher {
825831
// Set the batch posting flag to true
826832
*batch_posting = true;
827833

828-
let mut batch_queue_copy = batch_state.batch_queue.clone();
829-
830-
match self.try_build_batch(&mut batch_queue_copy, gas_price) {
831-
Some(finalized_batch) => {
834+
let batch_queue_copy = batch_state.batch_queue.clone();
835+
match batch_queue::try_build_batch(batch_queue_copy, gas_price, self.max_batch_size) {
836+
Ok((resulting_batch_queue, finalized_batch)) => {
832837
// Set the batch queue to batch queue copy
833-
batch_state.batch_queue = batch_queue_copy;
838+
batch_state.batch_queue = resulting_batch_queue;
834839
batch_state.update_user_proofs_in_batch_and_min_fee();
835-
836840
Some(finalized_batch)
837841
}
838-
None => {
842+
Err(BatcherError::BatchCostTooHigh) => {
839843
// We cant post a batch since users are not willing to pay the needed fee, wait for more proofs
840844
info!("No working batch found. Waiting for more proofs...");
841845
*batch_posting = false;
842846
None
843847
}
844-
}
845-
}
846-
847-
/// Tries to build a batch from the current batch queue.
848-
/// The function iterates over the batch queue and tries to build a batch that satisfies the gas price
849-
/// and the max_fee set by the users.
850-
/// If a working batch is found, the function tries to make it as big as possible by adding more proofs,
851-
/// until a user is not willing to pay the required fee.
852-
/// The extra check is that the batch size does not surpass the maximum batch size.
853-
/// Note that the batch queue is sorted descending by the max_fee set by the users.
854-
/// We use a copy of the batch queue because we might not find a working batch,
855-
/// and we want to keep the original batch queue intact.
856-
/// Returns Some(working_batch) if found, None otherwise.
857-
fn try_build_batch(
858-
&self,
859-
batch_queue_copy: &mut PriorityQueue<BatchQueueEntry, BatchQueueEntryPriority>,
860-
gas_price: U256,
861-
) -> Option<Vec<BatchQueueEntry>> {
862-
let mut finalized_batch = vec![];
863-
let mut finalized_batch_size = 2; // at most two extra bytes for cbor encoding array markers
864-
let mut finalized_batch_works = false;
865-
866-
while let Some((entry, _)) = batch_queue_copy.peek() {
867-
let serialized_vd_size =
868-
match cbor_serialize(&entry.nonced_verification_data.verification_data) {
869-
Ok(val) => val.len(),
870-
Err(e) => {
871-
warn!("Serialization error: {:?}", e);
872-
break;
873-
}
874-
};
875-
876-
if finalized_batch_size + serialized_vd_size > self.max_batch_size {
877-
break;
878-
}
879-
880-
let num_proofs = finalized_batch.len() + 1;
881-
882-
let gas_per_proof = (CONSTANT_GAS_COST
883-
+ ADDITIONAL_SUBMISSION_GAS_COST_PER_PROOF * num_proofs as u128)
884-
/ num_proofs as u128;
885-
886-
let fee_per_proof = U256::from(gas_per_proof) * gas_price;
887-
888-
debug!(
889-
"Validating that batch submission fee {} is less than max fee {} for sender {}",
890-
fee_per_proof, entry.nonced_verification_data.max_fee, entry.sender,
891-
);
892-
893-
// it is sufficient to check this max fee because it will be the lowest since its sorted
894-
if fee_per_proof < entry.nonced_verification_data.max_fee && num_proofs >= 2 {
895-
finalized_batch_works = true;
896-
} else if finalized_batch_works {
897-
// Can not add latest element since it is not willing to pay the corresponding fee
898-
// Could potentially still find another working solution later with more elements,
899-
// maybe we can explore all lengths in a future version
900-
// or do the reverse from this, try with whole batch,
901-
// then with whole batch minus last element, etc
902-
break;
848+
// FIXME: We should refactor this code and instead of returning None, return an error.
849+
// See issue https://github.com/yetanotherco/aligned_layer/issues/1046.
850+
Err(e) => {
851+
error!("Unexpected error: {:?}", e);
852+
*batch_posting = false;
853+
None
903854
}
904-
905-
// Either max fee is insufficient but we have not found a working solution yet,
906-
// or we can keep adding to a working batch,
907-
// Either way we need to keep iterating
908-
finalized_batch_size += serialized_vd_size;
909-
910-
// We can unwrap here because we have already peeked to check there is a value
911-
let (entry, _) = batch_queue_copy.pop().unwrap();
912-
finalized_batch.push(entry);
913-
}
914-
915-
if finalized_batch_works {
916-
Some(finalized_batch)
917-
} else {
918-
None
919855
}
920856
}
921857

@@ -977,13 +913,17 @@ impl Batcher {
977913
)
978914
.await
979915
{
980-
for entry in finalized_batch.iter() {
981-
let merkle_root = hex::encode(batch_merkle_tree.root);
982-
send_message(
983-
entry.messaging_sink.clone(),
984-
ResponseMessage::CreateNewTaskError(merkle_root),
985-
)
986-
.await
916+
for entry in finalized_batch.into_iter() {
917+
if let Some(ws_sink) = entry.messaging_sink {
918+
let merkle_root = hex::encode(batch_merkle_tree.root);
919+
send_message(
920+
ws_sink.clone(),
921+
ResponseMessage::CreateNewTaskError(merkle_root),
922+
)
923+
.await
924+
} else {
925+
warn!("Websocket sink was found empty. This should only happen in tests");
926+
}
987927
}
988928

989929
self.flush_queue_and_clear_nonce_cache().await;
@@ -999,7 +939,11 @@ impl Batcher {
999939
let mut batch_state = self.batch_state.lock().await;
1000940

1001941
for (entry, _) in batch_state.batch_queue.iter() {
1002-
send_message(entry.messaging_sink.clone(), ResponseMessage::BatchReset).await;
942+
if let Some(ws_sink) = entry.messaging_sink.as_ref() {
943+
send_message(ws_sink.clone(), ResponseMessage::BatchReset).await;
944+
} else {
945+
warn!("Websocket sink was found empty. This should only happen in tests");
946+
}
1003947
}
1004948

1005949
batch_state.batch_queue.clear();
@@ -1355,8 +1299,11 @@ async fn send_batch_inclusion_data_responses(
13551299
let serialized_response = cbor_serialize(&response)
13561300
.map_err(|e| BatcherError::SerializationError(e.to_string()))?;
13571301

1358-
let sending_result = entry
1359-
.messaging_sink
1302+
let Some(ws_sink) = entry.messaging_sink.as_ref() else {
1303+
return Err(BatcherError::WsSinkEmpty);
1304+
};
1305+
1306+
let sending_result = ws_sink
13601307
.write()
13611308
.await
13621309
.send(Message::binary(serialized_response))

0 commit comments

Comments
 (0)