Skip to content

Commit 8d6dddb

Browse files
committed
feat: handle processing proofs that have failed along the way
1 parent d623ed0 commit 8d6dddb

File tree

6 files changed

+59
-18
lines changed

6 files changed

+59
-18
lines changed

aggregation_mode/Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aggregation_mode/db/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ edition = "2021"
55

66
[dependencies]
77
tokio = { version = "1"}
8-
# TODO: enable tls
9-
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate" ] }
8+
sqlx = { version = "0.8", features = [ "runtime-tokio", "postgres", "migrate", "chrono" ] }
109

1110

1211
[[bin]]

aggregation_mode/db/migrations/001_init.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ CREATE TABLE tasks (
99
merkle_path BYTEA,
1010
status task_status DEFAULT 'pending',
1111
nonce BIGINT NOT NULL,
12-
inserted_at TIMESTAMPTZ NOT NULL DEFAULT now()
12+
inserted_at TIMESTAMPTZ NOT NULL DEFAULT now(),
13+
status_updated_at TIMESTAMPTZ
1314
);
1415

1516
CREATE TABLE payment_events (

aggregation_mode/db/src/types.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use sqlx::{
22
prelude::FromRow,
3-
types::{BigDecimal, Uuid},
3+
types::{
4+
chrono::{DateTime, Utc},
5+
BigDecimal, Uuid,
6+
},
47
Type,
58
};
69

@@ -21,6 +24,7 @@ pub struct Task {
2124
pub program_commitment: Vec<u8>,
2225
pub merkle_path: Option<Vec<u8>>,
2326
pub status: TaskStatus,
27+
pub status_updated_at: Option<DateTime<Utc>>,
2428
}
2529

2630
#[derive(Debug, Clone, FromRow)]

aggregation_mode/proof_aggregator/src/backend/db.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,19 @@ impl Db {
3232
"WITH selected AS (
3333
SELECT task_id
3434
FROM tasks
35-
WHERE proving_system_id = $1 AND status = 'pending'
35+
WHERE proving_system_id = $1
36+
AND (
37+
status = 'pending'
38+
OR (
39+
status = 'processing'
40+
AND status_updated_at <= now() - interval '12 hours'
41+
)
42+
)
3643
LIMIT $2
3744
FOR UPDATE SKIP LOCKED
3845
)
3946
UPDATE tasks t
40-
SET status = 'processing'
47+
SET status = 'processing', status_updated_at = now()
4148
FROM selected s
4249
WHERE t.task_id = s.task_id
4350
RETURNING t.*;",
@@ -61,7 +68,7 @@ impl Db {
6168

6269
for (task_id, merkle_path) in updates {
6370
if let Err(e) = sqlx::query(
64-
"UPDATE tasks SET merkle_path = $1, status = 'verified', proof = NULL WHERE task_id = $2",
71+
"UPDATE tasks SET merkle_path = $1, status = 'verified', status_updated_at = now(), proof = NULL WHERE task_id = $2",
6572
)
6673
.bind(merkle_path)
6774
.bind(task_id)
@@ -83,6 +90,20 @@ impl Db {
8390
Ok(())
8491
}
8592

86-
// TODO: this should be used when rolling back processing proofs on unexpected errors
87-
pub async fn mark_tasks_as_pending(&self) {}
93+
pub async fn mark_tasks_as_pending(&self, tasks_id: &[Uuid]) -> Result<(), DbError> {
94+
if tasks_id.is_empty() {
95+
return Ok(());
96+
}
97+
98+
sqlx::query(
99+
"UPDATE tasks SET status = 'pending', status_updated_at = now()
100+
WHERE task_id = ANY($1) AND status = 'processing'",
101+
)
102+
.bind(tasks_id)
103+
.execute(&self.pool)
104+
.await
105+
.map_err(|e| DbError::Query(e.to_string()))?;
106+
107+
Ok(())
108+
}
88109
}

aggregation_mode/proof_aggregator/src/backend/mod.rs

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -119,28 +119,40 @@ impl ProofAggregator {
119119
info!("Starting proof aggregator service");
120120

121121
info!("About to aggregate and submit proof to be verified on chain");
122-
let res = self.aggregate_and_submit_proofs_on_chain().await;
122+
123+
let (proofs, tasks_id) = match self
124+
.fetcher
125+
.fetch_pending_proofs(self.engine.clone(), self.config.total_proofs_limit as i64)
126+
.await
127+
.map_err(AggregatedProofSubmissionError::FetchingProofs)
128+
{
129+
Ok(res) => res,
130+
Err(e) => {
131+
error!("Error while aggregating and submitting proofs: {:?}", e);
132+
return;
133+
}
134+
};
135+
136+
let res = self
137+
.aggregate_and_submit_proofs_on_chain((proofs, &tasks_id))
138+
.await;
123139

124140
match res {
125141
Ok(()) => {
126142
info!("Process finished successfully");
127143
}
128144
Err(err) => {
129145
error!("Error while aggregating and submitting proofs: {:?}", err);
146+
warn!("Marking tasks back to pending after failure");
147+
self.db.mark_tasks_as_pending(&tasks_id).await;
130148
}
131149
}
132150
}
133151

134-
// TODO: on failure, mark proofs as pending again
135152
async fn aggregate_and_submit_proofs_on_chain(
136153
&mut self,
154+
(proofs, tasks_id): (Vec<AlignedProof>, &[Uuid]),
137155
) -> Result<(), AggregatedProofSubmissionError> {
138-
let (proofs, tasks_id) = self
139-
.fetcher
140-
.fetch_pending_proofs(self.engine.clone(), self.config.total_proofs_limit as i64)
141-
.await
142-
.map_err(AggregatedProofSubmissionError::FetchingProofs)?;
143-
144156
if proofs.is_empty() {
145157
warn!("No proofs collected, skipping aggregation...");
146158
return Ok(());
@@ -226,7 +238,7 @@ impl ProofAggregator {
226238
.flat_map(|e| e.to_vec())
227239
.collect::<Vec<_>>();
228240

229-
merkle_paths_for_tasks.push((task_id, proof_bytes))
241+
merkle_paths_for_tasks.push((*task_id, proof_bytes))
230242
}
231243
self.db
232244
.insert_tasks_merkle_path_and_mark_them_as_verified(merkle_paths_for_tasks)

0 commit comments

Comments (0)