Skip to content

Commit a06b36b

Browse files
aster-voidclaude
andcommitted
scheduler: add jitter support
- Add task jitter (random delay before execution) - Add retry jitter (random variation in backoff delay) - Auto-infer retry jitter as 25% of delay when not specified - Add milliseconds (ms) support to duration parsing 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 079f308 commit a06b36b

File tree

5 files changed

+268
-9
lines changed

5 files changed

+268
-9
lines changed

CLAUDE.md

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,16 +26,19 @@ struct Job {
2626
timeout: Duration,
2727
concurrency: Concurrency,
2828
retry: Option<RetryConfig>,
29+
jitter: Option<Duration>, // Random delay before execution (0 to jitter)
2930
}
3031

3132
struct RetryConfig {
3233
max: u32, // Max retry attempts
3334
delay: Duration, // Initial delay (exponential backoff)
35+
jitter: Option<Duration>, // Random variation added to retry delay (0 to jitter)
36+
// Auto-inferred as 25% of delay when not set
3437
}
3538

3639
// Parsed from rollcron.yaml
3740
struct Config { jobs: HashMap<String, JobConfig> }
38-
struct JobConfig { name: Option<String>, schedule: ScheduleConfig, run, timeout }
41+
struct JobConfig { name: Option<String>, schedule: ScheduleConfig, run, timeout, jitter }
3942
struct ScheduleConfig { cron: String }
4043
```
4144

@@ -49,10 +52,13 @@ jobs:
4952
cron: "*/5 * * * *"
5053
run: echo hello
5154
timeout: 10s # Optional (default: 10s)
55+
jitter: 30s # Optional: random delay 0-30s before execution
5256
concurrency: skip # Optional: parallel|wait|skip|replace (default: skip)
5357
retry: # Optional
5458
max: 3 # Max retry attempts
5559
delay: 1s # Initial delay (default: 1s), exponential backoff
60+
jitter: 500ms # Optional: random variation 0-500ms added to retry delay
61+
# If omitted, auto-inferred as 25% of delay (e.g., 250ms for 1s delay)
5662
```
5763

5864
## Runtime Directory Layout
@@ -92,7 +98,9 @@ jobs:
9298
1. Scheduler polls every 1 second
9399
2. Check each job's cron schedule
94100
3. If due: spawn task in job's directory (by ID) with timeout
95-
4. Log using display name
101+
4. Apply task jitter (random delay 0 to jitter) before first execution
102+
5. On failure: apply exponential backoff + retry jitter before retry
103+
6. Log using display name
96104

97105
## Constraints
98106

Cargo.lock

Lines changed: 60 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ clap = { version = "4", features = ["derive"] }
1313
dirs = "6"
1414
serde = { version = "1", features = ["derive"] }
1515
serde_yaml = "0.9"
16+
rand = "0.8"
1617

1718
[dev-dependencies]
1819
tempfile = "3"

src/config.rs

Lines changed: 76 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub struct JobConfig {
4444
pub concurrency: Concurrency,
4545
pub retry: Option<RetryConfigRaw>,
4646
pub working_dir: Option<String>,
47+
pub jitter: Option<String>,
4748
}
4849

4950
#[derive(Debug, Deserialize)]
@@ -52,6 +53,7 @@ pub struct RetryConfigRaw {
5253
pub max: u32,
5354
#[serde(default = "default_retry_delay")]
5455
pub delay: String,
56+
pub jitter: Option<String>,
5557
}
5658

