Skip to content

Commit 4ab27f1

Browse files
committed
feat: use orchestrator in proof aggregator
1 parent 5a2d233 commit 4ab27f1

File tree

3 files changed

+76
-56
lines changed

3 files changed

+76
-56
lines changed

aggregation_mode/proof_aggregator/src/backend/db.rs

Lines changed: 74 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,82 +1,102 @@
1-
use db::types::Task;
2-
use sqlx::{postgres::PgPoolOptions, types::Uuid, Pool, Postgres};
1+
use db::{orchestrator::DbOrchestartor, retry::RetryConfig, types::Task};
2+
use sqlx::types::Uuid;
33

4-
#[derive(Clone, Debug)]
4+
// Retry parameters for Db queries
5+
/// Initial delay before first retry attempt (in milliseconds)
6+
const RETRY_MIN_DELAY_MILLIS: u64 = 500;
7+
/// Exponential backoff multiplier for retry delays
8+
const RETRY_FACTOR: f32 = 2.0;
9+
/// Maximum number of retry attempts
10+
const RETRY_MAX_TIMES: usize = 5;
11+
/// Maximum delay between retry attempts (in seconds)
12+
const RETRY_MAX_DELAY_SECONDS: u64 = 30;
13+
14+
#[derive(Debug, Clone)]
515
pub struct Db {
6-
pool: Pool<Postgres>,
16+
orchestrator: DbOrchestartor,
717
}
818

919
#[derive(Debug, Clone)]
1020
pub enum DbError {
11-
ConnectError(String),
21+
Creation(String),
1222
Query(String),
1323
}
1424

1525
impl Db {
16-
pub async fn try_new(connection_url: &str) -> Result<Self, DbError> {
17-
let pool = PgPoolOptions::new()
18-
.max_connections(5)
19-
.connect(connection_url)
20-
.await
21-
.map_err(|e| DbError::ConnectError(e.to_string()))?;
26+
pub async fn try_new(connection_urls: &[&str]) -> Result<Self, DbError> {
27+
let orchestrator = DbOrchestartor::try_new(
28+
connection_urls,
29+
RetryConfig {
30+
min_delay_millis: RETRY_MIN_DELAY_MILLIS,
31+
factor: RETRY_FACTOR,
32+
max_times: RETRY_MAX_TIMES,
33+
max_delay_seconds: RETRY_MAX_DELAY_SECONDS,
34+
},
35+
)
36+
.map_err(|e| DbError::Creation(e.to_string()))?;
2237

23-
Ok(Self { pool })
38+
Ok(Self { orchestrator })
2439
}
2540

2641
pub async fn get_pending_tasks_and_mark_them_as_processing(
27-
&self,
42+
&mut self,
2843
proving_system_id: i32,
2944
limit: i64,
3045
) -> Result<Vec<Task>, DbError> {
31-
sqlx::query_as::<_, Task>(
32-
"WITH selected AS (
33-
SELECT task_id
34-
FROM tasks
35-
WHERE proving_system_id = $1 AND status = 'pending'
36-
LIMIT $2
37-
FOR UPDATE SKIP LOCKED
46+
self.orchestrator
47+
.write(async |pool| {
48+
sqlx::query_as::<_, Task>(
49+
"WITH selected AS (
50+
SELECT task_id
51+
FROM tasks
52+
WHERE proving_system_id = $1 AND status = 'pending'
53+
LIMIT $2
54+
FOR UPDATE SKIP LOCKED
55+
)
56+
UPDATE tasks t
57+
SET status = 'processing'
58+
FROM selected s
59+
WHERE t.task_id = s.task_id
60+
RETURNING t.*;",
3861
)
39-
UPDATE tasks t
40-
SET status = 'processing'
41-
FROM selected s
42-
WHERE t.task_id = s.task_id
43-
RETURNING t.*;",
44-
)
45-
.bind(proving_system_id)
46-
.bind(limit)
47-
.fetch_all(&self.pool)
48-
.await
49-
.map_err(|e| DbError::Query(e.to_string()))
62+
.bind(proving_system_id)
63+
.bind(limit)
64+
.fetch_all(&pool)
65+
.await
66+
})
67+
.await
68+
.map_err(|e| DbError::Query(e.to_string()))
5069
}
5170

5271
pub async fn insert_tasks_merkle_path_and_mark_them_as_verified(
53-
&self,
72+
&mut self,
5473
updates: Vec<(Uuid, Vec<u8>)>,
5574
) -> Result<(), DbError> {
56-
let mut tx = self
57-
.pool
58-
.begin()
59-
.await
60-
.map_err(|e| DbError::Query(e.to_string()))?;
75+
let updates_ref = &updates;
6176

62-
for (task_id, merkle_path) in updates {
63-
if let Err(e) = sqlx::query(
64-
"UPDATE tasks SET merkle_path = $1, status = 'verified', proof = NULL WHERE task_id = $2",
65-
)
66-
.bind(merkle_path)
67-
.bind(task_id)
68-
.execute(&mut *tx)
69-
.await
70-
{
71-
tx.rollback()
72-
.await
73-
.map_err(|e| DbError::Query(e.to_string()))?;
74-
tracing::error!("Error while updating task merkle path and status {}", e);
75-
return Err(DbError::Query(e.to_string()));
76-
}
77-
}
77+
self.orchestrator
78+
.write(|pool| {
79+
let updates = updates_ref;
80+
async move {
81+
let mut tx = pool.begin().await?;
82+
83+
for (task_id, merkle_path) in updates.iter() {
84+
if let Err(e) = sqlx::query(
85+
"UPDATE tasks SET merkle_path = $1, status = 'verified', proof = NULL WHERE task_id = $2",
86+
)
87+
.bind(merkle_path.as_slice())
88+
.bind(*task_id)
89+
.execute(&mut *tx)
90+
.await {
91+
tracing::error!("Error while updating task merkle path and status {}", e);
92+
return Err(e);
93+
};
94+
}
7895

79-
tx.commit()
96+
tx.commit().await?;
97+
Ok(())
98+
}
99+
})
80100
.await
81101
.map_err(|e| DbError::Query(e.to_string()))?;
82102

aggregation_mode/proof_aggregator/src/backend/fetcher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ impl ProofsFetcher {
2424
}
2525

2626
pub async fn fetch_pending_proofs(
27-
&self,
27+
&mut self,
2828
engine: ZKVMEngine,
2929
limit: i64,
3030
) -> Result<(Vec<AlignedProof>, Vec<Uuid>), ProofsFetcherError> {

aggregation_mode/proof_aggregator/src/backend/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ impl ProofAggregator {
8585
let engine =
8686
ZKVMEngine::from_env().expect("AGGREGATOR env variable to be set to one of sp1|risc0");
8787

88-
let db = Db::try_new(&config.db_connection_url)
88+
let db = Db::try_new(&[&config.db_connection_url])
8989
.await
9090
.expect("To connect to db");
9191

0 commit comments

Comments
 (0)