Skip to content

Commit f7ed7b9

Browse files
committed
feat: proofs fetcher
1 parent f9a8acc commit f7ed7b9

File tree

3 files changed

+127
-0
lines changed

3 files changed

+127
-0
lines changed

aggregation-mode/abi/AlignedLayerServiceManager.json

Lines changed: 1 addition & 0 deletions
Large diffs are not rendered by default.
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
use std::{str::FromStr, sync::Arc};
2+
3+
use super::{
4+
config::Config,
5+
queue::ProofsQueue,
6+
s3::S3Client,
7+
types::{AlignedLayerServiceManager, AlignedLayerServiceManagerContract},
8+
};
9+
use crate::zk::{
10+
backends::sp1::{vk_from_elf, SP1Proof},
11+
Proof,
12+
};
13+
use aligned_sdk::core::types::ProvingSystemId;
14+
use alloy::{
15+
primitives::Address,
16+
providers::{ProviderBuilder, WsConnect},
17+
};
18+
use futures_util::stream::StreamExt;
19+
use tokio::sync::Mutex;
20+
use tracing::{error, info};
21+
22+
/// This services is in charge of:
23+
/// 1. Listens to aligned new batch task
24+
/// 2. Downloads proofs from S3 bucket
25+
/// 3. Filter supported proofs to be aggregated
26+
/// 4. Push the proofs to the queue
27+
pub struct ProofsFetcher {
28+
aligned_service_manager: AlignedLayerServiceManagerContract,
29+
s3: S3Client,
30+
queue: Arc<Mutex<ProofsQueue>>,
31+
}
32+
33+
impl ProofsFetcher {
34+
pub async fn new(config: &Config, queue: Arc<Mutex<ProofsQueue>>) -> Self {
35+
let ws = WsConnect::new(&config.eth_ws_url);
36+
let provider = ProviderBuilder::new()
37+
.on_ws(ws)
38+
.await
39+
.expect("Successful connection");
40+
41+
let aligned_service_manager = AlignedLayerServiceManager::new(
42+
Address::from_str(&config.aligned_service_manager_address)
43+
.expect("Address to be correct"),
44+
provider,
45+
);
46+
47+
let s3 = S3Client::new(config.bucket_name.clone(), None).await;
48+
49+
Self {
50+
aligned_service_manager,
51+
s3,
52+
queue,
53+
}
54+
}
55+
56+
pub async fn start(&self) {
57+
// Subscribe to NewBatch event from AlignedServiceManager
58+
let event_sub = self
59+
.aligned_service_manager
60+
.NewBatch_filter()
61+
.subscribe()
62+
.await
63+
.expect("To subscribe to event");
64+
let mut stream = event_sub.into_stream();
65+
66+
while let Some(log) = stream.next().await {
67+
let Ok(log) = log else {
68+
continue;
69+
};
70+
71+
// Download batch proofs from s3
72+
let Ok(data) = self.s3.get_aligned_batch(log.0.batchDataPointer).await else {
73+
error!("Error while downloading proofs from s3");
74+
continue;
75+
};
76+
77+
// Filter SP1 compressed proofs to and push to queue to be aggregated
78+
let proofs: Vec<(Proof, Vec<u8>)> = data
79+
.into_iter()
80+
.filter_map(|p| match p.proving_system {
81+
ProvingSystemId::SP1 => {
82+
let elf = p.vm_program_code?;
83+
let proof = bincode::deserialize(&p.proof).ok()?;
84+
let sp1_proof = SP1Proof {
85+
proof,
86+
vk: vk_from_elf(&elf),
87+
};
88+
89+
Some((Proof::SP1(sp1_proof), elf))
90+
}
91+
_ => None,
92+
})
93+
.collect();
94+
95+
// try to add them to the queue
96+
let mut queue_lock = self.queue.lock().await;
97+
for (proof, elf) in proofs {
98+
match queue_lock.add_proof(proof, &elf) {
99+
Ok(_) => info!(
100+
"New proof added to queue, current length {}",
101+
queue_lock.proofs().len()
102+
),
103+
Err(e) => error!("Could not add proof, reason: {:?}", e),
104+
};
105+
}
106+
}
107+
}
108+
}

aggregation-mode/src/backend/types.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use alloy::{
99
},
1010
sol,
1111
};
12+
use AlignedLayerServiceManager::AlignedLayerServiceManagerInstance;
1213
use AlignedProofAggregationService::AlignedProofAggregationServiceInstance;
1314

1415
sol!(
@@ -30,3 +31,20 @@ pub type AlignedProofAggregationServiceContract = AlignedProofAggregationService
3031
RootProvider,
3132
>,
3233
>;
34+
35+
sol!(
36+
#[sol(rpc)]
37+
AlignedLayerServiceManager,
38+
"abi/AlignedLayerServiceManager.json"
39+
);
40+
41+
pub type AlignedLayerServiceManagerContract = AlignedLayerServiceManagerInstance<
42+
(),
43+
FillProvider<
44+
JoinFill<
45+
Identity,
46+
JoinFill<GasFiller, JoinFill<BlobGasFiller, JoinFill<NonceFiller, ChainIdFiller>>>,
47+
>,
48+
RootProvider,
49+
>,
50+
>;

0 commit comments

Comments
 (0)