Skip to content

Commit 36a8369

Browse files
Initial version with retry only on proof sending
1 parent 3630400 commit 36a8369

File tree

4 files changed

+184
-58
lines changed

4 files changed

+184
-58
lines changed

aggregation_mode/proof_aggregator/src/aggregators/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ impl ZKVMEngine {
161161
}
162162
}
163163

164+
#[derive(Clone)]
164165
pub enum AlignedProof {
165166
SP1(Box<SP1ProofWithPubValuesAndVk>),
166167
Risc0(Box<Risc0ProofReceiptAndImageId>),

aggregation_mode/proof_aggregator/src/aggregators/risc0_aggregator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use aligned_sdk::aggregation_layer::AggregationModeProvingSystem;
44
use risc0_zkvm::{default_prover, ExecutorEnv, ProverOpts, Receipt};
55
use sha3::{Digest, Keccak256};
66

7+
#[derive(Clone)]
78
pub struct Risc0ProofReceiptAndImageId {
89
pub image_id: [u8; 32],
910
pub receipt: Receipt,

aggregation_mode/proof_aggregator/src/aggregators/sp1_aggregator.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ static SP1_PROVER_CLIENT: LazyLock<EnvProver> = LazyLock::new(ProverClient::from
2525
static SP1_PROVER_CLIENT_CPU: LazyLock<CpuProver> =
2626
LazyLock::new(|| ProverClient::builder().cpu().build());
2727

28+
#[derive(Clone)]
2829
pub struct SP1ProofWithPubValuesAndVk {
2930
pub proof_with_pub_values: SP1ProofWithPublicValues,
3031
pub vk: SP1VerifyingKey,

aggregation_mode/proof_aggregator/src/backend/mod.rs

Lines changed: 181 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,10 @@ impl ProofAggregator {
205205
}
206206

207207
info!("Sending proof to ProofAggregationService contract...");
208+
209+
// Retry in case of failure
208210
let receipt = self
209-
.send_proof_to_verify_on_chain(blob, blob_versioned_hash, aggregated_proof)
211+
.send_proof_to_verify_on_chain_retryable(blob, blob_versioned_hash, aggregated_proof)
210212
.await?;
211213
info!(
212214
"Proof sent and verified, tx hash {:?}",
@@ -272,70 +274,49 @@ impl ProofAggregator {
272274
expected_cost_in_wei <= max_to_spend_in_wei
273275
}
274276

275-
async fn send_proof_to_verify_on_chain(
277+
async fn send_proof_to_verify_on_chain_retryable(
276278
&self,
277279
blob: BlobTransactionSidecar,
278280
blob_versioned_hash: [u8; 32],
279281
aggregated_proof: AlignedProof,
280282
) -> Result<TransactionReceipt, AggregatedProofSubmissionError> {
281-
let tx_req = match aggregated_proof {
282-
AlignedProof::SP1(proof) => self
283-
.proof_aggregation_service
284-
.verifyAggregationSP1(
285-
blob_versioned_hash.into(),
286-
proof.proof_with_pub_values.public_values.to_vec().into(),
287-
proof.proof_with_pub_values.bytes().into(),
288-
self.sp1_chunk_aggregator_vk_hash_bytes.into(),
283+
match send_proof_to_verify_on_chain(
284+
blob.clone(),
285+
blob_versioned_hash,
286+
aggregated_proof.clone(),
287+
self.proof_aggregation_service.clone(),
288+
self.sp1_chunk_aggregator_vk_hash_bytes,
289+
self.risc0_chunk_aggregator_image_id_bytes,
290+
)
291+
.await
292+
{
293+
Ok(tx_receipt) => Ok(tx_receipt),
294+
Err(err) => {
295+
tracing::error!("Failed to send proof to be verified on chain: {err:?}");
296+
297+
retry_function(
298+
|| {
299+
send_proof_to_verify_on_chain(
300+
blob.clone(),
301+
blob_versioned_hash,
302+
aggregated_proof.clone(),
303+
self.proof_aggregation_service.clone(),
304+
self.sp1_chunk_aggregator_vk_hash_bytes,
305+
self.risc0_chunk_aggregator_image_id_bytes,
306+
)
307+
},
308+
ETHEREUM_CALL_MIN_RETRY_DELAY,
309+
ETHEREUM_CALL_BACKOFF_FACTOR,
310+
ETHEREUM_CALL_MAX_RETRIES,
311+
ETHEREUM_CALL_MAX_RETRY_DELAY,
289312
)
290-
.sidecar(blob)
291-
.into_transaction_request(),
292-
AlignedProof::Risc0(proof) => {
293-
let encoded_seal = encode_seal(&proof.receipt).map_err(|e| {
294-
AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string())
295-
})?;
296-
self.proof_aggregation_service
297-
.verifyAggregationRisc0(
298-
blob_versioned_hash.into(),
299-
encoded_seal.into(),
300-
proof.receipt.journal.bytes.into(),
301-
self.risc0_chunk_aggregator_image_id_bytes.into(),
302-
)
303-
.sidecar(blob)
304-
.into_transaction_request()
313+
.await
314+
.map_err(|e| {
315+
error!("Could't get nonce: {:?}", e);
316+
e.inner()
317+
})
305318
}
306-
};
307-
308-
let provider = self.proof_aggregation_service.provider();
309-
let envelope = provider
310-
.fill(tx_req)
311-
.await
312-
.map_err(Self::send_verify_aggregated_proof_err)?
313-
.try_into_envelope()
314-
.map_err(Self::send_verify_aggregated_proof_err)?;
315-
let tx: EthereumTxEnvelope<TxEip4844WithSidecar<BlobTransactionSidecarEip7594>> = envelope
316-
.try_into_pooled()
317-
.map_err(Self::send_verify_aggregated_proof_err)?
318-
.try_map_eip4844(|tx| {
319-
tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get()))
320-
})
321-
.map_err(Self::send_verify_aggregated_proof_err)?;
322-
323-
let encoded_tx = tx.encoded_2718();
324-
let pending_tx = provider
325-
.send_raw_transaction(&encoded_tx)
326-
.await
327-
.map_err(Self::send_verify_aggregated_proof_err)?;
328-
329-
let receipt = pending_tx
330-
.get_receipt()
331-
.await
332-
.map_err(Self::send_verify_aggregated_proof_err)?;
333-
334-
Ok(receipt)
335-
}
336-
337-
fn send_verify_aggregated_proof_err<E: ToString>(err: E) -> AggregatedProofSubmissionError {
338-
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
319+
}
339320
}
340321

341322
/// ### Blob capacity
@@ -411,6 +392,148 @@ impl ProofAggregator {
411392
}
412393
}
413394

395+
async fn send_proof_to_verify_on_chain(
396+
blob: BlobTransactionSidecar,
397+
blob_versioned_hash: [u8; 32],
398+
aggregated_proof: AlignedProof,
399+
proof_aggregation_service: AlignedProofAggregationServiceContract,
400+
sp1_chunk_aggregator_vk_hash_bytes: [u8; 32],
401+
risc0_chunk_aggregator_image_id_bytes: [u8; 32],
402+
) -> Result<TransactionReceipt, RetryError<AggregatedProofSubmissionError>> {
403+
let tx_req = match aggregated_proof {
404+
AlignedProof::SP1(proof) => proof_aggregation_service
405+
.verifyAggregationSP1(
406+
blob_versioned_hash.into(),
407+
proof.proof_with_pub_values.public_values.to_vec().into(),
408+
proof.proof_with_pub_values.bytes().into(),
409+
sp1_chunk_aggregator_vk_hash_bytes.into(),
410+
)
411+
.sidecar(blob)
412+
.into_transaction_request(),
413+
AlignedProof::Risc0(proof) => {
414+
let encoded_seal = encode_seal(&proof.receipt)
415+
.map_err(|e| AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string()))
416+
.map_err(RetryError::Transient)?;
417+
proof_aggregation_service
418+
.verifyAggregationRisc0(
419+
blob_versioned_hash.into(),
420+
encoded_seal.into(),
421+
proof.receipt.journal.bytes.into(),
422+
risc0_chunk_aggregator_image_id_bytes.into(),
423+
)
424+
.sidecar(blob)
425+
.into_transaction_request()
426+
}
427+
};
428+
429+
let provider = proof_aggregation_service.provider();
430+
let envelope = provider
431+
.fill(tx_req)
432+
.await
433+
.map_err(|err| {
434+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
435+
})
436+
.map_err(RetryError::Transient)?
437+
.try_into_envelope()
438+
.map_err(|err| {
439+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
440+
})
441+
.map_err(RetryError::Transient)?;
442+
let tx: EthereumTxEnvelope<TxEip4844WithSidecar<BlobTransactionSidecarEip7594>> = envelope
443+
.try_into_pooled()
444+
.map_err(|err| {
445+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
446+
})
447+
.map_err(RetryError::Transient)?
448+
.try_map_eip4844(|tx| {
449+
tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get()))
450+
})
451+
.map_err(|err| {
452+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
453+
})
454+
.map_err(RetryError::Transient)?;
455+
456+
let encoded_tx = tx.encoded_2718();
457+
let pending_tx = provider
458+
.send_raw_transaction(&encoded_tx)
459+
.await
460+
.map_err(|err| {
461+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
462+
})
463+
.map_err(RetryError::Transient)?;
464+
465+
let receipt = pending_tx
466+
.get_receipt()
467+
.await
468+
.map_err(|err| {
469+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
470+
})
471+
.map_err(RetryError::Transient)?;
472+
473+
Ok(receipt)
474+
}
475+
476+
use backon::ExponentialBuilder;
477+
use backon::Retryable;
478+
use std::future::Future;
479+
480+
#[derive(Debug)]
481+
pub enum RetryError<E> {
482+
Transient(E),
483+
Permanent(E),
484+
}
485+
486+
impl<E: std::fmt::Display> std::fmt::Display for RetryError<E> {
487+
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
488+
match self {
489+
RetryError::Transient(e) => write!(f, "{}", e),
490+
RetryError::Permanent(e) => write!(f, "{}", e),
491+
}
492+
}
493+
}
494+
495+
impl<E> RetryError<E> {
496+
pub fn inner(self) -> E {
497+
match self {
498+
RetryError::Transient(e) => e,
499+
RetryError::Permanent(e) => e,
500+
}
501+
}
502+
}
503+
504+
impl<E: std::fmt::Display> std::error::Error for RetryError<E> where E: std::fmt::Debug {}
505+
506+
pub const ETHEREUM_CALL_MIN_RETRY_DELAY: u64 = 500; // milliseconds
507+
pub const ETHEREUM_CALL_MAX_RETRIES: usize = 5;
508+
pub const ETHEREUM_CALL_BACKOFF_FACTOR: f32 = 2.0;
509+
pub const ETHEREUM_CALL_MAX_RETRY_DELAY: u64 = 60; // seconds
510+
511+
/// Supports retries only on async functions. See: https://docs.rs/backon/latest/backon/#retry-an-async-function
512+
/// Runs with `jitter: false`.
513+
pub async fn retry_function<FutureFn, Fut, T, E>(
514+
function: FutureFn,
515+
min_delay: u64,
516+
factor: f32,
517+
max_times: usize,
518+
max_delay: u64,
519+
) -> Result<T, RetryError<E>>
520+
where
521+
Fut: Future<Output = Result<T, RetryError<E>>>,
522+
FutureFn: FnMut() -> Fut,
523+
{
524+
let backoff = ExponentialBuilder::default()
525+
.with_min_delay(Duration::from_millis(min_delay))
526+
.with_max_times(max_times)
527+
.with_factor(factor)
528+
.with_max_delay(Duration::from_secs(max_delay));
529+
530+
function
531+
.retry(backoff)
532+
.sleep(tokio::time::sleep)
533+
.when(|e| matches!(e, RetryError::Transient(_)))
534+
.await
535+
}
536+
414537
#[cfg(test)]
415538
mod tests {
416539
use super::*;

0 commit comments

Comments
 (0)