Skip to content

Commit 73c6da4

Browse files
committed
Merge branch 'refs/heads/staging' into merge_testnet_v_0_10_3
# Conflicts: # batcher/Cargo.lock
2 parents d908221 + 24aa347 commit 73c6da4

File tree

16 files changed

+633
-570
lines changed

16 files changed

+633
-570
lines changed

Makefile

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,7 @@ batcher_send_risc0_task_no_pub_input:
360360
--vm_program ../../scripts/test_files/risc_zero/no_public_inputs/no_pub_input_id.bin \
361361
--proof_generator_addr 0x66f9664f97F2b50F62D13eA064982f936dE76657 \
362362
--rpc_url $(RPC_URL) \
363-
--payment_service_addr $(BATCHER_PAYMENTS_CONTRACT_ADDRESS)
363+
--network $(NETWORK)
364364

365365
batcher_send_risc0_burst:
366366
@echo "Sending Risc0 fibonacci task to Batcher..."
@@ -1044,3 +1044,25 @@ telemetry_dump_db:
10441044
telemetry_create_env:
10451045
@cd telemetry_api && \
10461046
cp .env.dev .env
1047+
1048+
setup_local_aligned_all:
1049+
tmux kill-session -t aligned_layer || true
1050+
tmux new-session -d -s aligned_layer
1051+
1052+
tmux new-window -t aligned_layer -n anvil
1053+
tmux send-keys -t aligned_layer 'make anvil_start_with_block_time' C-m
1054+
1055+
tmux new-window -t aligned_layer -n aggregator
1056+
tmux send-keys -t aligned_layer:aggregator 'make aggregator_start' C-m
1057+
1058+
tmux new-window -t aligned_layer -n operator
1059+
tmux send-keys -t aligned_layer:operator 'sleep 5 && make operator_register_and_start' C-m
1060+
1061+
tmux new-window -t aligned_layer -n batcher
1062+
tmux send-keys -t aligned_layer:batcher 'sleep 60 && make batcher_start_local' C-m
1063+
1064+
tmux new-window -t aligned_layer -n explorer
1065+
tmux send-keys -t aligned_layer:explorer 'make explorer_create_env && make explorer_build_db && make run_explorer' C-m
1066+
1067+
tmux new-window -t aligned_layer -n telemetry
1068+
tmux send-keys -t aligned_layer:telemetry 'docker compose -f telemetry-docker-compose.yaml down && make telemetry_create_env && make telemetry_run_db && make open_telemetry_start && make telemetry_start' C-m

