Skip to content

Commit 3279eb5

Browse files
committed
fix: defer report scheduler checkpoint until enqueue succeeds
Move the REPORT_SCHEDULER_LAST_CHECK_CACHE_KEY checkpoint update from before the enqueue loop to after each hour bucket is fully processed. Previously the checkpoint was advanced to 'now' before any messages were published. If the process crashed or the message queue was unavailable, the pending hour buckets were permanently skipped because the next run would start from the already-advanced checkpoint. Changes: - Advance checkpoint incrementally per hour bucket, only after all reports in that bucket have been successfully enqueued. - Stop advancing checkpoint on first enqueue failure so the failed hour bucket (and any remaining ones) are retried on the next tick. - Propagate cache.insert errors via '?' instead of silently ignoring them with 'let _ ='. - When no hour boundaries need processing, still advance checkpoint to avoid re-scanning the same sub-hour window. Fixes #1586
1 parent 9bafeb5 commit 3279eb5

File tree

1 file changed

+21
-4
lines changed

1 file changed

+21
-4
lines changed

app-server/src/reports/scheduler.rs

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,13 @@ async fn check_and_enqueue(
9696
now
9797
);
9898

99-
let _ = cache
100-
.insert(REPORT_SCHEDULER_LAST_CHECK_CACHE_KEY, now.timestamp())
101-
.await;
102-
10399
let hours_to_check = hour_boundaries_between(last_check, now);
104100
if hours_to_check.is_empty() {
101+
// No hour boundaries to process; advance checkpoint so the same
102+
// sub-hour window is not re-scanned on the next tick.
103+
cache
104+
.insert(REPORT_SCHEDULER_LAST_CHECK_CACHE_KEY, now.timestamp())
105+
.await?;
105106
return Ok(());
106107
}
107108
log::info!(
@@ -113,6 +114,7 @@ async fn check_and_enqueue(
113114
for (weekday, hour, triggered_at) in hours_to_check {
114115
let reports = get_reports_for_weekday_and_hour(pool, weekday, hour).await?;
115116

117+
let mut all_enqueued = true;
116118
for report in reports {
117119
let message = ReportTriggerMessage {
118120
id: report.id,
@@ -129,8 +131,23 @@ async fn check_and_enqueue(
129131
report.id,
130132
e
131133
);
134+
all_enqueued = false;
132135
}
133136
}
137+
138+
if !all_enqueued {
139+
// Stop advancing the checkpoint so this hour bucket (and any
140+
// remaining ones) will be retried on the next tick.
141+
return Err(anyhow::anyhow!(
142+
"One or more reports failed to enqueue for hour {triggered_at}; checkpoint held for retry"
143+
));
144+
}
145+
146+
// Advance checkpoint only after all reports in this hour bucket
147+
// have been successfully enqueued.
148+
cache
149+
.insert(REPORT_SCHEDULER_LAST_CHECK_CACHE_KEY, triggered_at)
150+
.await?;
134151
}
135152

136153
Ok(())

0 commit comments

Comments
 (0)