Skip to content

Commit e3c3cfb

Browse files
aster-void authored and claude committed
treewide: fix security vulnerabilities and race conditions

- scheduler: use tokio::process with kill_on_drop for proper timeout handling
- config: validate job_id against path traversal (alphanumeric, underscore, hyphen only)
- config: reject retry.max=0 (must be at least 1)
- git: add --no-absolute-file-names to tar extraction
- git: fix TOCTOU race with atomic temp dir + rename
- git: check tar exit code and cleanup on failure
- git: replace unwrap() with proper error handling
- main: handle scheduler errors with 5s restart delay
- main: skip sync for running jobs to prevent workdir corruption
- main: persist job_handles across scheduler restarts

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent 01cf082 commit e3c3cfb

File tree

4 files changed

+273
-60
lines changed

4 files changed

+273
-60
lines changed

src/config.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,22 @@ use std::collections::HashMap;
66
use std::str::FromStr;
77
use std::time::Duration;
88

9+
fn validate_job_id(id: &str) -> Result<()> {
10+
if id.is_empty() {
11+
anyhow::bail!("Job ID cannot be empty");
12+
}
13+
if !id
14+
.chars()
15+
.all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '-')
16+
{
17+
anyhow::bail!(
18+
"Invalid job ID '{}': must contain only alphanumeric characters, underscores, and hyphens",
19+
id
20+
);
21+
}
22+
Ok(())
23+
}
24+
925
#[derive(Debug, Clone, Default, PartialEq)]
1026
pub enum TimezoneConfig {
1127
#[default]
@@ -116,6 +132,8 @@ pub fn parse_config(content: &str) -> Result<(RunnerConfig, Vec<Job>)> {
116132
.jobs
117133
.into_iter()
118134
.map(|(id, job)| {
135+
validate_job_id(&id)?;
136+
119137
let schedule_str = format!("{} *", job.schedule.cron);
120138
let schedule = Schedule::from_str(&schedule_str)
121139
.map_err(|e| anyhow!("Invalid cron '{}' in job '{}': {}", job.schedule.cron, id, e))?;
@@ -128,6 +146,12 @@ pub fn parse_config(content: &str) -> Result<(RunnerConfig, Vec<Job>)> {
128146
let retry = job
129147
.retry
130148
.map(|r| {
149+
if r.max == 0 {
150+
anyhow::bail!(
151+
"Invalid retry.max '0' in job '{}': must be at least 1 (use no retry config to disable retries)",
152+
id
153+
);
154+
}
131155
let delay = parse_duration(&r.delay)
132156
.map_err(|e| anyhow!("Invalid retry delay '{}' in job '{}': {}", r.delay, id, e))?;
133157
let jitter = r.jitter
@@ -469,4 +493,55 @@ jobs:
469493
assert_eq!(parse_duration("500ms").unwrap(), Duration::from_millis(500));
470494
assert_eq!(parse_duration("1000ms").unwrap(), Duration::from_millis(1000));
471495
}
496+
497+
#[test]
fn reject_invalid_job_id() {
    // A job keyed "../escape" must be rejected: ".." would allow the
    // job directory to escape the cache root.
    let config = r#"
jobs:
  "../escape":
    schedule:
      cron: "* * * * *"
    run: echo test
"#;
    let parsed = parse_config(config);
    assert!(parsed.is_err());
}
508+
509+
#[test]
fn reject_job_id_with_slash() {
    // Slashes are path separators and therefore never valid in a job ID.
    let config = r#"
jobs:
  "foo/bar":
    schedule:
      cron: "* * * * *"
    run: echo test
"#;
    let parsed = parse_config(config);
    assert!(parsed.is_err());
}
520+
521+
#[test]
fn accept_valid_job_id() {
    // IDs built from alphanumerics, underscores, and hyphens parse cleanly
    // and survive into the resulting job list unchanged.
    let config = r#"
jobs:
  my-job_123:
    schedule:
      cron: "* * * * *"
    run: echo test
"#;
    let parsed = parse_config(config).unwrap();
    assert_eq!(parsed.1[0].id, "my-job_123");
}
533+
534+
#[test]
fn reject_retry_max_zero() {
    // retry.max of 0 is contradictory (a retry block that never retries);
    // the parser must refuse it rather than silently disabling retries.
    let config = r#"
jobs:
  test:
    schedule:
      cron: "* * * * *"
    run: echo test
    retry:
      max: 0
"#;
    let parsed = parse_config(config);
    assert!(parsed.is_err());
}
472547
}

src/git.rs

Lines changed: 68 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
use anyhow::Result;
1+
use anyhow::{Context, Result};
22
use std::path::{Path, PathBuf};
33
use std::process::Command;
44

55
/// Ensures repo is cloned/synced to cache. Returns cache path.
66
pub fn ensure_repo(source: &str) -> Result<PathBuf> {
77
let cache_dir = get_cache_dir(source)?;
8-
std::fs::create_dir_all(cache_dir.parent().unwrap())?;
8+
if let Some(parent) = cache_dir.parent() {
9+
std::fs::create_dir_all(parent)?;
10+
}
911

1012
if cache_dir.exists() {
1113
sync_repo(source, &cache_dir)?;
@@ -25,8 +27,11 @@ fn is_remote(source: &str) -> bool {
2527

2628
fn clone_repo(source: &str, dest: &Path) -> Result<()> {
2729
if is_remote(source) {
30+
let dest_str = dest
31+
.to_str()
32+
.context("Destination path contains invalid UTF-8")?;
2833
let output = Command::new("git")
29-
.args(["clone", source, dest.to_str().unwrap()])
34+
.args(["clone", source, dest_str])
3035
.output()?;
3136

3237
if !output.status.success() {
@@ -73,13 +78,17 @@ fn sync_repo(source: &str, dest: &Path) -> Result<()> {
7378
fn rsync_local(source: &str, dest: &Path) -> Result<()> {
7479
std::fs::create_dir_all(dest)?;
7580

81+
let dest_str = dest
82+
.to_str()
83+
.context("Destination path contains invalid UTF-8")?;
7684
let output = Command::new("rsync")
7785
.args([
7886
"-a",
7987
"--delete",
80-
"--exclude", ".git",
88+
"--exclude",
89+
".git",
8190
&format!("{}/", source),
82-
dest.to_str().unwrap(),
91+
dest_str,
8392
])
8493
.output()?;
8594

@@ -121,16 +130,33 @@ pub fn get_job_dir(sot_path: &Path, job_id: &str) -> PathBuf {
121130
.unwrap_or_else(|| PathBuf::from("/tmp"))
122131
.join("rollcron");
123132

124-
let sot_name = sot_path.file_name().unwrap().to_str().unwrap();
133+
let sot_name = sot_path
134+
.file_name()
135+
.and_then(|s| s.to_str())
136+
.unwrap_or("unknown");
125137

126138
cache_base.join(format!("{}@{}", sot_name, job_id))
127139
}
128140

129141
pub fn sync_to_job_dir(sot_path: &Path, job_dir: &Path) -> Result<()> {
130-
if job_dir.exists() {
131-
std::fs::remove_dir_all(job_dir)?;
142+
let sot_str = sot_path
143+
.to_str()
144+
.context("Source path contains invalid UTF-8")?;
145+
let job_dir_str = job_dir
146+
.to_str()
147+
.context("Job directory path contains invalid UTF-8")?;
148+
149+
// Use atomic temp directory to avoid TOCTOU race condition
150+
let temp_dir = job_dir.with_extension("tmp");
151+
let temp_dir_str = temp_dir
152+
.to_str()
153+
.context("Temp directory path contains invalid UTF-8")?;
154+
155+
// Clean up any leftover temp directory
156+
if temp_dir.exists() {
157+
std::fs::remove_dir_all(&temp_dir)?;
132158
}
133-
std::fs::create_dir_all(job_dir)?;
159+
std::fs::create_dir_all(&temp_dir)?;
134160

135161
// Check if .git exists (remote repos have it, local rsync'd repos don't)
136162
if sot_path.join(".git").exists() {
@@ -141,35 +167,61 @@ pub fn sync_to_job_dir(sot_path: &Path, job_dir: &Path) -> Result<()> {
141167
.output()?;
142168

143169
if !archive.status.success() {
170+
std::fs::remove_dir_all(&temp_dir)?;
144171
let stderr = String::from_utf8_lossy(&archive.stderr);
145172
anyhow::bail!("git archive failed: {}", stderr);
146173
}
147174

148-
let extract = Command::new("tar")
149-
.args(["-x"])
150-
.current_dir(job_dir)
175+
// Extract with security flags to prevent path traversal
176+
let mut extract = Command::new("tar")
177+
.args(["--no-absolute-file-names", "-x"])
178+
.current_dir(&temp_dir)
151179
.stdin(std::process::Stdio::piped())
152180
.spawn()?;
153181

154-
use std::io::Write;
155-
extract.stdin.unwrap().write_all(&archive.stdout)?;
182+
{
183+
use std::io::Write;
184+
let stdin = extract
185+
.stdin
186+
.as_mut()
187+
.context("Failed to open tar stdin")?;
188+
stdin.write_all(&archive.stdout)?;
189+
}
190+
191+
let status = extract.wait()?;
192+
if !status.success() {
193+
std::fs::remove_dir_all(&temp_dir)?;
194+
anyhow::bail!("tar extraction failed with exit code: {:?}", status.code());
195+
}
156196
} else {
157197
// For non-git dirs (rsync'd local repos), use rsync
158198
let output = Command::new("rsync")
159199
.args([
160200
"-a",
161201
"--delete",
162-
&format!("{}/", sot_path.to_str().unwrap()),
163-
job_dir.to_str().unwrap(),
202+
&format!("{}/", sot_str),
203+
temp_dir_str,
164204
])
165205
.output()?;
166206

167207
if !output.status.success() {
208+
std::fs::remove_dir_all(&temp_dir)?;
168209
let stderr = String::from_utf8_lossy(&output.stderr);
169210
anyhow::bail!("rsync failed: {}", stderr);
170211
}
171212
}
172213

214+
// Atomic swap: remove old, rename temp to target
215+
if job_dir.exists() {
216+
std::fs::remove_dir_all(job_dir)?;
217+
}
218+
std::fs::rename(&temp_dir, job_dir).with_context(|| {
219+
format!(
220+
"Failed to rename {} to {}",
221+
temp_dir_str, job_dir_str
222+
)
223+
})?;
224+
173225
Ok(())
174226
}
175227

src/main.rs

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,19 @@ mod scheduler;
44

55
use anyhow::Result;
66
use clap::Parser;
7+
use std::collections::HashMap;
78
use std::path::PathBuf;
9+
use std::sync::Arc;
810
use std::time::Duration;
11+
use tokio::sync::RwLock;
912
use tokio::time::interval;
1013

14+
/// Tracks running job counts (supports multiple concurrent instances)
15+
pub type RunningJobs = Arc<RwLock<HashMap<String, usize>>>;
16+
17+
/// Tracks running job handles (persists across scheduler restarts)
18+
pub type JobHandles = Arc<tokio::sync::Mutex<HashMap<String, Vec<tokio::task::JoinHandle<()>>>>>;
19+
1120
const CONFIG_FILE: &str = "rollcron.yaml";
1221

1322
#[derive(Parser)]
@@ -44,13 +53,20 @@ async fn main() -> Result<()> {
4453
println!("[rollcron] Cache: {}", sot_path.display());
4554

4655
let (initial_runner, initial_jobs) = load_config(&sot_path)?;
47-
sync_job_dirs(&sot_path, &initial_jobs)?;
56+
sync_job_dirs(&sot_path, &initial_jobs, &HashMap::new())?;
4857

4958
let (tx, mut rx) = tokio::sync::watch::channel((initial_runner, initial_jobs));
5059

60+
// Shared map of currently running job IDs -> instance count
61+
let running_jobs: RunningJobs = Arc::new(RwLock::new(HashMap::new()));
62+
63+
// Shared map of job handles (persists across scheduler restarts)
64+
let job_handles: JobHandles = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
65+
5166
// Spawn auto-sync task
5267
let source_clone = source.clone();
5368
let pull_interval = args.pull_interval;
69+
let running_jobs_clone = Arc::clone(&running_jobs);
5470
tokio::spawn(async move {
5571
let mut ticker = interval(Duration::from_secs(pull_interval));
5672
loop {
@@ -67,10 +83,12 @@ async fn main() -> Result<()> {
6783

6884
match load_config(&sot) {
6985
Ok((runner, jobs)) => {
70-
if let Err(e) = sync_job_dirs(&sot, &jobs) {
86+
let running = running_jobs_clone.read().await;
87+
if let Err(e) = sync_job_dirs(&sot, &jobs, &running) {
7188
eprintln!("[rollcron] Failed to sync job dirs: {}", e);
7289
continue;
7390
}
91+
drop(running);
7492
let _ = tx.send((runner, jobs));
7593
println!("[rollcron] Synced job directories");
7694
}
@@ -83,8 +101,22 @@ async fn main() -> Result<()> {
83101
loop {
84102
let (runner, jobs) = rx.borrow_and_update().clone();
85103
let sot = sot_path.clone();
104+
let running_jobs_ref = Arc::clone(&running_jobs);
105+
let job_handles_ref = Arc::clone(&job_handles);
86106
tokio::select! {
87-
_ = scheduler::run_scheduler(jobs, sot, runner) => {}
107+
result = scheduler::run_scheduler(jobs, sot, runner, running_jobs_ref, job_handles_ref) => {
108+
match result {
109+
Ok(()) => {
110+
// Scheduler exited normally (shouldn't happen with infinite loop)
111+
eprintln!("[rollcron] Scheduler exited unexpectedly, restarting...");
112+
}
113+
Err(e) => {
114+
eprintln!("[rollcron] Scheduler error: {}", e);
115+
eprintln!("[rollcron] Restarting scheduler in 5 seconds...");
116+
tokio::time::sleep(Duration::from_secs(5)).await;
117+
}
118+
}
119+
}
88120
_ = rx.changed() => {
89121
println!("[rollcron] Config updated, restarting scheduler");
90122
}
@@ -99,8 +131,19 @@ fn load_config(sot_path: &PathBuf) -> Result<(config::RunnerConfig, Vec<config::
99131
config::parse_config(&content)
100132
}
101133

102-
fn sync_job_dirs(sot_path: &PathBuf, jobs: &[config::Job]) -> Result<()> {
134+
fn sync_job_dirs(
135+
sot_path: &PathBuf,
136+
jobs: &[config::Job],
137+
running_jobs: &HashMap<String, usize>,
138+
) -> Result<()> {
103139
for job in jobs {
140+
if running_jobs.get(&job.id).copied().unwrap_or(0) > 0 {
141+
println!(
142+
"[rollcron] Skipping sync for job '{}' (currently running)",
143+
job.id
144+
);
145+
continue;
146+
}
104147
let job_dir = git::get_job_dir(sot_path, &job.id);
105148
git::sync_to_job_dir(sot_path, &job_dir)?;
106149
}

0 commit comments

Comments (0)