batcher/aligned-batcher/src/connection.rs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use aligned_sdk::{
77
};
88
use futures_util::{stream::SplitSink, SinkExt};
99
use lambdaworks_crypto::merkle_tree::merkle::MerkleTree;
10-
use log::{error, info};
10+
use log::{debug, error};
1111
use serde::Serialize;
1212
use tokio::{net::TcpStream, sync::RwLock};
1313
use tokio_tungstenite::{
@@ -22,7 +22,11 @@ pub(crate) async fn send_batch_inclusion_data_responses(
2222
batch_merkle_tree: &MerkleTree<VerificationCommitmentBatch>,
2323
) -> Result<(), BatcherError> {
2424
for (vd_batch_idx, entry) in finalized_batch.iter().enumerate() {
25-
let batch_inclusion_data = BatchInclusionData::new(vd_batch_idx, batch_merkle_tree);
25+
let batch_inclusion_data = BatchInclusionData::new(
26+
vd_batch_idx,
27+
batch_merkle_tree,
28+
entry.nonced_verification_data.nonce,
29+
);
2630
let response = ResponseMessage::BatchInclusionData(batch_inclusion_data);
2731

2832
let serialized_response = cbor_serialize(&response)
@@ -44,7 +48,7 @@ pub(crate) async fn send_batch_inclusion_data_responses(
4448
Ok(_) => (),
4549
}
4650

47-
info!("Response sent");
51+
debug!("Response sent");
4852
}
4953

5054
Ok(())

batcher/aligned-batcher/src/lib.rs

Lines changed: 30 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ use aligned_sdk::core::constants::{
2828
};
2929
use aligned_sdk::core::types::{
3030
ClientMessage, NoncedVerificationData, ProofInvalidReason, ProvingSystemId, ResponseMessage,
31-
ValidityResponseMessage, VerificationCommitmentBatch, VerificationData,
32-
VerificationDataCommitment,
31+
VerificationCommitmentBatch, VerificationData, VerificationDataCommitment,
3332
};
3433

3534
use aws_sdk_s3::client::Client as S3Client;
@@ -415,11 +414,7 @@ impl Batcher {
415414
let msg_chain_id = client_msg.verification_data.chain_id;
416415
if msg_chain_id != self.chain_id {
417416
warn!("Received message with incorrect chain id: {msg_chain_id}");
418-
send_message(
419-
ws_conn_sink.clone(),
420-
ValidityResponseMessage::InvalidChainId,
421-
)
422-
.await;
417+
send_message(ws_conn_sink.clone(), ResponseMessage::InvalidChainId).await;
423418
self.metrics.user_error(&["invalid_chain_id", ""]);
424419
return Ok(());
425420
}
@@ -430,7 +425,7 @@ impl Batcher {
430425
warn!("Received message with incorrect payment service address: {msg_payment_service_addr}");
431426
send_message(
432427
ws_conn_sink.clone(),
433-
ValidityResponseMessage::InvalidPaymentServiceAddress(
428+
ResponseMessage::InvalidPaymentServiceAddress(
434429
msg_payment_service_addr,
435430
self.payment_service.address(),
436431
),
@@ -444,11 +439,7 @@ impl Batcher {
444439
info!("Verifying message signature...");
445440
let Ok(addr) = client_msg.verify_signature() else {
446441
error!("Signature verification error");
447-
send_message(
448-
ws_conn_sink.clone(),
449-
ValidityResponseMessage::InvalidSignature,
450-
)
451-
.await;
442+
send_message(ws_conn_sink.clone(), ResponseMessage::InvalidSignature).await;
452443
self.metrics.user_error(&["invalid_signature", ""]);
453444
return Ok(());
454445
};
@@ -457,7 +448,7 @@ impl Batcher {
457448
let proof_size = client_msg.verification_data.verification_data.proof.len();
458449
if proof_size > self.max_proof_size {
459450
error!("Proof size exceeds the maximum allowed size.");
460-
send_message(ws_conn_sink.clone(), ValidityResponseMessage::ProofTooLarge).await;
451+
send_message(ws_conn_sink.clone(), ResponseMessage::ProofTooLarge).await;
461452
self.metrics.user_error(&["proof_too_large", ""]);
462453
return Ok(());
463454
}
@@ -477,7 +468,7 @@ impl Batcher {
477468
);
478469
send_message(
479470
ws_conn_sink.clone(),
480-
ValidityResponseMessage::InvalidProof(ProofInvalidReason::DisabledVerifier(
471+
ResponseMessage::InvalidProof(ProofInvalidReason::DisabledVerifier(
481472
verification_data.proving_system,
482473
)),
483474
)
@@ -493,7 +484,7 @@ impl Batcher {
493484
error!("Invalid proof detected. Verification failed");
494485
send_message(
495486
ws_conn_sink.clone(),
496-
ValidityResponseMessage::InvalidProof(ProofInvalidReason::RejectedProof),
487+
ResponseMessage::InvalidProof(ProofInvalidReason::RejectedProof),
497488
)
498489
.await;
499490
self.metrics.user_error(&[
@@ -518,7 +509,7 @@ impl Batcher {
518509
if self.user_balance_is_unlocked(&addr).await {
519510
send_message(
520511
ws_conn_sink.clone(),
521-
ValidityResponseMessage::InsufficientBalance(addr),
512+
ResponseMessage::InsufficientBalance(addr),
522513
)
523514
.await;
524515
self.metrics.user_error(&["insufficient_balance", ""]);
@@ -542,7 +533,7 @@ impl Batcher {
542533
error!(
543534
"Failed to get user nonce from Ethereum for address {addr:?}. Error: {e:?}"
544535
);
545-
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
536+
send_message(ws_conn_sink.clone(), ResponseMessage::InvalidNonce).await;
546537
self.metrics.user_error(&["invalid_nonce", ""]);
547538
return Ok(());
548539
}
@@ -561,7 +552,7 @@ impl Batcher {
561552

562553
let Some(user_balance) = self.get_user_balance(&addr).await else {
563554
error!("Could not get balance for address {addr:?}");
564-
send_message(ws_conn_sink.clone(), ValidityResponseMessage::EthRpcError).await;
555+
send_message(ws_conn_sink.clone(), ResponseMessage::EthRpcError).await;
565556
self.metrics.user_error(&["eth_rpc_error", ""]);
566557
return Ok(());
567558
};
@@ -574,7 +565,7 @@ impl Batcher {
574565
let Some(proofs_in_batch) = batch_state_lock.get_user_proof_count(&addr).await else {
575566
error!("Failed to get user proof count: User not found in user states, but it should have been already inserted");
576567
std::mem::drop(batch_state_lock);
577-
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
568+
send_message(ws_conn_sink.clone(), ResponseMessage::InvalidNonce).await;
578569
self.metrics.user_error(&["invalid_nonce", ""]);
579570
return Ok(());
580571
};
@@ -583,7 +574,7 @@ impl Batcher {
583574
std::mem::drop(batch_state_lock);
584575
send_message(
585576
ws_conn_sink.clone(),
586-
ValidityResponseMessage::InsufficientBalance(addr),
577+
ResponseMessage::InsufficientBalance(addr),
587578
)
588579
.await;
589580
self.metrics.user_error(&["insufficient_balance", ""]);
@@ -594,15 +585,15 @@ impl Batcher {
594585
let Some(expected_nonce) = cached_user_nonce else {
595586
error!("Failed to get cached user nonce: User not found in user states, but it should have been already inserted");
596587
std::mem::drop(batch_state_lock);
597-
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
588+
send_message(ws_conn_sink.clone(), ResponseMessage::InvalidNonce).await;
598589
self.metrics.user_error(&["invalid_nonce", ""]);
599590
return Ok(());
600591
};
601592

602593
if expected_nonce < msg_nonce {
603594
std::mem::drop(batch_state_lock);
604-
warn!("Invalid nonce for address {addr}, had nonce {expected_nonce:?} < {msg_nonce:?}");
605-
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
595+
warn!("Invalid nonce for address {addr}, expected nonce: {expected_nonce:?}, received nonce: {msg_nonce:?}");
596+
send_message(ws_conn_sink.clone(), ResponseMessage::InvalidNonce).await;
606597
self.metrics.user_error(&["invalid_nonce", ""]);
607598
return Ok(());
608599
}
@@ -626,15 +617,15 @@ impl Batcher {
626617
let msg_max_fee = nonced_verification_data.max_fee;
627618
let Some(user_min_fee) = batch_state_lock.get_user_min_fee(&addr).await else {
628619
std::mem::drop(batch_state_lock);
629-
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
620+
send_message(ws_conn_sink.clone(), ResponseMessage::InvalidNonce).await;
630621
self.metrics.user_error(&["invalid_nonce", ""]);
631622
return Ok(());
632623
};
633624

634625
if msg_max_fee > user_min_fee {
635626
std::mem::drop(batch_state_lock);
636627
warn!("Invalid max fee for address {addr}, had fee {user_min_fee:?} < {msg_max_fee:?}");
637-
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidMaxFee).await;
628+
send_message(ws_conn_sink.clone(), ResponseMessage::InvalidMaxFee).await;
638629
self.metrics.user_error(&["invalid_max_fee", ""]);
639630
return Ok(());
640631
}
@@ -654,13 +645,12 @@ impl Batcher {
654645
.await
655646
{
656647
error!("Error while adding entry to batch: {e:?}");
657-
send_message(ws_conn_sink, ValidityResponseMessage::AddToBatchError).await;
648+
send_message(ws_conn_sink, ResponseMessage::AddToBatchError).await;
658649
self.metrics.user_error(&["add_to_batch_error", ""]);
659650
return Ok(());
660651
};
661652

662653
info!("Verification data message handled");
663-
send_message(ws_conn_sink, ValidityResponseMessage::Valid).await;
664654
Ok(())
665655
}
666656

@@ -695,7 +685,7 @@ impl Batcher {
695685
let Some(entry) = batch_state_lock.get_entry(addr, nonce) else {
696686
std::mem::drop(batch_state_lock);
697687
warn!("Invalid nonce for address {addr}. Queue entry with nonce {nonce} not found");
698-
send_message(ws_conn_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
688+
send_message(ws_conn_sink.clone(), ResponseMessage::InvalidNonce).await;
699689
self.metrics.user_error(&["invalid_nonce", ""]);
700690
return;
701691
};
@@ -706,7 +696,7 @@ impl Batcher {
706696
warn!("Invalid replacement message for address {addr}, had fee {original_max_fee:?} < {replacement_max_fee:?}");
707697
send_message(
708698
ws_conn_sink.clone(),
709-
ValidityResponseMessage::InvalidReplacementMessage,
699+
ResponseMessage::InvalidReplacementMessage,
710700
)
711701
.await;
712702
self.metrics
@@ -730,6 +720,8 @@ impl Batcher {
730720
if let Err(e) = old_sink.close().await {
731721
// we dont want to exit here, just log the error
732722
warn!("Error closing sink: {e:?}");
723+
} else {
724+
info!("Old websocket sink closed");
733725
}
734726
} else {
735727
warn!(
@@ -744,7 +736,7 @@ impl Batcher {
744736
warn!("Invalid replacement message");
745737
send_message(
746738
ws_conn_sink.clone(),
747-
ValidityResponseMessage::InvalidReplacementMessage,
739+
ResponseMessage::InvalidReplacementMessage,
748740
)
749741
.await;
750742
self.metrics
@@ -1055,7 +1047,7 @@ impl Batcher {
10551047
let merkle_root = hex::encode(batch_merkle_tree.root);
10561048
send_message(
10571049
ws_sink.clone(),
1058-
ResponseMessage::CreateNewTaskError(merkle_root),
1050+
ResponseMessage::CreateNewTaskError(merkle_root, format!("{:?}", e)),
10591051
)
10601052
.await
10611053
} else {
@@ -1281,7 +1273,7 @@ impl Batcher {
12811273
self.cancel_create_new_task_tx(fee_params.gas_price).await;
12821274
Err(BatcherError::ReceiptNotFoundError)
12831275
}
1284-
Err(_) => Err(BatcherError::TransactionSendError),
1276+
Err(RetryError::Permanent(e)) | Err(RetryError::Transient(e)) => Err(e),
12851277
}
12861278
}
12871279

@@ -1359,7 +1351,7 @@ impl Batcher {
13591351
info!("Handling nonpaying message");
13601352
let Some(non_paying_config) = self.non_paying_config.as_ref() else {
13611353
warn!("There isn't a non-paying configuration loaded. This message will be ignored");
1362-
send_message(ws_sink.clone(), ValidityResponseMessage::InvalidNonce).await;
1354+
send_message(ws_sink.clone(), ResponseMessage::InvalidNonce).await;
13631355
return Ok(());
13641356
};
13651357

@@ -1368,7 +1360,7 @@ impl Batcher {
13681360
error!("Could not get balance for non-paying address {replacement_addr:?}");
13691361
send_message(
13701362
ws_sink.clone(),
1371-
ValidityResponseMessage::InsufficientBalance(replacement_addr),
1363+
ResponseMessage::InsufficientBalance(replacement_addr),
13721364
)
13731365
.await;
13741366
return Ok(());
@@ -1378,26 +1370,17 @@ impl Batcher {
13781370
error!("Insufficient funds for non-paying address {replacement_addr:?}");
13791371
send_message(
13801372
ws_sink.clone(),
1381-
ValidityResponseMessage::InsufficientBalance(replacement_addr),
1373+
ResponseMessage::InsufficientBalance(replacement_addr),
13821374
)
13831375
.await;
13841376
return Ok(());
13851377
}
13861378

13871379
let batch_state_lock = self.batch_state.lock().await;
1388-
let Some(non_paying_nonce) = batch_state_lock.get_user_nonce(&replacement_addr).await
1389-
else {
1390-
std::mem::drop(batch_state_lock);
1391-
error!("Nonce for non-paying address {replacement_addr:?} not found in cache.");
1392-
send_message(ws_sink.clone(), ValidityResponseMessage::EthRpcError).await;
1393-
return Ok(());
1394-
};
1395-
1396-
debug!("Non-paying nonce: {:?}", non_paying_nonce);
13971380

13981381
let nonced_verification_data = NoncedVerificationData::new(
13991382
client_msg.verification_data.verification_data.clone(),
1400-
non_paying_nonce,
1383+
client_msg.verification_data.nonce,
14011384
DEFAULT_MAX_FEE_PER_PROOF.into(), // 13_000 gas per proof * 100 gwei gas price (upper bound)
14021385
self.chain_id,
14031386
self.payment_service.address(),
@@ -1421,12 +1404,11 @@ impl Batcher {
14211404
.await
14221405
{
14231406
info!("Error while adding nonpaying address entry to batch: {e:?}");
1424-
send_message(ws_sink, ValidityResponseMessage::AddToBatchError).await;
1407+
send_message(ws_sink, ResponseMessage::AddToBatchError).await;
14251408
return Ok(());
14261409
};
14271410

14281411
info!("Non-paying verification data message handled");
1429-
send_message(ws_sink, ValidityResponseMessage::Valid).await;
14301412
Ok(())
14311413
}
14321414

batcher/aligned-batcher/src/retry/batcher_retryables.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,9 @@ pub async fn create_new_task_retryable(
129129
Err(ContractError::Revert(err)) => {
130130
// Since transaction was reverted, we don't want to retry with fallback.
131131
warn!("Transaction reverted {:?}", err);
132-
return Err(RetryError::Permanent(BatcherError::TransactionSendError));
132+
return Err(RetryError::Permanent(BatcherError::TransactionSendError(
133+
err.to_string(),
134+
)));
133135
}
134136
_ => {
135137
call_fallback = payment_service_fallback
@@ -146,9 +148,15 @@ pub async fn create_new_task_retryable(
146148
Ok(pending_tx) => pending_tx,
147149
Err(ContractError::Revert(err)) => {
148150
warn!("Transaction reverted {:?}", err);
149-
return Err(RetryError::Permanent(BatcherError::TransactionSendError));
151+
return Err(RetryError::Permanent(BatcherError::TransactionSendError(
152+
err.to_string(),
153+
)));
154+
}
155+
Err(err) => {
156+
return Err(RetryError::Transient(BatcherError::TransactionSendError(
157+
err.to_string(),
158+
)))
150159
}
151-
_ => return Err(RetryError::Transient(BatcherError::TransactionSendError)),
152160
}
153161
}
154162
};
@@ -162,7 +170,7 @@ pub async fn create_new_task_retryable(
162170
})?
163171
.map_err(|e| {
164172
warn!("Error while waiting for batch inclusion: {e}");
165-
RetryError::Transient(BatcherError::TransactionSendError)
173+
RetryError::Transient(BatcherError::TransactionSendError(e.to_string()))
166174
})?
167175
.ok_or(RetryError::Permanent(BatcherError::ReceiptNotFoundError))
168176
}

0 commit comments

Comments
 (0)