Skip to content

Commit 1be8530

Browse files
committed
feat: remove queue and returns proofs from fetcher
1 parent 1ab8c7e commit 1be8530

File tree

4 files changed

+29
-73
lines changed

4 files changed

+29
-73
lines changed

aggregation-mode/src/backend/config.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ pub struct Config {
1717
pub proof_aggregation_service_address: String,
1818
pub aligned_service_manager_address: String,
1919
pub ecdsa: ECDSAConfig,
20-
pub bucket_name: String,
2120
}
2221

2322
impl Config {

aggregation-mode/src/backend/fetcher.rs

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ use std::str::FromStr;
22

33
use super::{
44
config::Config,
5-
queue::ProofsQueue,
65
types::{AlignedLayerServiceManager, AlignedLayerServiceManagerContract},
76
};
87
use crate::{
@@ -13,11 +12,11 @@ use aligned_sdk::core::types::ProvingSystemId;
1312
use alloy::{primitives::Address, providers::ProviderBuilder};
1413
use tracing::{error, info};
1514

16-
/// This services is in charge of:
17-
/// 1. Listens to aligned new batch task
18-
/// 2. Downloads proofs from S3 bucket
19-
/// 3. Filter supported proofs to be aggregated
20-
/// 4. Push the proofs to the queue
15+
#[derive(Debug)]
16+
pub enum ProofsFetcherError {
17+
QueryingLogs,
18+
}
19+
2120
pub struct ProofsFetcher {
2221
aligned_service_manager: AlignedLayerServiceManagerContract,
2322
}
@@ -37,8 +36,7 @@ impl ProofsFetcher {
3736
}
3837
}
3938

40-
// TODO: remove panic and return proofs instead of taking queue
41-
pub async fn fetch(&self, queue: &mut ProofsQueue) {
39+
pub async fn fetch(&self) -> Result<Vec<Proof>, ProofsFetcherError> {
4240
info!("Fetching proofs from batch logs");
4341
// Subscribe to NewBatch event from AlignedServiceManager
4442
let logs = self
@@ -47,10 +45,12 @@ impl ProofsFetcher {
4745
.from_block(0)
4846
.query()
4947
.await
50-
.expect("to get logs");
48+
.map_err(|_| ProofsFetcherError::QueryingLogs)?;
5149

5250
info!("Logs collected {}", logs.len());
5351

52+
let mut proofs = vec![];
53+
5454
for (batch, _) in logs {
5555
info!(
5656
"New batch submitted, about to process. Batch merkle root {}...",
@@ -69,7 +69,7 @@ impl ProofsFetcher {
6969
info!("Data downloaded from S3, number of proofs {}", data.len());
7070

7171
// Filter SP1 compressed proofs to and push to queue to be aggregated
72-
let proofs: Vec<Proof> = data
72+
let proofs_to_add: Vec<Proof> = data
7373
.into_iter()
7474
.filter_map(|p| match p.proving_system {
7575
ProvingSystemId::SP1 => {
@@ -86,15 +86,16 @@ impl ProofsFetcher {
8686
info!("SP1 proofs filtered, total proofs to add {}", proofs.len());
8787

8888
// try to add them to the queue
89-
for proof in proofs {
90-
match queue.add_proof(proof) {
91-
Ok(_) => info!(
92-
"New proof added to queue, current length {}",
93-
queue.proofs().len()
94-
),
95-
Err(e) => error!("Could not add proof, reason: {:?}", e),
89+
for proof in proofs_to_add {
90+
if let Err(err) = proof.verify() {
91+
error!("Could not add proof, verification failed: {:?}", err);
92+
continue;
9693
};
94+
95+
proofs.push(proof);
9796
}
9897
}
98+
99+
Ok(proofs)
99100
}
100101
}

aggregation-mode/src/backend/mod.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
pub mod config;
22
pub mod fetcher;
33
mod merkle_tree;
4-
pub mod queue;
54
mod s3;
65
mod types;
76

@@ -18,28 +17,27 @@ use alloy::{
1817
signers::local::LocalSigner,
1918
};
2019
use config::Config;
21-
use fetcher::ProofsFetcher;
20+
use fetcher::{ProofsFetcher, ProofsFetcherError};
2221
use merkle_tree::compute_proofs_merkle_root;
23-
use queue::ProofsQueue;
2422
use sp1_sdk::HashableKey;
2523
use std::{str::FromStr, time::Duration};
2624
use tracing::{error, info, warn};
2725
use types::{AlignedProofAggregationService, AlignedProofAggregationServiceContract};
2826

2927
#[derive(Debug)]
30-
enum AggregatedProofSubmissionError {
28+
pub enum AggregatedProofSubmissionError {
3129
Aggregation(ProofAggregationError),
3230
SendBlobTransaction,
3331
SendVerifyAggregatedProofTransaction(alloy::contract::Error),
3432
ReceiptError(PendingTransactionError),
33+
FetchingProofs(ProofsFetcherError),
3534
}
3635

3736
pub struct ProofAggregator {
3837
engine: ZKVMEngine,
3938
submit_proof_every_secs: u64,
4039
proof_aggregation_service: AlignedProofAggregationServiceContract,
4140
fetcher: ProofsFetcher,
42-
queue: ProofsQueue,
4341
}
4442

4543
impl ProofAggregator {
@@ -63,7 +61,6 @@ impl ProofAggregator {
6361
engine: ZKVMEngine::SP1,
6462
submit_proof_every_secs: config.submit_proofs_every_secs,
6563
proof_aggregation_service,
66-
queue: ProofsQueue::new(config.max_proofs_in_queue),
6764
fetcher,
6865
}
6966
}
@@ -99,8 +96,11 @@ impl ProofAggregator {
9996
async fn aggregate_and_submit_proofs_on_chain(
10097
&mut self,
10198
) -> Result<(), AggregatedProofSubmissionError> {
102-
self.fetcher.fetch(&mut self.queue).await;
103-
let proofs = self.queue.clear();
99+
let proofs = self
100+
.fetcher
101+
.fetch()
102+
.await
103+
.map_err(AggregatedProofSubmissionError::FetchingProofs)?;
104104

105105
if proofs.len() == 0 {
106106
warn!("No proofs in queue, skipping iteration...");
@@ -130,14 +130,14 @@ impl ProofAggregator {
130130

131131
info!("Sending blob transaction...");
132132
let blob_tx_hash = self.send_blob_transaction(leaves).await?;
133-
info!("Blob transaction sen, hash: {:?}", blob_tx_hash);
133+
info!("Blob transaction sent, hash: {:?}", blob_tx_hash);
134134

135135
info!("Sending proof to ProofAggregationService contract...");
136136
let receipt = self
137137
.send_proof_to_verify_on_chain(&blob_tx_hash, output.proof)
138138
.await?;
139139
info!(
140-
"Proof sent anv verified, tx hash {:?}",
140+
"Proof sent and verified, tx hash {:?}",
141141
receipt.transaction_hash
142142
);
143143

@@ -175,7 +175,7 @@ impl ProofAggregator {
175175
// TODO
176176
async fn send_blob_transaction(
177177
&self,
178-
leaves: Vec<[u8; 32]>,
178+
_leaves: Vec<[u8; 32]>,
179179
) -> Result<[u8; 32], AggregatedProofSubmissionError> {
180180
Ok([0u8; 32])
181181
}

aggregation-mode/src/backend/queue.rs

Lines changed: 0 additions & 44 deletions
This file was deleted.

0 commit comments

Comments
 (0)