Skip to content

Commit 5a728d1

Browse files
Move the retryables functions back to the ProofAggregatorStruct
1 parent 5587f2d commit 5a728d1

File tree

2 files changed

+159
-155
lines changed

2 files changed

+159
-155
lines changed

aggregation_mode/proof_aggregator/src/backend/mod.rs

Lines changed: 159 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ use crate::{
1010
aggregators::{AlignedProof, ProofAggregationError, ZKVMEngine},
1111
backend::{
1212
db::{Db, DbError},
13-
retry::{retry_function, wait_and_send_proof_to_verify_on_chain},
13+
retry::{retry_function, RetryError},
14+
types::{AlignedProofAggregationService, AlignedProofAggregationServiceContract},
1415
},
1516
};
1617

@@ -19,22 +20,24 @@ use aligned_sdk::common::constants::{
1920
ETHEREUM_CALL_MIN_RETRY_DELAY,
2021
};
2122
use alloy::{
22-
consensus::BlobTransactionSidecar,
23-
eips::eip4844::BYTES_PER_BLOB,
23+
consensus::{BlobTransactionSidecar, EnvKzgSettings, EthereumTxEnvelope, TxEip4844WithSidecar},
24+
eips::{eip4844::BYTES_PER_BLOB, eip7594::BlobTransactionSidecarEip7594, Encodable2718},
2425
hex,
2526
network::EthereumWallet,
26-
primitives::{utils::parse_ether, Address},
27-
providers::{PendingTransactionError, ProviderBuilder},
27+
primitives::{utils::parse_ether, Address, U256},
28+
providers::{PendingTransactionError, Provider, ProviderBuilder},
2829
rpc::types::TransactionReceipt,
2930
signers::local::LocalSigner,
3031
};
3132
use config::Config;
3233
use fetcher::{ProofsFetcher, ProofsFetcherError};
3334
use merkle_tree::compute_proofs_merkle_root;
35+
use risc0_ethereum_contracts::encode_seal;
3436
use sqlx::types::Uuid;
35-
use std::str::FromStr;
36-
use tracing::{error, info, warn};
37-
use types::{AlignedProofAggregationService, AlignedProofAggregationServiceContract};
37+
use std::{str::FromStr, time::Duration};
38+
use tokio::time::sleep;
39+
use tracing::info;
40+
use tracing::{error, warn};
3841

