Skip to content

Commit 3309e31

Browse files
committed
feat: basic fetcher flow
1 parent 0419233 commit 3309e31

File tree

7 files changed

+98
-397
lines changed

7 files changed

+98
-397
lines changed

aggregation_mode/proof_aggregator/src/aggregators/mod.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ pub mod sp1_aggregator;
33

44
use std::fmt::Display;
55

6+
use aligned_sdk::aggregation_layer::AggregationModeProvingSystem;
67
use lambdaworks_crypto::merkle_tree::traits::IsMerkleTreeBackend;
78
use risc0_aggregator::{Risc0AggregationError, Risc0ProofReceiptAndImageId};
89
use sha3::{Digest, Keccak256};
9-
use sp1_aggregator::{SP1AggregationError, SP1ProofWithPubValuesAndElf};
10+
use sp1_aggregator::{SP1AggregationError, SP1ProofWithPubValuesAndVk};
1011
use tracing::info;
1112

1213
#[derive(Clone, Debug)]
@@ -44,6 +45,13 @@ impl ZKVMEngine {
4445
Some(engine)
4546
}
4647

48+
pub fn proving_system_id(&self) -> u16 {
49+
match &self {
50+
ZKVMEngine::SP1 => AggregationModeProvingSystem::SP1.as_u16(),
51+
ZKVMEngine::RISC0 => AggregationModeProvingSystem::RISC0.as_u16(),
52+
}
53+
}
54+
4755
/// Aggregates a list of [`AlignedProof`]s into a single [`AlignedProof`].
4856
///
4957
/// Returns a tuple containing:
@@ -61,7 +69,7 @@ impl ZKVMEngine {
6169
) -> Result<(AlignedProof, [u8; 32]), ProofAggregationError> {
6270
let res = match self {
6371
ZKVMEngine::SP1 => {
64-
let proofs: Vec<SP1ProofWithPubValuesAndElf> = proofs
72+
let proofs: Vec<SP1ProofWithPubValuesAndVk> = proofs
6573
.into_iter()
6674
// Fetcher already filtered for SP1
6775
// We do this for type casting, as to avoid using generics
@@ -80,7 +88,7 @@ impl ZKVMEngine {
8088
proofs_per_chunk,
8189
);
8290

83-
let mut agg_proofs: Vec<(SP1ProofWithPubValuesAndElf, Vec<[u8; 32]>)> = vec![];
91+
let mut agg_proofs: Vec<(SP1ProofWithPubValuesAndVk, Vec<[u8; 32]>)> = vec![];
8492
for (i, chunk) in chunks.enumerate() {
8593
let leaves_commitment =
8694
chunk.iter().map(|e| e.hash_vk_and_pub_inputs()).collect();
@@ -154,7 +162,7 @@ impl ZKVMEngine {
154162
}
155163

156164
pub enum AlignedProof {
157-
SP1(Box<SP1ProofWithPubValuesAndElf>),
165+
SP1(Box<SP1ProofWithPubValuesAndVk>),
158166
Risc0(Box<Risc0ProofReceiptAndImageId>),
159167
}
160168

aggregation_mode/proof_aggregator/src/aggregators/sp1_aggregator.rs

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,8 @@ static SP1_PROVER_CLIENT: LazyLock<EnvProver> = LazyLock::new(ProverClient::from
2525
static SP1_PROVER_CLIENT_CPU: LazyLock<CpuProver> =
2626
LazyLock::new(|| ProverClient::builder().cpu().build());
2727

28-
pub struct SP1ProofWithPubValuesAndElf {
28+
pub struct SP1ProofWithPubValuesAndVk {
2929
pub proof_with_pub_values: SP1ProofWithPublicValues,
30-
pub elf: Vec<u8>,
3130
pub vk: SP1VerifyingKey,
3231
}
3332

@@ -37,16 +36,14 @@ pub enum AlignedSP1VerificationError {
3736
UnsupportedProof,
3837
}
3938

40-
impl SP1ProofWithPubValuesAndElf {
39+
impl SP1ProofWithPubValuesAndVk {
4140
/// Constructs a new instance of the struct by verifying a given SP1 proof with its public values.
4241
pub fn new(
4342
proof_with_pub_values: SP1ProofWithPublicValues,
44-
elf: Vec<u8>,
43+
vk: SP1VerifyingKey,
4544
) -> Result<Self, AlignedSP1VerificationError> {
4645
let client = &*SP1_PROVER_CLIENT_CPU;
4746

48-
let (_pk, vk) = client.setup(&elf);
49-
5047
// only sp1 compressed proofs are supported for aggregation now
5148
match proof_with_pub_values.proof {
5249
sp1_sdk::SP1Proof::Compressed(_) => client
@@ -57,7 +54,6 @@ impl SP1ProofWithPubValuesAndElf {
5754

5855
Ok(Self {
5956
proof_with_pub_values,
60-
elf,
6157
vk,
6258
})
6359
}
@@ -80,8 +76,8 @@ pub enum SP1AggregationError {
8076
}
8177

8278
pub(crate) fn run_user_proofs_aggregator(
83-
proofs: &[SP1ProofWithPubValuesAndElf],
84-
) -> Result<SP1ProofWithPubValuesAndElf, SP1AggregationError> {
79+
proofs: &[SP1ProofWithPubValuesAndVk],
80+
) -> Result<SP1ProofWithPubValuesAndVk, SP1AggregationError> {
8581
let mut stdin = SP1Stdin::new();
8682

8783
let mut program_input = sp1_aggregation_program::UserProofsAggregatorInput {
@@ -131,18 +127,17 @@ pub(crate) fn run_user_proofs_aggregator(
131127
.verify(&proof, &vk)
132128
.map_err(SP1AggregationError::Verification)?;
133129

134-
let proof_and_elf = SP1ProofWithPubValuesAndElf {
130+
let proof_and_elf = SP1ProofWithPubValuesAndVk {
135131
proof_with_pub_values: proof,
136-
elf: USER_PROOFS_PROGRAM_ELF.to_vec(),
137132
vk,
138133
};
139134

140135
Ok(proof_and_elf)
141136
}
142137

143138
pub(crate) fn run_chunk_aggregator(
144-
proofs: &[(SP1ProofWithPubValuesAndElf, Vec<[u8; 32]>)],
145-
) -> Result<SP1ProofWithPubValuesAndElf, SP1AggregationError> {
139+
proofs: &[(SP1ProofWithPubValuesAndVk, Vec<[u8; 32]>)],
140+
) -> Result<SP1ProofWithPubValuesAndVk, SP1AggregationError> {
146141
let mut stdin = SP1Stdin::new();
147142

148143
let mut program_input = sp1_aggregation_program::ChunkAggregatorInput {
@@ -204,9 +199,8 @@ pub(crate) fn run_chunk_aggregator(
204199
.verify(&proof, &vk)
205200
.map_err(SP1AggregationError::Verification)?;
206201

207-
let proof_and_elf = SP1ProofWithPubValuesAndElf {
202+
let proof_and_elf = SP1ProofWithPubValuesAndVk {
208203
proof_with_pub_values: proof,
209-
elf: CHUNK_PROGRAM_ELF.to_vec(),
210204
vk,
211205
};
212206

Lines changed: 2 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,22 @@
11
use serde::{Deserialize, Serialize};
2-
use std::{fs::File, fs::OpenOptions, io::Read, io::Write};
2+
use std::{fs::File, io::Read};
33

44
#[derive(Debug, Deserialize, Serialize)]
55
pub struct ECDSAConfig {
66
pub private_key_store_path: String,
77
pub private_key_store_password: String,
88
}
99

10-
#[derive(Debug, Deserialize, Serialize)]
11-
pub struct LastAggregatedBlock {
12-
pub last_aggregated_block: u64,
13-
}
14-
1510
#[derive(Debug, Deserialize, Serialize)]
1611
pub struct Config {
1712
pub eth_rpc_url: String,
1813
pub eth_ws_url: String,
1914
pub max_proofs_in_queue: u16,
2015
pub proof_aggregation_service_address: String,
2116
pub aligned_service_manager_address: String,
22-
pub last_aggregated_block_filepath: String,
2317
pub ecdsa: ECDSAConfig,
2418
pub proofs_per_chunk: u16,
25-
pub total_proofs_limit: u16,
19+
pub total_proofs_limit: i64,
2620
pub sp1_chunk_aggregator_vk_hash: String,
2721
pub risc0_chunk_aggregator_image_id: String,
2822
pub db_connection_url: String,
@@ -36,32 +30,4 @@ impl Config {
3630
let config: Config = serde_yaml::from_str(&contents)?;
3731
Ok(config)
3832
}
39-
40-
pub fn get_last_aggregated_block(&self) -> Result<u64, Box<dyn std::error::Error>> {
41-
let mut file = File::open(&self.last_aggregated_block_filepath)?;
42-
let mut contents = String::new();
43-
file.read_to_string(&mut contents)?;
44-
let lab: LastAggregatedBlock = serde_json::from_str(&contents)?;
45-
Ok(lab.last_aggregated_block)
46-
}
47-
48-
pub fn update_last_aggregated_block(
49-
&self,
50-
last_aggregated_block: u64,
51-
) -> Result<(), Box<dyn std::error::Error>> {
52-
let last_aggregated_block_struct = LastAggregatedBlock {
53-
last_aggregated_block,
54-
};
55-
56-
let mut file = OpenOptions::new()
57-
.write(true)
58-
.truncate(true)
59-
.create(true)
60-
.open(&self.last_aggregated_block_filepath)?;
61-
62-
let content = serde_json::to_string(&last_aggregated_block_struct)?;
63-
file.write_all(content.as_bytes())?;
64-
65-
Ok(())
66-
}
6733
}

aggregation_mode/proof_aggregator/src/backend/db.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
use db::types::Task;
2-
use sqlx::{
3-
postgres::PgPoolOptions,
4-
types::{BigDecimal, Uuid},
5-
Pool, Postgres,
6-
};
2+
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
73

84
#[derive(Clone, Debug)]
95
pub struct Db {
@@ -29,13 +25,17 @@ impl Db {
2925

3026
pub async fn get_pending_tasks_and_mark_them_as_processed(
3127
&self,
28+
proving_system_id: i64,
3229
limit: i64,
3330
) -> Result<Vec<Task>, DbError> {
34-
sqlx::query_as::<_, Task>("SELECT * FROM tasks WHERE status = 'pending' LIMIT $1")
35-
.bind(limit)
36-
.fetch_all(&self.pool)
37-
.await
38-
.map_err(|e| DbError::Query(e.to_string()))
31+
sqlx::query_as::<_, Task>(
32+
"SELECT * FROM tasks WHERE status = 'pending' AND proving_system_id = $1 LIMIT $2",
33+
)
34+
.bind(proving_system_id)
35+
.bind(limit)
36+
.fetch_all(&self.pool)
37+
.await
38+
.map_err(|e| DbError::Query(e.to_string()))
3939
}
4040

4141
pub async fn mark_tasks_as_pending(&self) {}

0 commit comments

Comments
 (0)