Commit b5c7aa3

feat: fetch logs instead of listening for events via ws
1 parent 6fc017d commit b5c7aa3

File tree

4 files changed: +74 -115 lines changed

  aggregation-mode/src/backend/fetcher.rs
  aggregation-mode/src/backend/mod.rs
  aggregation-mode/src/backend/s3.rs
  aggregation-mode/src/main.rs
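The commit replaces the fetcher's persistent WebSocket event subscription with on-demand HTTP log queries. For orientation, a minimal sketch of the two alloy access patterns; the ServiceManager binding, event fields, function names, and URLs below are placeholders for illustration, not this repo's generated types.

use alloy::{
    primitives::Address,
    providers::{ProviderBuilder, WsConnect},
    sol,
};
use futures_util::StreamExt;

sol! {
    #[sol(rpc)]
    contract ServiceManager {
        event NewBatchV3(bytes32 batchMerkleRoot, string batchDataPointer);
    }
}

// Before: hold a WebSocket open and react to events as they stream in.
async fn listen_ws(ws_url: &str, addr: Address) -> Result<(), Box<dyn std::error::Error>> {
    let provider = ProviderBuilder::new().on_ws(WsConnect::new(ws_url)).await?;
    let manager = ServiceManager::new(addr, provider);
    let mut stream = manager.NewBatchV3_filter().subscribe().await?.into_stream();
    while let Some(Ok((batch, _log))) = stream.next().await {
        println!("new batch {}", batch.batchMerkleRoot);
    }
    Ok(())
}

// After: query past logs over plain HTTP whenever the caller asks for them.
async fn fetch_http(rpc_url: &str, addr: Address) -> Result<(), Box<dyn std::error::Error>> {
    let provider = ProviderBuilder::new().on_http(rpc_url.parse()?);
    let manager = ServiceManager::new(addr, provider);
    let logs = manager.NewBatchV3_filter().from_block(0).query().await?;
    println!("collected {} batches", logs.len());
    Ok(())
}

With the query-based form there is no long-lived connection to maintain; the aggregator can pull logs right before each aggregation round, as the mod.rs changes below do.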

aggregation-mode/src/backend/fetcher.rs

Lines changed: 39 additions & 47 deletions

@@ -1,22 +1,16 @@
-use std::{str::FromStr, sync::Arc};
+use std::str::FromStr;
 
 use super::{
     config::Config,
     queue::ProofsQueue,
-    s3::S3Client,
     types::{AlignedLayerServiceManager, AlignedLayerServiceManagerContract},
 };
-use crate::zk::{
-    backends::sp1::{vk_from_elf, SP1Proof},
-    Proof,
+use crate::{
+    backend::s3::get_aligned_batch_from_s3,
+    zk::{backends::sp1::SP1Proof, Proof},
 };
 use aligned_sdk::core::types::ProvingSystemId;
-use alloy::{
-    primitives::Address,
-    providers::{ProviderBuilder, WsConnect},
-};
-use futures_util::stream::StreamExt;
-use tokio::sync::Mutex;
+use alloy::{primitives::Address, providers::ProviderBuilder};
 use tracing::{error, info};
 
 /// This services is in charge of:
@@ -26,79 +20,77 @@ use tracing::{error, info};
 /// 4. Push the proofs to the queue
 pub struct ProofsFetcher {
     aligned_service_manager: AlignedLayerServiceManagerContract,
-    s3: S3Client,
-    queue: Arc<Mutex<ProofsQueue>>,
 }
 
 impl ProofsFetcher {
-    pub async fn new(config: &Config, queue: Arc<Mutex<ProofsQueue>>) -> Self {
-        let ws = WsConnect::new(&config.eth_ws_url);
-        let provider = ProviderBuilder::new()
-            .on_ws(ws)
-            .await
-            .expect("Successful connection");
-
+    pub fn new(config: &Config) -> Self {
+        let rpc_url = config.eth_rpc_url.parse().expect("correct url");
+        let provider = ProviderBuilder::new().on_http(rpc_url);
         let aligned_service_manager = AlignedLayerServiceManager::new(
             Address::from_str(&config.aligned_service_manager_address)
                 .expect("Address to be correct"),
             provider,
         );
 
-        let s3 = S3Client::new(config.bucket_name.clone(), None).await;
-
         Self {
             aligned_service_manager,
-            s3,
-            queue,
         }
     }
 
-    pub async fn start(&self) {
+    // TODO: remove panic and return proofs instead of taking queue
+    pub async fn fetch(&self, queue: &mut ProofsQueue) {
+        info!("Fetching proofs from batch logs");
         // Subscribe to NewBatch event from AlignedServiceManager
-        let event_sub = self
+        let logs = self
             .aligned_service_manager
-            .NewBatch_filter()
-            .subscribe()
+            .NewBatchV3_filter()
+            .from_block(0)
+            .query()
             .await
-            .expect("To subscribe to event");
-        let mut stream = event_sub.into_stream();
+            .expect("to get logs");
 
-        while let Some(log) = stream.next().await {
-            let Ok(log) = log else {
-                continue;
-            };
+        info!("Logs collected {}", logs.len());
+
+        for (batch, _) in logs {
+            info!(
+                "New batch submitted, about to process. Batch merkle root {}...",
+                batch.batchMerkleRoot
+            );
 
             // Download batch proofs from s3
-            let Ok(data) = self.s3.get_aligned_batch(log.0.batchDataPointer).await else {
-                error!("Error while downloading proofs from s3");
-                continue;
+            let data = match get_aligned_batch_from_s3(batch.batchDataPointer).await {
+                Ok(data) => data,
+                Err(err) => {
+                    error!("Error while downloading proofs from s3. Err {:?}", err);
+                    continue;
+                }
             };
 
+            info!("Data downloaded from S3, number of proofs {}", data.len());
+
             // Filter SP1 compressed proofs to and push to queue to be aggregated
-            let proofs: Vec<(Proof, Vec<u8>)> = data
+            let proofs: Vec<Proof> = data
                 .into_iter()
                 .filter_map(|p| match p.proving_system {
                     ProvingSystemId::SP1 => {
                         let elf = p.vm_program_code?;
                         let proof = bincode::deserialize(&p.proof).ok()?;
-                        let sp1_proof = SP1Proof {
-                            proof,
-                            vk: vk_from_elf(&elf),
-                        };
+                        let sp1_proof = SP1Proof { proof, elf };
 
-                        Some((Proof::SP1(sp1_proof), elf))
+                        Some(Proof::SP1(sp1_proof))
                     }
                     _ => None,
                 })
                 .collect();
 
+            info!("SP1 proofs filtered, total proofs to add {}", proofs.len());
+
             // try to add them to the queue
-            let mut queue_lock = self.queue.lock().await;
-            for (proof, elf) in proofs {
-                match queue_lock.add_proof(proof, &elf) {
+            for proof in proofs {
+                match queue.add_proof(proof) {
                     Ok(_) => info!(
                         "New proof added to queue, current length {}",
-                        queue_lock.proofs().len()
+                        queue.proofs().len()
                     ),
                     Err(e) => error!("Could not add proof, reason: {:?}", e),
                 };

aggregation-mode/src/backend/mod.rs

Lines changed: 13 additions & 8 deletions

@@ -18,11 +18,11 @@ use alloy::{
     signers::local::LocalSigner,
 };
 use config::Config;
+use fetcher::ProofsFetcher;
 use merkle_tree::compute_proofs_merkle_root;
 use queue::ProofsQueue;
 use sp1_sdk::HashableKey;
-use std::{str::FromStr, sync::Arc, time::Duration};
-use tokio::sync::Mutex;
+use std::{str::FromStr, time::Duration};
 use tracing::{error, info, warn};
 use types::{AlignedProofAggregationService, AlignedProofAggregationServiceContract};
 
@@ -38,11 +38,12 @@ pub struct ProofAggregator {
     engine: ZKVMEngine,
     submit_proof_every_secs: u64,
     proof_aggregation_service: AlignedProofAggregationServiceContract,
-    queue: Arc<Mutex<ProofsQueue>>,
+    fetcher: ProofsFetcher,
+    queue: ProofsQueue,
 }
 
 impl ProofAggregator {
-    pub async fn new(config: &Config, queue: Arc<Mutex<ProofsQueue>>) -> Self {
+    pub fn new(config: &Config) -> Self {
         let rpc_url = config.eth_rpc_url.parse().expect("correct url");
         let signer = LocalSigner::decrypt_keystore(
             config.ecdsa.private_key_store_path.clone(),
@@ -51,17 +52,19 @@ impl ProofAggregator {
         .expect("Correct keystore signer");
         let wallet = EthereumWallet::from(signer);
         let provider = ProviderBuilder::new().wallet(wallet).on_http(rpc_url);
-        let proof_aggregation_service = AlignedProofAggregationService::new(
+        let proof_aggregation_service: AlignedProofAggregationService::AlignedProofAggregationServiceInstance<(), alloy::providers::fillers::FillProvider<alloy::providers::fillers::JoinFill<alloy::providers::fillers::JoinFill<alloy::providers::Identity, alloy::providers::fillers::JoinFill<alloy::providers::fillers::GasFiller, alloy::providers::fillers::JoinFill<alloy::providers::fillers::BlobGasFiller, alloy::providers::fillers::JoinFill<alloy::providers::fillers::NonceFiller, alloy::providers::fillers::ChainIdFiller>>>>, alloy::providers::fillers::WalletFiller<EthereumWallet>>, alloy::providers::RootProvider>> = AlignedProofAggregationService::new(
             Address::from_str(&config.proof_aggregation_service_address)
                 .expect("Address to be correct"),
             provider,
         );
+        let fetcher = ProofsFetcher::new(config);
 
         Self {
             engine: ZKVMEngine::SP1,
             submit_proof_every_secs: config.submit_proofs_every_secs,
             proof_aggregation_service,
-            queue,
+            queue: ProofsQueue::new(config.max_proofs_in_queue),
+            fetcher,
         }
     }
 
@@ -96,7 +99,9 @@ impl ProofAggregator {
     async fn aggregate_and_submit_proofs_on_chain(
         &mut self,
     ) -> Result<(), AggregatedProofSubmissionError> {
-        let proofs = self.queue.lock().await.clear();
+        self.fetcher.fetch(&mut self.queue).await;
+        let proofs = self.queue.clear();
+
         if proofs.len() == 0 {
             warn!("No proofs in queue, skipping iteration...");
             return Ok(());
@@ -141,7 +146,7 @@ impl ProofAggregator {
             .proof_aggregation_service
             .verify(
                 blob_tx_hash.into(),
-                proof.vk.bytes32_raw().into(),
+                proof.vk().bytes32_raw().into(),
                 proof.proof.public_values.to_vec().into(),
                 proof.proof.bytes().into(),
             )
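The diff above also switches proof.vk (a stored field) to proof.vk() (a method), and fetcher.rs now builds SP1Proof { proof, elf } instead of precomputing the verifying key with vk_from_elf. The repo's crate::zk module is not part of this diff; the following is only a sketch, under the assumption that such a vk() accessor derives the key from the stored ELF with sp1_sdk.

use sp1_sdk::{ProverClient, SP1ProofWithPublicValues, SP1VerifyingKey};

// Hypothetical stand-in for the repo's zk::backends::sp1::SP1Proof (not shown in this diff).
pub struct SP1Proof {
    pub proof: SP1ProofWithPublicValues,
    pub elf: Vec<u8>,
}

impl SP1Proof {
    // Recompute the verifying key from the ELF on demand instead of storing it.
    pub fn vk(&self) -> SP1VerifyingKey {
        let client = ProverClient::new();
        let (_pk, vk) = client.setup(&self.elf);
        vk
    }
}

// Callers can then write proof.vk().bytes32_raw(), as in the verify() call above
// (bytes32_raw comes from sp1_sdk's HashableKey trait, which mod.rs imports).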

aggregation-mode/src/backend/s3.rs

Lines changed: 18 additions & 49 deletions

@@ -1,63 +1,32 @@
 use aligned_sdk::core::types::VerificationData;
-use aws_config::{meta::region::RegionProviderChain, BehaviorVersion};
-use aws_sdk_s3::Client;
-use tracing::info;
-
-pub struct S3Client {
-    client: Client,
-    bucket_name: String,
-}
 
+#[derive(Debug)]
 pub enum GetBatchProofsError {
     Fetching,
     Deserialization,
     EmptyBody,
+    StatusFailed,
 }
 
-impl S3Client {
-    pub async fn new(bucket_name: String, endpoint_url: Option<String>) -> Self {
-        let region_provider = RegionProviderChain::default_provider().or_else("us-east-2");
-        let mut config = aws_config::defaults(BehaviorVersion::latest()).region(region_provider);
-        if let Some(endpoint_url) = &endpoint_url {
-            info!("Using custom endpoint: {}", endpoint_url);
-            config = config.endpoint_url(endpoint_url);
-        }
-        let config = config.load().await;
-
-        let mut s3_config_builder = aws_sdk_s3::config::Builder::from(&config);
-        if endpoint_url.is_some() {
-            info!("Forcing path style for custom endpoint");
-            s3_config_builder = s3_config_builder.force_path_style(true);
-        }
+pub async fn get_aligned_batch_from_s3(
+    url: String,
+) -> Result<Vec<VerificationData>, GetBatchProofsError> {
+    let response = reqwest::get(url)
+        .await
+        .map_err(|_| GetBatchProofsError::Fetching)?;
 
-        let client = Client::from_conf(s3_config_builder.build());
-
-        Self {
-            client,
-            bucket_name,
-        }
+    if !response.status().is_success() {
+        return Err(GetBatchProofsError::StatusFailed);
     }
 
-    pub async fn get_aligned_batch(
-        &self,
-        key: String,
-    ) -> Result<Vec<VerificationData>, GetBatchProofsError> {
-        let result = self
-            .client
-            .get_object()
-            .bucket(self.bucket_name.clone())
-            .key(key)
-            .send()
-            .await
-            .map_err(|_| GetBatchProofsError::Fetching)?;
-
-        let Some(bytes) = result.body.bytes() else {
-            return Err(GetBatchProofsError::EmptyBody);
-        };
+    let bytes = response
+        .bytes()
+        .await
+        .map_err(|_| GetBatchProofsError::EmptyBody)?;
+    let bytes: &[u8] = bytes.iter().as_slice();
 
-        let data: Vec<VerificationData> =
-            ciborium::from_reader(bytes).map_err(|_| GetBatchProofsError::Deserialization)?;
+    let data: Vec<VerificationData> =
+        ciborium::from_reader(bytes).map_err(|_| GetBatchProofsError::Deserialization)?;
 
-        Ok(data)
-    }
+    Ok(data)
 }
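A hedged usage sketch of the new free function. The module path (proof_aggregator::backend::s3) and the example URL are assumptions for illustration; the diff only shows the function being called from inside the crate as crate::backend::s3::get_aligned_batch_from_s3.

// Assumes backend::s3 is reachable from outside the module; the URL is a placeholder.
use proof_aggregator::backend::s3::get_aligned_batch_from_s3;

#[tokio::main]
async fn main() {
    let batch_data_pointer =
        "https://example-bucket.s3.us-east-2.amazonaws.com/batch.cbor".to_string();
    match get_aligned_batch_from_s3(batch_data_pointer).await {
        Ok(data) => println!("downloaded {} verification data entries", data.len()),
        // GetBatchProofsError derives Debug in this diff, so {:?} works here.
        Err(err) => eprintln!("failed to fetch batch: {:?}", err),
    }
}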

aggregation-mode/src/main.rs

Lines changed: 4 additions & 11 deletions

@@ -1,9 +1,6 @@
-use std::{env, sync::Arc};
+use std::env;
 
-use proof_aggregator::backend::{
-    config::Config, fetcher::ProofsFetcher, queue::ProofsQueue, ProofAggregator,
-};
-use tokio::sync::Mutex;
+use proof_aggregator::backend::{config::Config, ProofAggregator};
 use tracing_subscriber::FmtSubscriber;
 
 fn read_config_filepath_from_args() -> String {
@@ -29,13 +26,9 @@ async fn main() {
     let config = Config::from_file(&config_file_path).expect("Config is valid");
     tracing::info!("Config loaded");
 
-    let queue = Arc::new(Mutex::new(ProofsQueue::new(config.max_proofs_in_queue)));
-    let mut proof_aggregator = ProofAggregator::new(&config, queue.clone()).await;
-    let proofs_fetcher = ProofsFetcher::new(&config, queue).await;
+    let mut proof_aggregator = ProofAggregator::new(&config);
 
-    // start tasks -> Proof aggregator + Proofs fetcher
     let proof_aggregator_handle = tokio::spawn(async move { proof_aggregator.start().await });
-    let proofs_fetcher_handle = tokio::spawn(async move { proofs_fetcher.start().await });
 
-    let _ = tokio::join!(proof_aggregator_handle, proofs_fetcher_handle);
+    let _ = tokio::join!(proof_aggregator_handle);
 }
