fix: defer report scheduler checkpoint until enqueue succeeds #1590
ixchio wants to merge 1 commit into lmnr-ai:dev from
Conversation
Greptile Summary

This PR fixes a correctness bug in the reports scheduler where the checkpoint (`REPORT_SCHEDULER_LAST_CHECK_CACHE_KEY`) was advanced before report triggers were actually enqueued, so a crash or queue failure could permanently skip pending hour buckets.

Changes:
Issues found:
Confidence Score: 3/5

The PR fixes a real correctness bug but introduces a retry path that can cause duplicate user-facing notifications due to the absence of consumer-side idempotency. The original bug (checkpoint advancing before enqueue) is correctly fixed, and the incremental advancement logic is sound. However, the retry semantics now activated expose a pre-existing gap: the RabbitMQ consumer (the reports generator) performs no dedup check on (report_id, triggered_at), so reports already enqueued from a partially failed bucket will be processed again when that bucket is retried.
Important Files Changed
Sequence Diagram

sequenceDiagram
participant Sched as Reports Scheduler
participant Cache as Redis Cache
participant DB as PostgreSQL
participant MQ as RabbitMQ
participant Gen as ReportsGenerator
Sched->>Cache: try_acquire_lock(LOCK_KEY, 900s)
Cache-->>Sched: Ok(true)
Sched->>Cache: get(LAST_CHECK_KEY)
Cache-->>Sched: last_check_ts
Sched->>Sched: hour_boundaries_between(last_check, now)
alt No hour boundaries
Sched->>Cache: insert(LAST_CHECK_KEY, now)
Sched-->>Sched: return Ok(())
else Has hour boundaries
loop For each (weekday, hour, triggered_at)
Sched->>DB: get_reports_for_weekday_and_hour(weekday, hour)
DB-->>Sched: reports[]
loop For each report
Sched->>MQ: push_to_reports_queue(ReportTriggerMessage)
alt Enqueue succeeds
MQ-->>Sched: Ok(())
else Enqueue fails
MQ-->>Sched: Err(e)
Sched->>Sched: all_enqueued = false
Note over Sched: Logs error, continues loop
end
end
alt all_enqueued = false
Sched->>Sched: warn + return Ok(())
Note over Sched,Cache: Checkpoint NOT advanced; bucket retried next tick
Note over MQ,Gen: Already-enqueued reports will be processed AGAIN (no idempotency)
else all_enqueued = true
Sched->>Cache: insert(LAST_CHECK_KEY, triggered_at)
Cache-->>Sched: Ok(())
end
end
end
Sched->>Cache: release_lock(LOCK_KEY)
MQ-->>Gen: deliver ReportTriggerMessage
Gen->>Gen: generate report + send notifications
Note over Gen: No dedup check on (report_id, triggered_at)
Move the REPORT_SCHEDULER_LAST_CHECK_CACHE_KEY checkpoint update from before the enqueue loop to after each hour bucket is fully processed. Previously the checkpoint was advanced to 'now' before any messages were published. If the process crashed or the message queue was unavailable, the pending hour buckets were permanently skipped because the next run would start from the already-advanced checkpoint.

Changes:
- Advance checkpoint incrementally per hour bucket, only after all reports in that bucket have been successfully enqueued.
- Stop advancing checkpoint on first enqueue failure so the failed hour bucket (and any remaining ones) are retried on the next tick.
- Propagate cache.insert errors via '?' instead of silently ignoring them with 'let _ ='.
- When no hour boundaries need processing, still advance checkpoint to avoid re-scanning the same sub-hour window.

Fixes lmnr-ai#1586
Force-pushed from 8242593 to 3279eb5
Hey @dinmukhamedm — this addresses #1586, the checkpoint race in the reports scheduler. The core issue is that the checkpoint was moving forward before the enqueue loop finished, so a crash or MQ hiccup would silently drop scheduled reports with no retry. The fix is pretty small — just defers the checkpoint write to after each hour bucket is fully enqueued, and returns an error on partial failure so it actually shows up in logs. I also addressed the Greptile bot's review feedback (returning Err instead of Ok on failure for proper error-level logging). Re: the duplicate notification concern the bot flagged — that's a pre-existing at-least-once delivery gap in the consumer, not something this PR introduces. Happy to open a follow-up issue for adding idempotency on the generator side if that'd be useful. Would appreciate a review when you get a chance, thanks!
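The consumer-side idempotency follow-up mentioned here could be a dedup check keyed on (report_id, triggered_at). A minimal sketch, with an in-memory set standing in for an atomic store like Redis SET NX — the `DedupStore` and `handle_trigger` names are hypothetical, not the project's actual code:

```rust
use std::collections::HashSet;

// Stand-in for an atomic "claim this key once" store (Redis SET NX in production).
struct DedupStore { seen: HashSet<(u64, i64)> }

impl DedupStore {
    // Returns true only the first time a (report_id, triggered_at) pair is claimed.
    fn claim(&mut self, report_id: u64, triggered_at: i64) -> bool {
        self.seen.insert((report_id, triggered_at))
    }
}

// Illustrative consumer handler: skip duplicate deliveries instead of
// re-generating the report and re-sending notifications.
fn handle_trigger(store: &mut DedupStore, report_id: u64, triggered_at: i64) -> bool {
    if !store.claim(report_id, triggered_at) {
        eprintln!("duplicate trigger ({report_id}, {triggered_at}); skipping");
        return false;
    }
    // ... generate report + send notifications here ...
    true
}

fn main() {
    let mut store = DedupStore { seen: HashSet::new() };
    assert!(handle_trigger(&mut store, 42, 1_000));  // first delivery processed
    assert!(!handle_trigger(&mut store, 42, 1_000)); // redelivery skipped
    assert!(handle_trigger(&mut store, 42, 4_600));  // new hour bucket processed
    println!("ok");
}
```

With this in place, the at-least-once redelivery caused by a retried hour bucket degrades to a harmless no-op on the generator side rather than a duplicate notification.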
Summary
Fixes #1586 — the reports scheduler was advancing its checkpoint before the enqueue loop completed, causing silently missed report triggers on process crash or MQ failure.
Problem
In `check_and_enqueue`, the `REPORT_SCHEDULER_LAST_CHECK_CACHE_KEY` was updated to `now` at the top of the function (line 99), before any messages were actually published to the queue. This created a race condition:

- When `push_to_reports_queue` failed (e.g. RabbitMQ down), errors were only logged and `Ok(())` was still returned, so the checkpoint moved forward and the failed triggers were never retried.
- The result of `cache.insert(...)` was discarded with `let _ =`, hiding any cache write failures.

Fix
- `cache.insert(...)` errors are now propagated via `?` instead of silently ignored, so the caller can log and handle them.
- The checkpoint is advanced per hour bucket, only after all reports in that bucket have been successfully enqueued; on enqueue failure it is held so the bucket is retried on the next tick.
- When no hour boundaries need processing, the checkpoint still advances to `now` to prevent re-scanning the same sub-hour window.

Testing
- `cargo check` passes with no compilation errors.
- Only `app-server/src/reports/scheduler.rs` is modified (23 insertions, 4 deletions).

Note
Medium Risk
Changes the report scheduling checkpointing semantics and error handling; a mistake could cause duplicate triggers or stalled scheduling if failures aren’t handled as expected.
Overview
Fixes the reports scheduler so the `REPORT_SCHEDULER_LAST_CHECK_CACHE_KEY` checkpoint is not advanced until report triggers are actually enqueued. Checkpoint writes now happen per processed hour bucket (and propagate cache write errors), while enqueue failures cause the function to error out and hold the checkpoint for retry; when there are no hour boundaries, the checkpoint still advances to `now` to avoid rescanning the same sub-hour window.

Reviewed by Cursor Bugbot for commit 3279eb5. Bugbot is set up for automated code reviews on this repo.
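The cache-error propagation change described in the PR (`let _ =` replaced by `?`) can be illustrated in isolation. The `Cache` type here is a hypothetical stand-in, not the project's actual cache trait:

```rust
#[derive(Debug)]
struct CacheError(String);

// Hypothetical cache whose writes can fail, standing in for Redis.
struct Cache { fail: bool }

impl Cache {
    fn insert(&self, _key: &str, _val: i64) -> Result<(), CacheError> {
        if self.fail { Err(CacheError("write failed".into())) } else { Ok(()) }
    }
}

// Before: `let _ =` drops the Err, so a failed checkpoint write is invisible.
fn update_checkpoint_silent(cache: &Cache, now: i64) {
    let _ = cache.insert("last_check", now);
}

// After: `?` propagates the error so the caller can log and handle it.
fn update_checkpoint(cache: &Cache, now: i64) -> Result<(), CacheError> {
    cache.insert("last_check", now)?;
    Ok(())
}

fn main() {
    let broken = Cache { fail: true };
    update_checkpoint_silent(&broken, 1_000); // no signal at all
    assert!(update_checkpoint(&broken, 1_000).is_err()); // failure now visible
    let healthy = Cache { fail: false };
    assert!(update_checkpoint(&healthy, 1_000).is_ok());
    println!("ok");
}
```

The behavioral difference matters here because a swallowed checkpoint write failure would leave the scheduler re-processing the same window on every tick with no log line explaining why.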