Skip to content

Commit 6ce9f1e

Browse files
committed
feat: store merkle paths and mark as verified after submitting proof
1 parent 0df9e0e commit 6ce9f1e

File tree

5 files changed

+116
-46
lines changed

5 files changed

+116
-46
lines changed

aggregation_mode/db/migrations/001_init.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
CREATE TYPE task_status AS ENUM ('pending', 'processing', 'verified', 'submitted');
1+
CREATE TYPE task_status AS ENUM ('pending', 'processing', 'verified');
22

33
CREATE TABLE tasks (
44
task_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

aggregation_mode/db/src/types.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@ use sqlx::{
88
#[sqlx(type_name = "task_status", rename_all = "lowercase")]
99
pub enum TaskStatus {
1010
Pending,
11-
Running,
12-
Done,
13-
Failed,
11+
Processing,
12+
Verified,
1413
}
1514

1615
#[derive(Debug, Clone, FromRow)]

aggregation_mode/proof_aggregator/src/backend/db.rs

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use db::types::Task;
2-
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
2+
use sqlx::{postgres::PgPoolOptions, types::Uuid, Pool, Postgres};
33

44
#[derive(Clone, Debug)]
55
pub struct Db {
@@ -29,7 +29,18 @@ impl Db {
2929
limit: i64,
3030
) -> Result<Vec<Task>, DbError> {
3131
sqlx::query_as::<_, Task>(
32-
"SELECT * FROM tasks WHERE status = 'pending' AND proving_system_id = $1 LIMIT $2",
32+
"WITH selected AS (
33+
SELECT task_id
34+
FROM tasks
35+
WHERE proving_system_id = $1 AND status = 'pending'
36+
LIMIT $2
37+
FOR UPDATE SKIP LOCKED
38+
)
39+
UPDATE tasks t
40+
SET status = 'processing'
41+
FROM selected s
42+
WHERE t.task_id = s.task_id
43+
RETURNING t.*;",
3344
)
3445
.bind(proving_system_id)
3546
.bind(limit)
@@ -38,11 +49,40 @@ impl Db {
3849
.map_err(|e| DbError::Query(e.to_string()))
3950
}
4051

41-
pub async fn mark_tasks_as_pending(&self) {}
52+
pub async fn insert_tasks_merkle_path_and_mark_them_as_submitted(
53+
&self,
54+
updates: Vec<(Uuid, Vec<u8>)>,
55+
) -> Result<(), DbError> {
56+
let mut tx = self
57+
.pool
58+
.begin()
59+
.await
60+
.map_err(|e| DbError::Query(e.to_string()))?;
61+
62+
for (task_id, merkle_path) in updates {
63+
if let Err(e) = sqlx::query(
64+
"UPDATE tasks SET merkle_path = $1, status = 'verified' WHERE task_id = $2",
65+
)
66+
.bind(merkle_path)
67+
.bind(task_id)
68+
.execute(&mut *tx)
69+
.await
70+
{
71+
tx.rollback()
72+
.await
73+
.map_err(|e| DbError::Query(e.to_string()))?;
74+
tracing::error!("Error while updating task merkle path and status {}", e);
75+
return Err(DbError::Query(e.to_string()));
76+
}
77+
}
4278

43-
pub async fn mark_tasks_as_processing(&self) {}
79+
tx.commit()
80+
.await
81+
.map_err(|e| DbError::Query(e.to_string()))?;
4482

45-
pub async fn mark_tasks_as_verified(&self) {}
83+
Ok(())
84+
}
4685

47-
pub async fn mark_tasks_as_submitted(&self) {}
86+
// TODO: this should be used when rolling back processing proofs on unexpected errors
87+
pub async fn mark_tasks_as_pending(&self) {}
4888
}

aggregation_mode/proof_aggregator/src/backend/fetcher.rs

Lines changed: 42 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ use crate::{
55
},
66
backend::db::{Db, DbError},
77
};
8-
use rayon::prelude::*;
8+
use rayon::iter::{IntoParallelIterator, ParallelIterator};
9+
use sqlx::types::Uuid;
910
use tracing::{error, info};
1011

1112
#[derive(Debug)]
@@ -26,48 +27,56 @@ impl ProofsFetcher {
2627
&self,
2728
engine: ZKVMEngine,
2829
limit: i64,
29-
) -> Result<Vec<AlignedProof>, ProofsFetcherError> {
30+
) -> Result<(Vec<AlignedProof>, Vec<Uuid>), ProofsFetcherError> {
3031
let tasks = self
3132
.db
3233
.get_pending_tasks_and_mark_them_as_processed(engine.proving_system_id() as i64, limit)
3334
.await
3435
.map_err(ProofsFetcherError::Query)?;
3536

36-
let proofs_to_aggregate: Vec<AlignedProof> = match engine {
37-
ZKVMEngine::SP1 => tasks
38-
.into_par_iter()
39-
.filter_map(|task| {
40-
let vk = bincode::deserialize(&task.program_commitment).ok()?;
41-
let proof_with_pub_values = bincode::deserialize(&task.proof).ok()?;
42-
let sp1_proof = SP1ProofWithPubValuesAndVk::new(proof_with_pub_values, vk);
37+
let (tasks_id, proofs_to_aggregate): (Vec<Uuid>, Vec<AlignedProof>) = match engine {
38+
ZKVMEngine::SP1 => {
39+
let pairs: Vec<(Uuid, AlignedProof)> = tasks
40+
.into_par_iter()
41+
.filter_map(|task| {
42+
let vk = bincode::deserialize(&task.program_commitment).ok()?;
43+
let proof_with_pub_values = bincode::deserialize(&task.proof).ok()?;
4344

44-
match sp1_proof {
45-
Ok(proof) => Some(AlignedProof::SP1(proof.into())),
46-
Err(err) => {
47-
error!("Could not add proof, verification failed: {:?}", err);
48-
None
45+
match SP1ProofWithPubValuesAndVk::new(proof_with_pub_values, vk) {
46+
Ok(proof) => Some((task.task_id, AlignedProof::SP1(proof.into()))),
47+
Err(err) => {
48+
error!("Could not add proof, verification failed: {:?}", err);
49+
None
50+
}
4951
}
50-
}
51-
})
52-
.collect(),
53-
ZKVMEngine::RISC0 => tasks
54-
.into_par_iter()
55-
.filter_map(|task| {
56-
let mut image_id = [0u8; 32];
57-
image_id.copy_from_slice(&task.program_commitment);
58-
let receipt: risc0_zkvm::Receipt = bincode::deserialize(&task.proof).ok()?;
52+
})
53+
.collect();
5954

60-
let risc0_proof = Risc0ProofReceiptAndImageId::new(image_id, receipt);
55+
pairs.into_iter().unzip()
56+
}
57+
ZKVMEngine::RISC0 => {
58+
let pairs: Vec<(Uuid, AlignedProof)> = tasks
59+
.into_par_iter()
60+
.filter_map(|task| {
61+
let mut image_id = [0u8; 32];
62+
image_id.copy_from_slice(&task.program_commitment);
63+
// we are inside a for_each callback so it returns for this particular iteration only
64+
let receipt = bincode::deserialize(&task.proof).ok()?;
6165

62-
match risc0_proof {
63-
Ok(proof) => Some(AlignedProof::Risc0(proof.into())),
64-
Err(err) => {
65-
error!("Could not add proof, verification failed: {:?}", err);
66-
None
66+
let risc0_proof = Risc0ProofReceiptAndImageId::new(image_id, receipt);
67+
68+
match risc0_proof {
69+
Ok(proof) => Some((task.task_id, AlignedProof::Risc0(proof.into()))),
70+
Err(err) => {
71+
error!("Could not add proof, verification failed: {:?}", err);
72+
None
73+
}
6774
}
68-
}
69-
})
70-
.collect(),
75+
})
76+
.collect();
77+
78+
pairs.into_iter().unzip()
79+
}
7180
};
7281

7382
info!(
@@ -76,6 +85,6 @@ impl ProofsFetcher {
7685
proofs_to_aggregate.len()
7786
);
7887

79-
Ok(proofs_to_aggregate)
88+
Ok((proofs_to_aggregate, tasks_id))
8089
}
8190
}

aggregation_mode/proof_aggregator/src/backend/mod.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ mod types;
66

77
use crate::{
88
aggregators::{AlignedProof, ProofAggregationError, ZKVMEngine},
9-
backend::db::Db,
9+
backend::db::{Db, DbError},
1010
};
1111

1212
use alloy::{
@@ -39,6 +39,7 @@ pub enum AggregatedProofSubmissionError {
3939
ZKVMAggregation(ProofAggregationError),
4040
BuildingMerkleRoot,
4141
MerkleRootMisMatch,
42+
StoringMerklePaths(DbError),
4243
}
4344

4445
pub struct ProofAggregator {
@@ -115,10 +116,11 @@ impl ProofAggregator {
115116
}
116117
}
117118

119+
// TODO: on failure, mark proofs as pending again
118120
async fn aggregate_and_submit_proofs_on_chain(
119121
&mut self,
120122
) -> Result<(), AggregatedProofSubmissionError> {
121-
let proofs = self
123+
let (proofs, tasks_id) = self
122124
.fetcher
123125
.query(self.engine.clone(), self.config.total_proofs_limit)
124126
.await
@@ -167,7 +169,27 @@ impl ProofAggregator {
167169
receipt.transaction_hash
168170
);
169171

170-
// TODO: mark them as verified and store their merkle paths
172+
info!("Storing merkle paths for each task...",);
173+
let mut merkle_paths_for_tasks = vec![];
174+
for task_id in tasks_id {
175+
let Some(proof) = merkle_tree.get_proof_by_pos(0) else {
176+
warn!("Proof not found for task id {task_id}");
177+
continue;
178+
};
179+
let proof_bytes = proof
180+
.merkle_path
181+
.iter()
182+
.map(|e| e.to_vec())
183+
.flatten()
184+
.collect::<Vec<_>>();
185+
186+
merkle_paths_for_tasks.push((task_id, proof_bytes))
187+
}
188+
self.db
189+
.insert_tasks_merkle_path_and_mark_them_as_submitted(merkle_paths_for_tasks)
190+
.await
191+
.map_err(AggregatedProofSubmissionError::StoringMerklePaths)?;
192+
info!("Merkle path inserted sucessfully",);
171193

172194
Ok(())
173195
}

0 commit comments

Comments
 (0)