Skip to content

Commit 7356f5c

Browse files
aster-voidclaude
andcommitted
scheduler: add per-job concurrency control
Add concurrency strategies to control overlapping job executions: - parallel: allow concurrent runs - wait: wait for previous run to complete - skip: skip if previous run still active (default) - replace: kill previous run, start new 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 5fc9f7d commit 7356f5c

File tree

3 files changed

+153
-5
lines changed

3 files changed

+153
-5
lines changed

SPEC.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,18 @@ jobs:
4848
cron: "<cron-expr>" # Required: 5-field cron expression
4949
run: <string> # Required: shell command to execute
5050
timeout: <duration> # Optional: default "10s"
51+
concurrency: <strategy> # Optional: default "skip"
5152
```
5253
54+
### Concurrency Strategies
55+
56+
| Strategy | Behavior |
57+
|----------|----------|
58+
| `parallel` | Allow concurrent runs |
59+
| `wait` | Wait for previous run to complete |
60+
| `skip` | Skip if previous run still active (default) |
61+
| `replace` | Kill previous run, start new |
62+
5363
### Example
5464

5565
```yaml
@@ -60,11 +70,13 @@ jobs:
6070
cron: "0 * * * *"
6171
run: cargo build --release
6272
timeout: 5m
73+
concurrency: wait
6374
6475
health-check:
6576
schedule:
6677
cron: "*/5 * * * *"
6778
run: curl -f http://localhost/health
79+
concurrency: skip
6880
```
6981

7082
### Cron Expression

src/config.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,16 @@ use std::collections::HashMap;
55
use std::str::FromStr;
66
use std::time::Duration;
77

8+
#[derive(Debug, Clone, Default, Deserialize, PartialEq)]
9+
#[serde(rename_all = "lowercase")]
10+
pub enum Concurrency {
11+
Parallel,
12+
Wait,
13+
#[default]
14+
Skip,
15+
Replace,
16+
}
17+
818
#[derive(Debug, Deserialize)]
919
pub struct Config {
1020
pub jobs: HashMap<String, JobConfig>,
@@ -17,6 +27,8 @@ pub struct JobConfig {
1727
pub run: String,
1828
#[serde(default = "default_timeout")]
1929
pub timeout: String,
30+
#[serde(default)]
31+
pub concurrency: Concurrency,
2032
}
2133

2234
#[derive(Debug, Deserialize)]
@@ -35,6 +47,7 @@ pub struct Job {
3547
pub schedule: Schedule,
3648
pub command: String,
3749
pub timeout: Duration,
50+
pub concurrency: Concurrency,
3851
}
3952

4053
pub fn parse_config(content: &str) -> Result<Vec<Job>> {
@@ -60,6 +73,7 @@ pub fn parse_config(content: &str) -> Result<Vec<Job>> {
6073
schedule,
6174
command: job.run,
6275
timeout,
76+
concurrency: job.concurrency,
6377
})
6478
})
6579
.collect()
@@ -153,4 +167,51 @@ jobs:
153167
assert_eq!(parse_duration("5m").unwrap(), Duration::from_secs(300));
154168
assert_eq!(parse_duration("1h").unwrap(), Duration::from_secs(3600));
155169
}
170+
171+
#[test]
172+
fn parse_concurrency_default() {
173+
let yaml = r#"
174+
jobs:
175+
test:
176+
schedule:
177+
cron: "* * * * *"
178+
run: echo test
179+
"#;
180+
let jobs = parse_config(yaml).unwrap();
181+
assert_eq!(jobs[0].concurrency, Concurrency::Skip);
182+
}
183+
184+
#[test]
185+
fn parse_concurrency_all_variants() {
186+
let yaml = r#"
187+
jobs:
188+
parallel_job:
189+
schedule:
190+
cron: "* * * * *"
191+
run: echo 1
192+
concurrency: parallel
193+
wait_job:
194+
schedule:
195+
cron: "* * * * *"
196+
run: echo 2
197+
concurrency: wait
198+
skip_job:
199+
schedule:
200+
cron: "* * * * *"
201+
run: echo 3
202+
concurrency: skip
203+
replace_job:
204+
schedule:
205+
cron: "* * * * *"
206+
run: echo 4
207+
concurrency: replace
208+
"#;
209+
let jobs = parse_config(yaml).unwrap();
210+
let find = |id: &str| jobs.iter().find(|j| j.id == id).unwrap();
211+
212+
assert_eq!(find("parallel_job").concurrency, Concurrency::Parallel);
213+
assert_eq!(find("wait_job").concurrency, Concurrency::Wait);
214+
assert_eq!(find("skip_job").concurrency, Concurrency::Skip);
215+
assert_eq!(find("replace_job").concurrency, Concurrency::Replace);
216+
}
156217
}

src/scheduler.rs

Lines changed: 80 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,29 @@
1-
use crate::config::Job;
1+
use crate::config::{Concurrency, Job};
22
use crate::git;
33
use anyhow::Result;
44
use chrono::Utc;
5+
use std::collections::HashMap;
56
use std::path::PathBuf;
67
use std::process::Command;
8+
use std::sync::Arc;
79
use std::time::Duration;
10+
use tokio::sync::Mutex;
11+
use tokio::task::JoinHandle;
812
use tokio::time::sleep;
913

1014
pub async fn run_scheduler(jobs: Vec<Job>, sot_path: PathBuf) -> Result<()> {
15+
let running: Arc<Mutex<HashMap<String, Vec<JoinHandle<()>>>>> =
16+
Arc::new(Mutex::new(HashMap::new()));
17+
1118
loop {
1219
let now = Utc::now();
1320

1421
for job in &jobs {
1522
if let Some(next) = job.schedule.upcoming(Utc).next() {
1623
let until_next = (next - now).num_milliseconds();
1724
if until_next <= 1000 && until_next >= 0 {
18-
let job = job.clone();
1925
let work_dir = git::get_job_dir(&sot_path, &job.id);
20-
tokio::spawn(async move {
21-
execute_job(&job, &work_dir).await;
22-
});
26+
handle_job_trigger(job, work_dir, &running).await;
2327
}
2428
}
2529
}
@@ -28,6 +32,76 @@ pub async fn run_scheduler(jobs: Vec<Job>, sot_path: PathBuf) -> Result<()> {
2832
}
2933
}
3034

35+
async fn handle_job_trigger(
36+
job: &Job,
37+
work_dir: PathBuf,
38+
running: &Arc<Mutex<HashMap<String, Vec<JoinHandle<()>>>>>,
39+
) {
40+
let tag = format!("[job:{}]", job.id);
41+
let mut map = running.lock().await;
42+
43+
// Clean up finished jobs
44+
if let Some(handles) = map.get_mut(&job.id) {
45+
handles.retain(|h| !h.is_finished());
46+
if handles.is_empty() {
47+
map.remove(&job.id);
48+
}
49+
}
50+
51+
let running_count = map.get(&job.id).map(|v| v.len()).unwrap_or(0);
52+
53+
match job.concurrency {
54+
Concurrency::Parallel => {
55+
spawn_job(job, work_dir, &mut map);
56+
}
57+
Concurrency::Wait => {
58+
if running_count > 0 {
59+
println!("{} Waiting for {} previous run(s) to complete", tag, running_count);
60+
let handles = map.remove(&job.id).unwrap();
61+
drop(map); // Release lock while waiting
62+
for handle in handles {
63+
let _ = handle.await;
64+
}
65+
let mut map = running.lock().await;
66+
spawn_job(job, work_dir, &mut map);
67+
} else {
68+
spawn_job(job, work_dir, &mut map);
69+
}
70+
}
71+
Concurrency::Skip => {
72+
if running_count > 0 {
73+
println!("{} Skipped ({} instance(s) still active)", tag, running_count);
74+
} else {
75+
spawn_job(job, work_dir, &mut map);
76+
}
77+
}
78+
Concurrency::Replace => {
79+
if running_count > 0 {
80+
println!("{} Replacing {} previous run(s)", tag, running_count);
81+
let handles = map.remove(&job.id).unwrap();
82+
for handle in handles {
83+
handle.abort();
84+
}
85+
}
86+
spawn_job(job, work_dir, &mut map);
87+
}
88+
}
89+
}
90+
91+
fn spawn_job(
92+
job: &Job,
93+
work_dir: PathBuf,
94+
map: &mut HashMap<String, Vec<JoinHandle<()>>>,
95+
) {
96+
let job_id = job.id.clone();
97+
let job = job.clone();
98+
let handle = tokio::spawn(async move {
99+
execute_job(&job, &work_dir).await;
100+
});
101+
102+
map.entry(job_id).or_default().push(handle);
103+
}
104+
31105
async fn execute_job(job: &Job, work_dir: &PathBuf) {
32106
let tag = format!("[job:{}]", job.id);
33107

@@ -101,6 +175,7 @@ mod tests {
101175
schedule: Schedule::from_str("* * * * * *").unwrap(),
102176
command: cmd.to_string(),
103177
timeout: Duration::from_secs(timeout_secs),
178+
concurrency: Concurrency::Skip,
104179
}
105180
}
106181

0 commit comments

Comments
 (0)