3942
#[derive(Debug)]
4043
pub enum AggregatedProofSubmissionError {
@@ -218,7 +221,7 @@ impl ProofAggregator {
218221
) -> Result<TransactionReceipt, AggregatedProofSubmissionError> {
219222
retry_function(
220223
|| {
221-
wait_and_send_proof_to_verify_on_chain(
224+
Self::wait_and_send_proof_to_verify_on_chain(
222225
blob.clone(),
223226
blob_versioned_hash,
224227
&aggregated_proof,
@@ -311,6 +314,153 @@ impl ProofAggregator {
311314

312315
Ok((blob, blob_versioned_hash))
313316
}
317+
318+
async fn wait_until_can_submit_aggregated_proof(
319+
proof_aggregation_service: AlignedProofAggregationServiceContract,
320+
monthly_budget_eth: f64,
321+
) -> Result<(), RetryError<AggregatedProofSubmissionError>> {
322+
info!("Started waiting until we can submit the aggregated proof.");
323+
324+
// We start on 24 hours because the proof aggregator runs once a day, so the time elapsed
325+
// should be considered over a 24h period.
326+
let mut time_elapsed = Duration::from_secs(24 * 3600);
327+
328+
// Sleep for 3 minutes (15 blocks) before re-evaluating on each iteration
329+
let time_to_sleep = Duration::from_secs(180);
330+
331+
// Iterate until we can send the proof on-chain
332+
loop {
333+
// Fetch gas price from network
334+
let gas_price = proof_aggregation_service
335+
.provider()
336+
.get_gas_price()
337+
.await
338+
.map_err(|e| {
339+
RetryError::Transient(AggregatedProofSubmissionError::GasPriceError(
340+
e.to_string(),
341+
))
342+
})?;
343+
344+
info!("Fetched gas price from network: {gas_price}");
345+
346+
if helpers::should_send_proof_to_verify_on_chain(
347+
time_elapsed,
348+
monthly_budget_eth,
349+
U256::from(gas_price),
350+
) {
351+
break;
352+
} else {
353+
info!("Skipping sending proof to ProofAggregationService contract due to budget/time constraints.");
354+
}
355+
356+
time_elapsed += time_to_sleep;
357+
sleep(time_to_sleep).await;
358+
}
359+
360+
Ok(())
361+
}
362+
363+
pub async fn wait_and_send_proof_to_verify_on_chain(
364+
blob: BlobTransactionSidecar,
365+
blob_versioned_hash: [u8; 32],
366+
aggregated_proof: &AlignedProof,
367+
proof_aggregation_service: AlignedProofAggregationServiceContract,
368+
sp1_chunk_aggregator_vk_hash_bytes: [u8; 32],
369+
risc0_chunk_aggregator_image_id_bytes: [u8; 32],
370+
monthly_budget_eth: f64,
371+
) -> Result<TransactionReceipt, RetryError<AggregatedProofSubmissionError>> {
372+
Self::wait_until_can_submit_aggregated_proof(
373+
proof_aggregation_service.clone(),
374+
monthly_budget_eth,
375+
)
376+
.await?;
377+
378+
info!("Sending proof to ProofAggregationService contract...");
379+
380+
let tx_req = match aggregated_proof {
381+
AlignedProof::SP1(proof) => proof_aggregation_service
382+
.verifyAggregationSP1(
383+
blob_versioned_hash.into(),
384+
proof.proof_with_pub_values.public_values.to_vec().into(),
385+
proof.proof_with_pub_values.bytes().into(),
386+
sp1_chunk_aggregator_vk_hash_bytes.into(),
387+
)
388+
.sidecar(blob)
389+
.into_transaction_request(),
390+
AlignedProof::Risc0(proof) => {
391+
let encoded_seal = encode_seal(&proof.receipt)
392+
.map_err(|e| AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string()))
393+
.map_err(RetryError::Permanent)?;
394+
proof_aggregation_service
395+
.verifyAggregationRisc0(
396+
blob_versioned_hash.into(),
397+
encoded_seal.into(),
398+
proof.receipt.journal.bytes.clone().into(),
399+
risc0_chunk_aggregator_image_id_bytes.into(),
400+
)
401+
.sidecar(blob)
402+
.into_transaction_request()
403+
}
404+
};
405+
406+
let provider = proof_aggregation_service.provider();
407+
let envelope = provider
408+
.fill(tx_req)
409+
.await
410+
.map_err(|err| {
411+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
412+
err.to_string(),
413+
)
414+
})
415+
.map_err(RetryError::Transient)?
416+
.try_into_envelope()
417+
.map_err(|err| {
418+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
419+
err.to_string(),
420+
)
421+
})
422+
.map_err(RetryError::Transient)?;
423+
let tx: EthereumTxEnvelope<TxEip4844WithSidecar<BlobTransactionSidecarEip7594>> = envelope
424+
.try_into_pooled()
425+
.map_err(|err| {
426+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
427+
err.to_string(),
428+
)
429+
})
430+
.map_err(RetryError::Transient)?
431+
.try_map_eip4844(|tx| {
432+
tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get()))
433+
})
434+
.map_err(|err| {
435+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
436+
err.to_string(),
437+
)
438+
})
439+
.map_err(RetryError::Transient)?;
440+
441+
let encoded_tx = tx.encoded_2718();
442+
let pending_tx = provider
443+
.send_raw_transaction(&encoded_tx)
444+
.await
445+
.map_err(|err| {
446+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
447+
err.to_string(),
448+
)
449+
})
450+
.map_err(RetryError::Transient)?;
451+
452+
let receipt = pending_tx
453+
.get_receipt()
454+
.await
455+
.map_err(|err| {
456+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(
457+
err.to_string(),
458+
)
459+
})
460+
.map_err(RetryError::Transient)?;
461+
462+
Ok(receipt)
463+
}
314464
}
315465

316466
#[cfg(test)]

aggregation_mode/proof_aggregator/src/backend/retry.rs

Lines changed: 0 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,6 @@ use backon::ExponentialBuilder;
22
use backon::Retryable;
33
use std::future::Future;
44
use std::time::Duration;
5-
use tracing::info;
6-
7-
use crate::aggregators::AlignedProof;
8-
use crate::backend::types::AlignedProofAggregationServiceContract;
9-
use crate::backend::AggregatedProofSubmissionError;
10-
11-
use crate::backend::helpers;
12-
use alloy::{
13-
consensus::{BlobTransactionSidecar, EnvKzgSettings, EthereumTxEnvelope, TxEip4844WithSidecar},
14-
eips::{eip7594::BlobTransactionSidecarEip7594, Encodable2718},
15-
primitives::U256,
16-
providers::Provider,
17-
rpc::types::TransactionReceipt,
18-
};
19-
use risc0_ethereum_contracts::encode_seal;
20-
use tokio::time::sleep;
215

