Skip to content

Commit 5b271e7

Browse files
aster-voidclaude
andcommitted
scheduler: add retry with exponential backoff
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 1152456 commit 5b271e7

File tree

3 files changed

+179
-7
lines changed

3 files changed

+179
-7
lines changed

CLAUDE.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,13 @@ struct Job {
2424
schedule: cron::Schedule,
2525
command: String,
2626
timeout: Duration,
27+
concurrency: Concurrency,
28+
retry: Option<RetryConfig>,
29+
}
30+
31+
struct RetryConfig {
32+
max: u32, // Max retry attempts
33+
delay: Duration, // Initial delay (exponential backoff)
2734
}
2835

2936
// Parsed from rollcron.yaml
@@ -42,6 +49,10 @@ jobs:
4249
cron: "*/5 * * * *"
4350
run: echo hello
4451
timeout: 10s # Optional (default: 10s)
52+
concurrency: skip # Optional: parallel|wait|skip|replace (default: skip)
53+
retry: # Optional
54+
max: 3 # Max retry attempts
55+
delay: 1s # Initial delay (default: 1s), exponential backoff
4556
```
4657
4758
## Runtime Directory Layout

src/config.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,19 @@ pub struct JobConfig {
2929
pub timeout: String,
3030
#[serde(default)]
3131
pub concurrency: Concurrency,
32+
pub retry: Option<RetryConfigRaw>,
33+
}
34+
35+
#[derive(Debug, Deserialize)]
36+
pub struct RetryConfigRaw {
37+
#[serde(default)]
38+
pub max: u32,
39+
#[serde(default = "default_retry_delay")]
40+
pub delay: String,
41+
}
42+
43+
fn default_retry_delay() -> String {
44+
"1s".to_string()
3245
}
3346

3447
#[derive(Debug, Deserialize)]
@@ -48,6 +61,13 @@ pub struct Job {
4861
pub command: String,
4962
pub timeout: Duration,
5063
pub concurrency: Concurrency,
64+
pub retry: Option<RetryConfig>,
65+
}
66+
67+
#[derive(Debug, Clone)]
68+
pub struct RetryConfig {
69+
pub max: u32,
70+
pub delay: Duration,
5171
}
5272

5373
pub fn parse_config(content: &str) -> Result<Vec<Job>> {
@@ -67,13 +87,23 @@ pub fn parse_config(content: &str) -> Result<Vec<Job>> {
6787

6888
let name = job.name.unwrap_or_else(|| id.clone());
6989

90+
let retry = job
91+
.retry
92+
.map(|r| {
93+
let delay = parse_duration(&r.delay)
94+
.map_err(|e| anyhow!("Invalid retry delay '{}' in job '{}': {}", r.delay, id, e))?;
95+
Ok::<_, anyhow::Error>(RetryConfig { max: r.max, delay })
96+
})
97+
.transpose()?;
98+
7099
Ok(Job {
71100
id,
72101
name,
73102
schedule,
74103
command: job.run,
75104
timeout,
76105
concurrency: job.concurrency,
106+
retry,
77107
})
78108
})
79109
.collect()
@@ -214,4 +244,47 @@ jobs:
214244
assert_eq!(find("skip_job").concurrency, Concurrency::Skip);
215245
assert_eq!(find("replace_job").concurrency, Concurrency::Replace);
216246
}
247+
248+
#[test]
249+
fn parse_retry_config() {
250+
let yaml = r#"
251+
jobs:
252+
with_retry:
253+
schedule:
254+
cron: "* * * * *"
255+
run: echo test
256+
retry:
257+
max: 3
258+
delay: 2s
259+
no_retry:
260+
schedule:
261+
cron: "* * * * *"
262+
run: echo test
263+
"#;
264+
let jobs = parse_config(yaml).unwrap();
265+
let find = |id: &str| jobs.iter().find(|j| j.id == id).unwrap();
266+
267+
let retry = find("with_retry").retry.as_ref().unwrap();
268+
assert_eq!(retry.max, 3);
269+
assert_eq!(retry.delay, Duration::from_secs(2));
270+
271+
assert!(find("no_retry").retry.is_none());
272+
}
273+
274+
#[test]
275+
fn parse_retry_default_delay() {
276+
let yaml = r#"
277+
jobs:
278+
test:
279+
schedule:
280+
cron: "* * * * *"
281+
run: echo test
282+
retry:
283+
max: 2
284+
"#;
285+
let jobs = parse_config(yaml).unwrap();
286+
let retry = jobs[0].retry.as_ref().unwrap();
287+
assert_eq!(retry.max, 2);
288+
assert_eq!(retry.delay, Duration::from_secs(1)); // default delay
289+
}
217290
}

src/scheduler.rs

Lines changed: 95 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::config::{Concurrency, Job};
1+
use crate::config::{Concurrency, Job, RetryConfig};
22
use crate::git;
33
use anyhow::Result;
44
use chrono::Utc;
@@ -104,10 +104,36 @@ fn spawn_job(
104104

105105
async fn execute_job(job: &Job, work_dir: &PathBuf) {
106106
let tag = format!("[job:{}]", job.id);
107+
let max_attempts = job.retry.as_ref().map(|r| r.max + 1).unwrap_or(1);
107108

108-
println!("{} Starting '{}'", tag, job.name);
109-
println!("{} command: {}", tag, job.command);
109+
for attempt in 0..max_attempts {
110+
if attempt > 0 {
111+
let delay = calculate_backoff(job.retry.as_ref().unwrap(), attempt - 1);
112+
println!("{} Retry {}/{} after {:?}", tag, attempt, max_attempts - 1, delay);
113+
sleep(delay).await;
114+
}
115+
116+
println!("{} Starting '{}'", tag, job.name);
117+
println!("{} command: {}", tag, job.command);
118+
119+
let result = run_command(job, work_dir).await;
120+
let success = handle_result(&tag, job, &result);
121+
122+
if success {
123+
return;
124+
}
110125

126+
if attempt + 1 < max_attempts {
127+
println!("{} Will retry...", tag);
128+
}
129+
}
130+
}
131+
132+
fn calculate_backoff(retry: &RetryConfig, attempt: u32) -> Duration {
133+
retry.delay.saturating_mul(2u32.saturating_pow(attempt))
134+
}
135+
136+
async fn run_command(job: &Job, work_dir: &PathBuf) -> CommandResult {
111137
let result = tokio::time::timeout(job.timeout, async {
112138
tokio::task::spawn_blocking({
113139
let cmd = job.command.clone();
@@ -124,7 +150,23 @@ async fn execute_job(job: &Job, work_dir: &PathBuf) {
124150
.await;
125151

126152
match result {
127-
Ok(Ok(Ok(output))) => {
153+
Ok(Ok(Ok(output))) => CommandResult::Completed(output),
154+
Ok(Ok(Err(e))) => CommandResult::ExecError(e.to_string()),
155+
Ok(Err(e)) => CommandResult::TaskError(e.to_string()),
156+
Err(_) => CommandResult::Timeout,
157+
}
158+
}
159+
160+
enum CommandResult {
161+
Completed(std::process::Output),
162+
ExecError(String),
163+
TaskError(String),
164+
Timeout,
165+
}
166+
167+
fn handle_result(tag: &str, job: &Job, result: &CommandResult) -> bool {
168+
match result {
169+
CommandResult::Completed(output) => {
128170
let stdout = String::from_utf8_lossy(&output.stdout);
129171
let stderr = String::from_utf8_lossy(&output.stderr);
130172

@@ -135,6 +177,7 @@ async fn execute_job(job: &Job, work_dir: &PathBuf) {
135177
println!("{} | {}", tag, line);
136178
}
137179
}
180+
true
138181
} else {
139182
eprintln!("{} ✗ Failed (exit code: {:?})", tag, output.status.code());
140183
if !stderr.trim().is_empty() {
@@ -147,16 +190,20 @@ async fn execute_job(job: &Job, work_dir: &PathBuf) {
147190
eprintln!("{} | {}", tag, line);
148191
}
149192
}
193+
false
150194
}
151195
}
152-
Ok(Ok(Err(e))) => {
196+
CommandResult::ExecError(e) => {
153197
eprintln!("{} ✗ Failed to execute: {}", tag, e);
198+
false
154199
}
155-
Ok(Err(e)) => {
200+
CommandResult::TaskError(e) => {
156201
eprintln!("{} ✗ Task error: {}", tag, e);
202+
false
157203
}
158-
Err(_) => {
204+
CommandResult::Timeout => {
159205
eprintln!("{} ✗ Timeout after {:?}", tag, job.timeout);
206+
false
160207
}
161208
}
162209
}
@@ -176,6 +223,7 @@ mod tests {
176223
command: cmd.to_string(),
177224
timeout: Duration::from_secs(timeout_secs),
178225
concurrency: Concurrency::Skip,
226+
retry: None,
179227
}
180228
}
181229

@@ -192,4 +240,44 @@ mod tests {
192240
let dir = tempdir().unwrap();
193241
execute_job(&job, &dir.path().to_path_buf()).await;
194242
}
243+
244+
#[test]
245+
fn exponential_backoff_calculation() {
246+
let retry = RetryConfig {
247+
max: 5,
248+
delay: Duration::from_secs(1),
249+
};
250+
assert_eq!(calculate_backoff(&retry, 0), Duration::from_secs(1));
251+
assert_eq!(calculate_backoff(&retry, 1), Duration::from_secs(2));
252+
assert_eq!(calculate_backoff(&retry, 2), Duration::from_secs(4));
253+
assert_eq!(calculate_backoff(&retry, 3), Duration::from_secs(8));
254+
}
255+
256+
#[tokio::test]
257+
async fn job_retry_on_failure() {
258+
let mut job = make_job("exit 1", 10);
259+
job.retry = Some(RetryConfig {
260+
max: 2,
261+
delay: Duration::from_millis(10),
262+
});
263+
let dir = tempdir().unwrap();
264+
let start = std::time::Instant::now();
265+
execute_job(&job, &dir.path().to_path_buf()).await;
266+
// Should have waited at least 10ms + 20ms = 30ms for 2 retries
267+
assert!(start.elapsed() >= Duration::from_millis(30));
268+
}
269+
270+
#[tokio::test]
271+
async fn job_success_no_retry() {
272+
let mut job = make_job("echo ok", 10);
273+
job.retry = Some(RetryConfig {
274+
max: 3,
275+
delay: Duration::from_millis(100),
276+
});
277+
let dir = tempdir().unwrap();
278+
let start = std::time::Instant::now();
279+
execute_job(&job, &dir.path().to_path_buf()).await;
280+
// Should complete quickly without retries
281+
assert!(start.elapsed() < Duration::from_millis(100));
282+
}
195283
}

0 commit comments

Comments
 (0)