Skip to content

Commit 9783115

Browse files
committed
refactor: rename get_pending_tasks_and_mark_them_as_processing + add explanation comment
1 parent 48c8918 commit 9783115

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

aggregation_mode/proof_aggregator/src/backend/db.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,18 @@ impl Db {
2323
Ok(Self { pool })
2424
}
2525

26-
pub async fn get_pending_tasks_and_mark_them_as_processing(
26+
/// Fetches tasks that are ready to be processed and atomically updates their status.
27+
///
28+
/// This function selects up to `limit` tasks for the given `proving_system_id` that are
29+
/// either:
30+
/// - in `pending` status, or
31+
/// - in `processing` status but whose `status_updated_at` timestamp is older than 12 hours
32+
/// (to recover tasks that may have been abandoned or stalled).
33+
///
34+
/// The selected rows are locked using `FOR UPDATE SKIP LOCKED` to ensure safe concurrent
35+
/// processing by multiple workers. All selected tasks have their status set to
36+
/// `processing` and their `status_updated_at` updated to `now()` before being returned.
37+
pub async fn get_tasks_to_process_and_update_their_status(
2738
&self,
2839
proving_system_id: i32,
2940
limit: i64,

aggregation_mode/proof_aggregator/src/backend/fetcher.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ impl ProofsFetcher {
3030
) -> Result<(Vec<AlignedProof>, Vec<Uuid>), ProofsFetcherError> {
3131
let tasks = self
3232
.db
33-
.get_pending_tasks_and_mark_them_as_processing(engine.proving_system_id() as i32, limit)
33+
.get_tasks_to_process_and_update_their_status(engine.proving_system_id() as i32, limit)
3434
.await
3535
.map_err(ProofsFetcherError::Query)?;
3636

0 commit comments

Comments
 (0)