Skip to content

Commit f89957f

Browse files
Move the retryable function to the retry module
1 parent 5828919 commit f89957f

File tree

3 files changed

+150
-150
lines changed

3 files changed

+150
-150
lines changed

aggregation_mode/proof_aggregator/src/backend/mod.rs

Lines changed: 9 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{
1010
aggregators::{AlignedProof, ProofAggregationError, ZKVMEngine},
1111
backend::{
1212
db::{Db, DbError},
13-
retry::{retry_function, RetryError},
13+
retry::{retry_function, wait_and_send_proof_to_verify_on_chain},
1414
},
1515
};
1616

@@ -19,22 +19,20 @@ use aligned_sdk::common::constants::{
1919
ETHEREUM_CALL_MIN_RETRY_DELAY,
2020
};
2121
use alloy::{
22-
consensus::{BlobTransactionSidecar, EnvKzgSettings, EthereumTxEnvelope, TxEip4844WithSidecar},
23-
eips::{eip4844::BYTES_PER_BLOB, eip7594::BlobTransactionSidecarEip7594, Encodable2718},
22+
consensus::BlobTransactionSidecar,
23+
eips::eip4844::BYTES_PER_BLOB,
2424
hex,
2525
network::EthereumWallet,
26-
primitives::{utils::parse_ether, Address, U256},
27-
providers::{PendingTransactionError, Provider, ProviderBuilder},
26+
primitives::{utils::parse_ether, Address},
27+
providers::{PendingTransactionError, ProviderBuilder},
2828
rpc::types::TransactionReceipt,
2929
signers::local::LocalSigner,
3030
};
3131
use config::Config;
3232
use fetcher::{ProofsFetcher, ProofsFetcherError};
3333
use merkle_tree::compute_proofs_merkle_root;
34-
use risc0_ethereum_contracts::encode_seal;
3534
use sqlx::types::Uuid;
36-
use std::thread::sleep;
37-
use std::{str::FromStr, time::Duration};
35+
use std::str::FromStr;
3836
use tracing::{error, info, warn};
3937
use types::{AlignedProofAggregationService, AlignedProofAggregationServiceContract};
4038

@@ -319,136 +317,14 @@ impl ProofAggregator {
319317
}
320318
}
321319

