Skip to content

Commit 09856e5

Browse files
committed
Avoid races on cron
1 parent c963d7a commit 09856e5

File tree

2 files changed

+57
-4
lines changed

2 files changed

+57
-4
lines changed

src/cron/mod.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,13 @@ pub struct CronService<C: Clock> {
6060
impl<C: Clock> CronService<C> {
6161
/// Create a new cron service.
6262
pub fn new(clock: C, config: CronConfig) -> Result<Self> {
63-
let store = CronStore::load()?;
63+
let mut store = CronStore::load()?;
64+
65+
let recovered = store.recover_stuck_jobs(clock.now_millis());
66+
if recovered > 0 {
67+
info!("Recovered {} stuck cron job(s) from previous run", recovered);
68+
let _ = store.save();
69+
}
6470

6571
Ok(Self {
6672
clock,
@@ -93,12 +99,12 @@ impl<C: Clock> CronService<C> {
9399
break;
94100
}
95101
_ = clock.sleep(tick_interval) => {
96-
// Reload store from disk to pick up external changes
97-
// (e.g., agent modifying cron.json directly)
102+
// Merge disk changes (e.g., agent modifying cron.json)
103+
// while preserving in-memory state for running jobs
98104
{
99105
let mut store_guard = store.lock().await;
100106
match CronStore::load() {
101-
Ok(fresh) => *store_guard = fresh,
107+
Ok(fresh) => store_guard.merge_from_disk(fresh),
102108
Err(e) => warn!("Failed to reload cron store: {}", e),
103109
}
104110
}

src/cron/store.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,53 @@ impl CronStore {
243243
pub fn get_enabled_jobs(&self) -> Vec<&CronJob> {
244244
self.jobs.values().filter(|j| j.enabled).collect()
245245
}
246+
247+
/// Reset any jobs stuck in Running state (e.g., after a crash).
248+
/// Recalculates next_run_at so they get scheduled again.
249+
pub fn recover_stuck_jobs(&mut self, now_ms: u64) -> usize {
250+
let stuck_ids: Vec<JobId> = self
251+
.jobs
252+
.values()
253+
.filter(|j| j.state.last_status == JobStatus::Running)
254+
.map(|j| j.id.clone())
255+
.collect();
256+
257+
let count = stuck_ids.len();
258+
for id in &stuck_ids {
259+
if let Some(job) = self.jobs.get_mut(id) {
260+
job.state.last_status = JobStatus::Success;
261+
job.update_next_run(now_ms);
262+
}
263+
}
264+
265+
count
266+
}
267+
268+
/// Merge disk state into the current store, preserving in-flight job states.
269+
/// Jobs currently marked as Running in memory keep their in-memory state
270+
/// to avoid losing completion updates from concurrent tasks.
271+
pub fn merge_from_disk(&mut self, disk: CronStore) {
272+
let running_ids: std::collections::HashSet<String> = self
273+
.jobs
274+
.values()
275+
.filter(|j| j.state.last_status == JobStatus::Running)
276+
.map(|j| j.id.clone())
277+
.collect();
278+
279+
let disk_ids: std::collections::HashSet<String> =
280+
disk.jobs.keys().cloned().collect();
281+
282+
for (id, disk_job) in disk.jobs {
283+
if running_ids.contains(&id) {
284+
continue;
285+
}
286+
self.jobs.insert(id, disk_job);
287+
}
288+
289+
self.jobs.retain(|id, _| {
290+
disk_ids.contains(id) || running_ids.contains(id)
291+
});
292+
}
246293
}
247294

248295
/// Generate a unique job ID.

0 commit comments

Comments
 (0)