5759
fn default_retry_delay() -> String {
@@ -77,12 +79,14 @@ pub struct Job {
7779
pub concurrency: Concurrency,
7880
pub retry: Option<RetryConfig>,
7981
pub working_dir: Option<String>,
82+
pub jitter: Option<Duration>,
8083
}
8184

8285
#[derive(Debug, Clone)]
8386
pub struct RetryConfig {
8487
pub max: u32,
8588
pub delay: Duration,
89+
pub jitter: Option<Duration>,
8690
}
8791

8892
pub fn parse_config(content: &str) -> Result<(RunnerConfig, Vec<Job>)> {
@@ -116,10 +120,20 @@ pub fn parse_config(content: &str) -> Result<(RunnerConfig, Vec<Job>)> {
116120
.map(|r| {
117121
let delay = parse_duration(&r.delay)
118122
.map_err(|e| anyhow!("Invalid retry delay '{}' in job '{}': {}", r.delay, id, e))?;
119-
Ok::<_, anyhow::Error>(RetryConfig { max: r.max, delay })
123+
let jitter = r.jitter
124+
.map(|j| parse_duration(&j)
125+
.map_err(|e| anyhow!("Invalid retry jitter '{}' in job '{}': {}", j, id, e)))
126+
.transpose()?;
127+
Ok::<_, anyhow::Error>(RetryConfig { max: r.max, delay, jitter })
120128
})
121129
.transpose()?;
122130

131+
let jitter = job
132+
.jitter
133+
.map(|j| parse_duration(&j)
134+
.map_err(|e| anyhow!("Invalid jitter '{}' in job '{}': {}", j, id, e)))
135+
.transpose()?;
136+
123137
Ok(Job {
124138
id,
125139
name,
@@ -129,6 +143,7 @@ pub fn parse_config(content: &str) -> Result<(RunnerConfig, Vec<Job>)> {
129143
concurrency: job.concurrency,
130144
retry,
131145
working_dir: job.working_dir,
146+
jitter,
132147
})
133148
})
134149
.collect::<Result<Vec<_>>>()?;
@@ -138,7 +153,9 @@ pub fn parse_config(content: &str) -> Result<(RunnerConfig, Vec<Job>)> {
138153

139154
fn parse_duration(s: &str) -> Result<Duration> {
140155
let s = s.trim();
141-
if let Some(secs) = s.strip_suffix('s') {
156+
if let Some(millis) = s.strip_suffix("ms") {
157+
Ok(Duration::from_millis(millis.parse()?))
158+
} else if let Some(secs) = s.strip_suffix('s') {
142159
Ok(Duration::from_secs(secs.parse()?))
143160
} else if let Some(mins) = s.strip_suffix('m') {
144161
Ok(Duration::from_secs(mins.parse::<u64>()? * 60))
@@ -370,4 +387,61 @@ jobs:
370387
"#;
371388
assert!(parse_config(yaml).is_err());
372389
}
390+
391+
#[test]
392+
fn parse_jitter() {
393+
let yaml = r#"
394+
jobs:
395+
with_jitter:
396+
schedule:
397+
cron: "* * * * *"
398+
run: echo test
399+
jitter: 30s
400+
no_jitter:
401+
schedule:
402+
cron: "* * * * *"
403+
run: echo test
404+
"#;
405+
let (_, jobs) = parse_config(yaml).unwrap();
406+
let find = |id: &str| jobs.iter().find(|j| j.id == id).unwrap();
407+
408+
assert_eq!(find("with_jitter").jitter, Some(Duration::from_secs(30)));
409+
assert!(find("no_jitter").jitter.is_none());
410+
}
411+
412+
#[test]
413+
fn parse_retry_jitter() {
414+
let yaml = r#"
415+
jobs:
416+
with_retry_jitter:
417+
schedule:
418+
cron: "* * * * *"
419+
run: echo test
420+
retry:
421+
max: 3
422+
delay: 1s
423+
jitter: 500ms
424+
retry_no_jitter:
425+
schedule:
426+
cron: "* * * * *"
427+
run: echo test
428+
retry:
429+
max: 2
430+
delay: 1s
431+
"#;
432+
let (_, jobs) = parse_config(yaml).unwrap();
433+
let find = |id: &str| jobs.iter().find(|j| j.id == id).unwrap();
434+
435+
let retry1 = find("with_retry_jitter").retry.as_ref().unwrap();
436+
assert_eq!(retry1.jitter, Some(Duration::from_millis(500)));
437+
438+
let retry2 = find("retry_no_jitter").retry.as_ref().unwrap();
439+
assert!(retry2.jitter.is_none());
440+
}
441+
442+
#[test]
443+
fn parse_duration_milliseconds() {
444+
assert_eq!(parse_duration("500ms").unwrap(), Duration::from_millis(500));
445+
assert_eq!(parse_duration("1000ms").unwrap(), Duration::from_millis(1000));
446+
}
373447
}

0 commit comments

Comments
 (0)