322-
async fn wait_until_can_submit_aggregated_proof(
323-
proof_aggregation_service: AlignedProofAggregationServiceContract,
324-
monthly_budget_eth: f64,
325-
) -> Result<(), RetryError<AggregatedProofSubmissionError>> {
326-
// We start on 24 hours because the proof aggregator runs once a day, so the time elapsed
327-
// should be considered over a 24h period.
328-
let mut time_elapsed = Duration::from_secs(24 * 3600);
329-
330-
// Iterate until we can send the proof on-chain
331-
loop {
332-
// Fetch gas price from network
333-
let gas_price = proof_aggregation_service
334-
.provider()
335-
.get_gas_price()
336-
.await
337-
.map_err(|e| {
338-
RetryError::Transient(AggregatedProofSubmissionError::GasPriceError(e.to_string()))
339-
})?;
340-
341-
if helpers::should_send_proof_to_verify_on_chain(
342-
time_elapsed,
343-
monthly_budget_eth,
344-
U256::from(gas_price),
345-
) {
346-
break;
347-
} else {
348-
info!("Skipping sending proof to ProofAggregationService contract due to budget/time constraints.");
349-
}
350-
351-
// Sleep for 3 minutes (15 blocks) before re-evaluating
352-
let time_to_sleep = Duration::from_secs(180);
353-
time_elapsed += time_to_sleep;
354-
sleep(time_to_sleep);
355-
}
356-
357-
Ok(())
358-
}
359-
360-
async fn wait_and_send_proof_to_verify_on_chain(
361-
blob: BlobTransactionSidecar,
362-
blob_versioned_hash: [u8; 32],
363-
aggregated_proof: AlignedProof,
364-
proof_aggregation_service: AlignedProofAggregationServiceContract,
365-
sp1_chunk_aggregator_vk_hash_bytes: [u8; 32],
366-
risc0_chunk_aggregator_image_id_bytes: [u8; 32],
367-
monthly_budget_eth: f64,
368-
) -> Result<TransactionReceipt, RetryError<AggregatedProofSubmissionError>> {
369-
wait_until_can_submit_aggregated_proof(proof_aggregation_service.clone(), monthly_budget_eth)
370-
.await?;
371-
372-
info!("Sending proof to ProofAggregationService contract...");
373-
374-
let tx_req = match aggregated_proof {
375-
AlignedProof::SP1(proof) => proof_aggregation_service
376-
.verifyAggregationSP1(
377-
blob_versioned_hash.into(),
378-
proof.proof_with_pub_values.public_values.to_vec().into(),
379-
proof.proof_with_pub_values.bytes().into(),
380-
sp1_chunk_aggregator_vk_hash_bytes.into(),
381-
)
382-
.sidecar(blob)
383-
.into_transaction_request(),
384-
AlignedProof::Risc0(proof) => {
385-
let encoded_seal = encode_seal(&proof.receipt)
386-
.map_err(|e| AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string()))
387-
.map_err(RetryError::Transient)?;
388-
proof_aggregation_service
389-
.verifyAggregationRisc0(
390-
blob_versioned_hash.into(),
391-
encoded_seal.into(),
392-
proof.receipt.journal.bytes.into(),
393-
risc0_chunk_aggregator_image_id_bytes.into(),
394-
)
395-
.sidecar(blob)
396-
.into_transaction_request()
397-
}
398-
};
399-
400-
let provider = proof_aggregation_service.provider();
401-
let envelope = provider
402-
.fill(tx_req)
403-
.await
404-
.map_err(|err| {
405-
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
406-
})
407-
.map_err(RetryError::Transient)?
408-
.try_into_envelope()
409-
.map_err(|err| {
410-
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
411-
})
412-
.map_err(RetryError::Transient)?;
413-
let tx: EthereumTxEnvelope<TxEip4844WithSidecar<BlobTransactionSidecarEip7594>> = envelope
414-
.try_into_pooled()
415-
.map_err(|err| {
416-
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
417-
})
418-
.map_err(RetryError::Transient)?
419-
.try_map_eip4844(|tx| {
420-
tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get()))
421-
})
422-
.map_err(|err| {
423-
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
424-
})
425-
.map_err(RetryError::Transient)?;
426-
427-
let encoded_tx = tx.encoded_2718();
428-
let pending_tx = provider
429-
.send_raw_transaction(&encoded_tx)
430-
.await
431-
.map_err(|err| {
432-
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
433-
})
434-
.map_err(RetryError::Transient)?;
435-
436-
let receipt = pending_tx
437-
.get_receipt()
438-
.await
439-
.map_err(|err| {
440-
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
441-
})
442-
.map_err(RetryError::Transient)?;
443-
444-
Ok(receipt)
445-
}
446-
447320
#[cfg(test)]
448321
mod tests {
322+
449323
use super::*;
450324

325+
use alloy::primitives::U256;
451326
use helpers::should_send_proof_to_verify_on_chain;
327+
use std::time::Duration;
452328

453329
#[test]
454330
fn test_should_send_proof_to_verify_on_chain_updated_cases() {

aggregation_mode/proof_aggregator/src/backend/retry.rs

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,22 @@ use backon::ExponentialBuilder;
22
use backon::Retryable;
33
use std::future::Future;
44
use std::time::Duration;
5+
use tracing::info;
6+
7+
use crate::aggregators::AlignedProof;
8+
use crate::backend::types::AlignedProofAggregationServiceContract;
9+
use crate::backend::AggregatedProofSubmissionError;
10+
11+
use crate::backend::helpers;
12+
use alloy::{
13+
consensus::{BlobTransactionSidecar, EnvKzgSettings, EthereumTxEnvelope, TxEip4844WithSidecar},
14+
eips::{eip7594::BlobTransactionSidecarEip7594, Encodable2718},
15+
primitives::U256,
16+
providers::Provider,
17+
rpc::types::TransactionReceipt,
18+
};
19+
use risc0_ethereum_contracts::encode_seal;
20+
use std::thread::sleep;
521

622
#[derive(Debug)]
723
pub enum RetryError<E> {
@@ -54,3 +70,128 @@ where
5470
.when(|e| matches!(e, RetryError::Transient(_)))
5571
.await
5672
}
73+
74+
async fn wait_until_can_submit_aggregated_proof(
75+
proof_aggregation_service: AlignedProofAggregationServiceContract,
76+
monthly_budget_eth: f64,
77+
) -> Result<(), RetryError<AggregatedProofSubmissionError>> {
78+
// We start on 24 hours because the proof aggregator runs once a day, so the time elapsed
79+
// should be considered over a 24h period.
80+
let mut time_elapsed = Duration::from_secs(24 * 3600);
81+
82+
// Iterate until we can send the proof on-chain
83+
loop {
84+
// Fetch gas price from network
85+
let gas_price = proof_aggregation_service
86+
.provider()
87+
.get_gas_price()
88+
.await
89+
.map_err(|e| {
90+
RetryError::Transient(AggregatedProofSubmissionError::GasPriceError(e.to_string()))
91+
})?;
92+
93+
if helpers::should_send_proof_to_verify_on_chain(
94+
time_elapsed,
95+
monthly_budget_eth,
96+
U256::from(gas_price),
97+
) {
98+
break;
99+
} else {
100+
info!("Skipping sending proof to ProofAggregationService contract due to budget/time constraints.");
101+
}
102+
103+
// Sleep for 3 minutes (15 blocks) before re-evaluating
104+
let time_to_sleep = Duration::from_secs(180);
105+
time_elapsed += time_to_sleep;
106+
sleep(time_to_sleep);
107+
}
108+
109+
Ok(())
110+
}
111+
112+
pub async fn wait_and_send_proof_to_verify_on_chain(
113+
blob: BlobTransactionSidecar,
114+
blob_versioned_hash: [u8; 32],
115+
aggregated_proof: AlignedProof,
116+
proof_aggregation_service: AlignedProofAggregationServiceContract,
117+
sp1_chunk_aggregator_vk_hash_bytes: [u8; 32],
118+
risc0_chunk_aggregator_image_id_bytes: [u8; 32],
119+
monthly_budget_eth: f64,
120+
) -> Result<TransactionReceipt, RetryError<AggregatedProofSubmissionError>> {
121+
wait_until_can_submit_aggregated_proof(proof_aggregation_service.clone(), monthly_budget_eth)
122+
.await?;
123+
124+
info!("Sending proof to ProofAggregationService contract...");
125+
126+
let tx_req = match aggregated_proof {
127+
AlignedProof::SP1(proof) => proof_aggregation_service
128+
.verifyAggregationSP1(
129+
blob_versioned_hash.into(),
130+
proof.proof_with_pub_values.public_values.to_vec().into(),
131+
proof.proof_with_pub_values.bytes().into(),
132+
sp1_chunk_aggregator_vk_hash_bytes.into(),
133+
)
134+
.sidecar(blob)
135+
.into_transaction_request(),
136+
AlignedProof::Risc0(proof) => {
137+
let encoded_seal = encode_seal(&proof.receipt)
138+
.map_err(|e| AggregatedProofSubmissionError::Risc0EncodingSeal(e.to_string()))
139+
.map_err(RetryError::Transient)?;
140+
proof_aggregation_service
141+
.verifyAggregationRisc0(
142+
blob_versioned_hash.into(),
143+
encoded_seal.into(),
144+
proof.receipt.journal.bytes.into(),
145+
risc0_chunk_aggregator_image_id_bytes.into(),
146+
)
147+
.sidecar(blob)
148+
.into_transaction_request()
149+
}
150+
};
151+
152+
let provider = proof_aggregation_service.provider();
153+
let envelope = provider
154+
.fill(tx_req)
155+
.await
156+
.map_err(|err| {
157+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
158+
})
159+
.map_err(RetryError::Transient)?
160+
.try_into_envelope()
161+
.map_err(|err| {
162+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
163+
})
164+
.map_err(RetryError::Transient)?;
165+
let tx: EthereumTxEnvelope<TxEip4844WithSidecar<BlobTransactionSidecarEip7594>> = envelope
166+
.try_into_pooled()
167+
.map_err(|err| {
168+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
169+
})
170+
.map_err(RetryError::Transient)?
171+
.try_map_eip4844(|tx| {
172+
tx.try_map_sidecar(|sidecar| sidecar.try_into_7594(EnvKzgSettings::Default.get()))
173+
})
174+
.map_err(|err| {
175+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
176+
})
177+
.map_err(RetryError::Transient)?;
178+
179+
let encoded_tx = tx.encoded_2718();
180+
let pending_tx = provider
181+
.send_raw_transaction(&encoded_tx)
182+
.await
183+
.map_err(|err| {
184+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
185+
})
186+
.map_err(RetryError::Transient)?;
187+
188+
let receipt = pending_tx
189+
.get_receipt()
190+
.await
191+
.map_err(|err| {
192+
AggregatedProofSubmissionError::SendVerifyAggregatedProofTransaction(err.to_string())
193+
})
194+
.map_err(RetryError::Transient)?;
195+
196+
Ok(receipt)
197+
}

aggregation_mode/proof_aggregator/src/backend/types.rs

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,3 @@ pub type AlignedProofAggregationServiceContract = AlignedProofAggregationService
3030
RootProvider,
3131
>,
3232
>;
33-
34-
pub type RPCProvider = alloy::providers::fillers::FillProvider<
35-
alloy::providers::fillers::JoinFill<
36-
alloy::providers::Identity,
37-
alloy::providers::fillers::JoinFill<
38-
alloy::providers::fillers::GasFiller,
39-
alloy::providers::fillers::JoinFill<
40-
alloy::providers::fillers::BlobGasFiller,
41-
alloy::providers::fillers::JoinFill<
42-
alloy::providers::fillers::NonceFiller,
43-
alloy::providers::fillers::ChainIdFiller,
44-
>,
45-
>,
46-
>,
47-
>,
48-
alloy::providers::RootProvider,
49-
>;

0 commit comments

Comments
 (0)