Skip to content

Commit f9a8acc

Browse files
committed
refactor: move queue to its own struct
1 parent 99a6a37 commit f9a8acc

File tree

2 files changed

+65
-40
lines changed

2 files changed

+65
-40
lines changed

aggregation-mode/src/backend/mod.rs

Lines changed: 19 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
pub mod config;
2+
pub mod fetcher;
23
mod merkle_tree;
4+
pub mod queue;
5+
mod s3;
36
mod types;
47

58
use crate::zk::{
69
aggregator::{self, AggregatedProof, ProgramInput, ProofAggregationError},
710
backends::sp1::SP1AggregationInput,
8-
Proof, VerificationError, ZKVMEngine,
11+
Proof, ZKVMEngine,
912
};
1013
use alloy::{
1114
network::EthereumWallet,
@@ -16,17 +19,13 @@ use alloy::{
1619
};
1720
use config::Config;
1821
use merkle_tree::compute_proofs_merkle_root;
22+
use queue::ProofsQueue;
1923
use sp1_sdk::HashableKey;
20-
use std::{str::FromStr, time::Duration};
24+
use std::{str::FromStr, sync::Arc, time::Duration};
25+
use tokio::sync::Mutex;
2126
use tracing::{error, info, warn};
2227
use types::{AlignedProofAggregationService, AlignedProofAggregationServiceContract};
2328

24-
#[derive(Debug)]
25-
pub enum ProofQueueError {
26-
QueueMaxCapacity,
27-
InvalidProof(VerificationError),
28-
}
29-
3029
#[derive(Debug)]
3130
enum AggregatedProofSubmissionError {
3231
Aggregation(ProofAggregationError),
@@ -38,17 +37,16 @@ enum AggregatedProofSubmissionError {
3837
pub struct ProofAggregator {
3938
engine: ZKVMEngine,
4039
submit_proof_every_secs: u64,
41-
max_proofs_in_queue: u16,
42-
proofs_queue: Vec<Proof>,
4340
proof_aggregation_service: AlignedProofAggregationServiceContract,
41+
queue: Arc<Mutex<ProofsQueue>>,
4442
}
4543

4644
impl ProofAggregator {
47-
pub fn new(config: Config) -> Self {
45+
pub async fn new(config: &Config, queue: Arc<Mutex<ProofsQueue>>) -> Self {
4846
let rpc_url = config.eth_rpc_url.parse().expect("correct url");
4947
let signer = LocalSigner::decrypt_keystore(
50-
config.ecdsa.private_key_store_path,
51-
config.ecdsa.private_key_store_password,
48+
config.ecdsa.private_key_store_path.clone(),
49+
config.ecdsa.private_key_store_password.clone(),
5250
)
5351
.expect("Correct keystore signer");
5452
let wallet = EthereumWallet::from(signer);
@@ -62,14 +60,17 @@ impl ProofAggregator {
6260
Self {
6361
engine: ZKVMEngine::SP1,
6462
submit_proof_every_secs: config.submit_proofs_every_secs,
65-
max_proofs_in_queue: config.max_proofs_in_queue,
66-
proofs_queue: vec![],
6763
proof_aggregation_service,
64+
queue,
6865
}
6966
}
7067

7168
pub async fn start(&mut self) {
72-
info!("Starting proof aggregator service");
69+
info!(
70+
"Starting proof aggregator service, configured to run every {}",
71+
self.submit_proof_every_secs
72+
);
73+
7374
loop {
7475
tokio::time::sleep(Duration::from_secs(self.submit_proof_every_secs)).await;
7576
info!("About to aggregate and submit proof to be verified on chain");
@@ -92,37 +93,15 @@ impl ProofAggregator {
9293
}
9394
}
9495

95-
pub fn add_proof(&mut self, proof: Proof, elf: &[u8]) -> Result<(), ProofQueueError> {
96-
if let Err(err) = proof.verify(elf) {
97-
return Err(ProofQueueError::InvalidProof(err));
98-
};
99-
100-
if self.proofs_queue.len() as u16 >= self.max_proofs_in_queue {
101-
return Err(ProofQueueError::QueueMaxCapacity);
102-
}
103-
104-
self.proofs_queue.push(proof);
105-
106-
info!(
107-
"New proof added to queue, current length {}",
108-
self.proofs_queue.len()
109-
);
110-
Ok(())
111-
}
112-
11396
async fn aggregate_and_submit_proofs_on_chain(
11497
&mut self,
11598
) -> Result<(), AggregatedProofSubmissionError> {
116-
if self.proofs_queue.len() == 0 {
99+
let proofs = self.queue.lock().await.clear();
100+
if proofs.len() == 0 {
117101
warn!("No proofs in queue, skipping iteration...");
118102
return Ok(());
119103
}
120104

121-
let proofs = self
122-
.proofs_queue
123-
.drain(0..self.proofs_queue.len())
124-
.collect::<Vec<_>>();
125-
126105
let (merkle_root, leaves) = compute_proofs_merkle_root(&proofs);
127106
let output = match self.engine {
128107
ZKVMEngine::SP1 => {
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
use tracing::info;
2+
3+
use crate::zk::{Proof, VerificationError};
4+
5+
#[derive(Debug)]
6+
pub enum ProofQueueError {
7+
QueueMaxCapacity,
8+
InvalidProof(VerificationError),
9+
}
10+
11+
pub struct ProofsQueue {
12+
max_proofs_in_queue: u16,
13+
proofs: Vec<Proof>,
14+
}
15+
16+
impl ProofsQueue {
17+
pub fn new(max_proofs_in_queue: u16) -> Self {
18+
Self {
19+
max_proofs_in_queue,
20+
proofs: vec![],
21+
}
22+
}
23+
24+
pub fn proofs(&self) -> &[Proof] {
25+
&self.proofs
26+
}
27+
28+
/// Clears the queue and returns all the current proofs in it
29+
pub fn clear(&mut self) -> Vec<Proof> {
30+
self.proofs.drain(0..self.proofs.len()).collect()
31+
}
32+
33+
pub fn add_proof(&mut self, proof: Proof, elf: &[u8]) -> Result<(), ProofQueueError> {
34+
if let Err(err) = proof.verify(elf) {
35+
return Err(ProofQueueError::InvalidProof(err));
36+
};
37+
38+
if self.proofs.len() as u16 >= self.max_proofs_in_queue {
39+
return Err(ProofQueueError::QueueMaxCapacity);
40+
}
41+
42+
self.proofs.push(proof);
43+
44+
Ok(())
45+
}
46+
}

0 commit comments

Comments
 (0)