226
#[derive(Debug)]
237
pub enum RetryError<E> {
@@ -70,133 +54,3 @@ where
7054
.when(|e| matches!(e, RetryError::Transient(_)))
7155
.await
7256
}
73-
74-
async fn wait_until_can_submit_aggregated_proof(
75-
proof_aggregation_service: AlignedProofAggregationServiceContract,
76-
monthly_budget_eth: f64,
77-
) -> Result<(), RetryError<AggregatedProofSubmissionError>> {
78-
info!("Started waiting until we can submit the aggregated proof.");
79-
80-
// We start on 24 hours because the proof aggregator runs once a day, so the time elapsed
81-
// should be considered over a 24h period.
82-
let mut time_elapsed = Duration::from_secs(24 * 3600);
83-
84-
// Sleep for 3 minutes (15 blocks) before re-evaluating on each iteration
85-
let time_to_sleep = Duration::from_secs(180);
86-
87-
// Iterate until we can send the proof on-chain
88-
loop {
89-
// Fetch gas price from network
90-
let gas_price = proof_aggregation_service
91-
.provider()
92-
.get_gas_price()
93-
.await
94-
.map_err(|e| {
95-
RetryError::Transient(AggregatedProofSubmissionError::GasPriceError(e.to_string()))
96-
})?;
97-
98-
info!("Fetched gas price from network: {gas_price}");
99-
100-
if helpers::should_send_proof_to_verify_on_chain(
101-
time_elapsed,
102-
monthly_budget_eth,
103-
U256::from(gas_price),
104-
) {
105-
break;
106-
} else {
107-
info!("Skipping sending proof to ProofAggregationService contract due to budget/time constraints.");
108-
}
109-
110-
time_elapsed += time_to_sleep;
111-
sleep(time_to_sleep).await;
112-
}
113-
114-
Ok(())
115-
}
116-
117-
pub async fn wait_and_send_proof_to_verify_on_chain(
118-
blob: BlobTransactionSidecar,
119-
blob_versioned_hash: [u8; 32],
120-
aggregated_proof: &AlignedProof,
121-
proof_aggregation_service: AlignedProofAggregationServiceContract,
122-
sp1_chunk_aggregator_vk_hash_bytes: [u8; 32],
123-
risc0_chunk_aggregator_image_id_bytes: [u8; 32],
124-
monthly_budget_eth: f64,
125-
) -> Result<TransactionReceipt, RetryError<AggregatedProofSubmissionError>> {
126-
wait_until_can_submit_aggregated_proof(proof_aggregation_service.clone(), monthly_budget_eth)
127-
.await?;
128-
129-
info!("Sending proof to ProofAggregationService contract...");
130-
131-
let tx_req = match aggregated_proof {
132-
AlignedProof::SP1(proof) => proof_aggregation_service
133-
.verifyAggregationSP1(
134-
blob_versioned_hash.into(),
135-
proof.proof_with_pub_values.public_values.to_vec().into(),
136-
proof.proof_with_pub_values.bytes().into(),
137-
sp1_chunk_aggregator_vk_hash_bytes.into(),
138-
)
139-
.sidecar(blob)
140-
.into_transaction_request(),
141-
AlignedProof::Risc0(proof) => {
142-
let encoded_seal = encode_seal(&proof.receipt)
143-
.map_err(|e| AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string()))
144-
.map_err(RetryError::Permanent)?;
145-
proof_aggregation_service
146-
.verifyAggregationRisc0(
147-
blob_versioned_hash.into(),
148-
encoded_seal.into(),
149-
proof.receipt.journal.bytes.clone().into(),
150-
risc0_chunk_aggregator_image_id_bytes.into(),
151-
)
152-
.sidecar(blob)
153-
.into_transaction_request()
154-
}
155-
};
156-
157-
let provider = proof_aggregation_service.provider();
158-
let envelope = provider
159-
.fill(tx_req)
160-
.await
161-
.map_err(|err| {
162-
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
163-
})
164-
.map_err(RetryError::Transient)?
165-
.try_into_envelope()
166-
.map_err(|err| {
167-
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
168-
})
169-
.map_err(RetryError::Transient)?;
170-
let tx: EthereumTxEnvelope<TxEip4844WithSidecar<BlobTransactionSidecarEip7594>> = envelope
171-
.try_into_pooled()
172-
.map_err(|err| {
173-
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
174-
})
175-
.map_err(RetryError::Transient)?
176-
.try_map_eip4844(|tx| {
177-
tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get()))
178-
})
179-
.map_err(|err| {
180-
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
181-
})
182-
.map_err(RetryError::Transient)?;
183-
184-
let encoded_tx = tx.encoded_2718();
185-
let pending_tx = provider
186-
.send_raw_transaction(&encoded_tx)
187-
.await
188-
.map_err(|err| {
189-
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
190-
})
191-
.map_err(RetryError::Transient)?;
192-
193-
let receipt = pending_tx
194-
.get_receipt()
195-
.await
196-
.map_err(|err| {
197-
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
198-
})
199-
.map_err(RetryError::Transient)?;
200-
201-
Ok(receipt)
202-
}

0 commit comments

Comments
 (0)