Skip to content

Commit 2dc2a16

Browse files
committed
Respect SIGTERM & SIGINT so that application can exit cleanly (closes #127)
1 parent f48d0a4 commit 2dc2a16

File tree

7 files changed

+264
-0
lines changed

7 files changed

+264
-0
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,5 @@ libc = "0.2.17"
2222
ifaces = "0.0.3"
2323
dns-lookup = "0.2.1"
2424
native-tls = "=0.2.13"
25+
signal-hook = "0.3"
26+
lazy_static = "1.4"

samples/sigterm-resistant.factfile

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"schema": "iglu:com.snowplowanalytics.factotum/factfile/jsonschema/1-0-0",
3+
"data": {
4+
"name": "SIGTERM Resistant Task Example",
5+
"tasks": [
6+
{
7+
"name": "IgnoresSIGTERM",
8+
"executor": "shell",
9+
"command": "sh",
10+
"arguments": [
11+
"-c",
12+
"trap '' TERM; echo 'Task started, will ignore SIGTERM...'; sleep 30; echo 'Task completed normally'"
13+
],
14+
"dependsOn": [],
15+
"onResult": {
16+
"terminateJobWithSuccess": [],
17+
"continueJob": [ 0 ]
18+
}
19+
},
20+
{
21+
"name": "ShouldNotRun",
22+
"executor": "shell",
23+
"command": "echo",
24+
"arguments": [ "This task should never run if shutdown happens" ],
25+
"dependsOn": [ "IgnoresSIGTERM" ],
26+
"onResult": {
27+
"terminateJobWithSuccess": [],
28+
"continueJob": [ 0 ]
29+
}
30+
}
31+
]
32+
}
33+
}

src/factotum/executor/execution_strategy/mod.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ pub fn execute_os(name: &str, command: &mut Command) -> RunResult {
7474

7575
match command.spawn() {
7676
Ok(mut child) => {
77+
// Register child process for tracking during shutdown
78+
let child_pid = child.id();
79+
crate::factotum::shutdown::register_child_process(child_pid);
80+
7781
let stdout = child.stdout.take().expect("Failed to capture stdout");
7882
let stderr = child.stderr.take().expect("Failed to capture stderr");
7983

@@ -104,8 +108,47 @@ pub fn execute_os(name: &str, command: &mut Command) -> RunResult {
104108
lines.join("\n")
105109
});
106110

111+
// Check if shutdown was requested before waiting
112+
if crate::factotum::shutdown::is_shutting_down() {
113+
// Kill the child process
114+
let _ = child.kill();
115+
116+
// Wait for it to exit (reap zombie)
117+
let _ = child.wait();
118+
119+
// Unregister since we're handling shutdown
120+
crate::factotum::shutdown::unregister_child_process(child_pid);
121+
122+
// Join the streaming threads to capture any final output
123+
let task_stdout = match stdout_handle.join() {
124+
Ok(output) => output,
125+
Err(_) => {
126+
warn!("stdout reader thread panicked for task '{}'", name);
127+
String::new()
128+
}
129+
};
130+
let task_stderr = match stderr_handle.join() {
131+
Ok(output) => output,
132+
Err(_) => {
133+
warn!("stderr reader thread panicked for task '{}'", name);
134+
String::new()
135+
}
136+
};
137+
138+
return RunResult {
139+
duration: run_start.elapsed(),
140+
task_execution_error: Some("Task cancelled due to shutdown".to_string()),
141+
stdout: if task_stdout.is_empty() { None } else { Some(task_stdout) },
142+
stderr: if task_stderr.is_empty() { None } else { Some(task_stderr) },
143+
return_code: -1,
144+
};
145+
}
146+
107147
match child.wait() {
108148
Ok(status) => {
149+
// Unregister child process after it completes
150+
crate::factotum::shutdown::unregister_child_process(child_pid);
151+
109152
let run_duration = run_start.elapsed();
110153
let return_code = status.code().unwrap_or(1);
111154

src/factotum/executor/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,12 @@ pub fn execute_factfile<'a, F>(factfile: &'a Factfile,
181181
}
182182

183183
for task_grp_idx in 0..tasklist.tasks.len() {
184+
// Check if shutdown has been requested before starting new task group
185+
if crate::factotum::shutdown::is_shutting_down() {
186+
eprintln!("Shutdown requested, skipping remaining task groups");
187+
break;
188+
}
189+
184190
// everything in a task "group" gets run together
185191
let (tx, rx) = mpsc::channel::<(usize, RunResult)>();
186192

src/factotum/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ pub mod parser;
1717
pub mod executor;
1818
pub mod sequencer;
1919
pub mod webhook;
20+
pub mod shutdown;
2021

2122
#[cfg(test)]
2223
mod tests;

src/factotum/shutdown.rs

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
use std::sync::Mutex;
2+
use std::sync::atomic::{AtomicBool, Ordering};
3+
use std::thread;
4+
use std::time::Duration;
5+
6+
lazy_static! {
7+
/// Global flag indicating shutdown has been requested
8+
static ref SHUTDOWN_FLAG: AtomicBool = AtomicBool::new(false);
9+
10+
/// List of child process IDs that need to be terminated during shutdown
11+
static ref CHILD_PROCESSES: Mutex<Vec<u32>> = Mutex::new(Vec::new());
12+
}
13+
14+
/// Check if shutdown has been requested
15+
pub fn is_shutting_down() -> bool {
16+
SHUTDOWN_FLAG.load(Ordering::SeqCst)
17+
}
18+
19+
/// Signal that shutdown should begin
20+
pub fn request_shutdown() {
21+
SHUTDOWN_FLAG.store(true, Ordering::SeqCst);
22+
}
23+
24+
/// Register a child process for tracking during shutdown
25+
pub fn register_child_process(pid: u32) {
26+
let mut processes = CHILD_PROCESSES.lock()
27+
.unwrap_or_else(|poisoned| {
28+
warn!("CHILD_PROCESSES mutex was poisoned during registration, recovering data");
29+
poisoned.into_inner()
30+
});
31+
processes.push(pid);
32+
}
33+
34+
/// Remove a child process from tracking (called when process exits normally)
35+
pub fn unregister_child_process(pid: u32) {
36+
let mut processes = CHILD_PROCESSES.lock()
37+
.unwrap_or_else(|poisoned| {
38+
warn!("CHILD_PROCESSES mutex was poisoned during unregistration, recovering data");
39+
poisoned.into_inner()
40+
});
41+
processes.retain(|&p| p != pid);
42+
}
43+
44+
/// Gracefully terminate all child processes using a hybrid approach
45+
/// Phase 1: Broadcasts SIGTERM to process group (includes factotum but it has a handler)
46+
/// Phase 2: Sends SIGKILL only to individual child PIDs (excludes factotum)
47+
pub fn terminate_all_children() {
48+
let pids = {
49+
let processes = CHILD_PROCESSES.lock()
50+
.unwrap_or_else(|poisoned| {
51+
warn!("CHILD_PROCESSES mutex was poisoned during shutdown, recovering data");
52+
poisoned.into_inner()
53+
});
54+
processes.clone()
55+
};
56+
57+
if pids.is_empty() {
58+
return;
59+
}
60+
61+
println!("Terminating {} child process(es) gracefully...", pids.len());
62+
63+
// Phase 1: Broadcast SIGTERM to entire process group
64+
// This includes factotum, but factotum has a signal handler so it won't die
65+
// This is race-free - all processes get SIGTERM atomically
66+
unsafe {
67+
#[cfg(unix)]
68+
{
69+
// SAFETY: Sending SIGTERM to process group (pid 0).
70+
// - Process group targeting (pid 0) is atomic and race-free
71+
// - Factotum has signal handlers installed (see main.rs), so it will set
72+
// the shutdown flag but not terminate
73+
// - All children spawned by factotum inherit the same process group and
74+
// will receive SIGTERM for graceful shutdown
75+
// - PID recycling is not a concern here because we're targeting the entire
76+
// process group by ID, not individual PIDs
77+
// - Platform: Unix-only. Non-Unix platforms don't reach this code path.
78+
libc::kill(0, libc::SIGTERM);
79+
}
80+
}
81+
82+
// Wait 5 seconds for graceful shutdown
83+
thread::sleep(Duration::from_secs(5));
84+
85+
// Phase 2: Check which child processes are still alive and SIGKILL them individually
86+
// This excludes factotum - only tracked children are killed
87+
let surviving_pids: Vec<u32> = pids.iter()
88+
.filter(|&&pid| is_process_alive(pid))
89+
.cloned()
90+
.collect();
91+
92+
if !surviving_pids.is_empty() {
93+
println!("Force-killing {} remaining process(es)...", surviving_pids.len());
94+
for &pid in &surviving_pids {
95+
unsafe {
96+
#[cfg(unix)]
97+
{
98+
// SAFETY: Sending SIGKILL to individual child PIDs.
99+
// - PIDs are obtained from CHILD_PROCESSES, which tracks all spawned children
100+
// - PIDs are valid because they were registered immediately after spawn
101+
// - PIDs are filtered by is_process_alive() so we only kill surviving processes
102+
// - PID recycling edge case: If a PID is recycled between the is_process_alive()
103+
// check and this kill() call, we might signal an unrelated process. This is
104+
// extremely rare because:
105+
// 1. We only target PIDs from children spawned seconds ago
106+
// 2. The check-to-kill window is microseconds
107+
// 3. Unix systems typically have PID wraparound delays
108+
// 4. Recycled PIDs are usually given to unrelated processes, not critical ones
109+
// - This approach ensures factotum survives to send webhooks and write logs
110+
// - Platform: Unix-only. Non-Unix platforms don't reach this code path.
111+
libc::kill(pid as i32, libc::SIGKILL);
112+
}
113+
}
114+
}
115+
}
116+
}
117+
118+
/// Check if a process is still running
119+
fn is_process_alive(pid: u32) -> bool {
120+
unsafe {
121+
#[cfg(unix)]
122+
{
123+
// SAFETY: Sending signal 0 checks if process exists without sending a real signal.
124+
// - Signal 0 is a null signal that performs permission and existence checks only
125+
// - Returns 0 (true) if process exists and we have permission to signal it
126+
// - Returns -1 (false) if process doesn't exist or permission is denied
127+
// - No side effects on the target process - completely safe for checking
128+
// - PID recycling: Could return true for a recycled PID (false positive), but this
129+
// is acceptable because we'll send SIGKILL to confirm termination. A false positive
130+
// just means we attempt to kill a process that may have changed identity, which is
131+
// handled by the SIGKILL safety invariants.
132+
// - Platform: Unix-only, where signal 0 is well-defined behavior
133+
libc::kill(pid as i32, 0) == 0
134+
}
135+
#[cfg(not(unix))]
136+
{
137+
// SAFETY: On non-Unix platforms (Windows, etc.), we conservatively return false.
138+
// This means we'll always attempt SIGKILL in terminate_all_children(), which is
139+
// safe but suboptimal (no process liveness check).
140+
// TODO: Implement Windows-specific process checking using OpenProcess/GetExitCodeProcess
141+
// if Windows support is needed in the future.
142+
false
143+
}
144+
}
145+
}

src/main.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ extern crate hyper_native_tls;
2929
extern crate libc;
3030
extern crate ifaces;
3131
extern crate dns_lookup;
32+
extern crate signal_hook;
33+
#[macro_use]
34+
extern crate lazy_static;
3235

3336
use docopt::Docopt;
3437
use std::fs;
@@ -630,6 +633,31 @@ fn init_logger() -> Result<(), String> {
630633
log4rs::init_config(log_config).map_err(|e| format!("couldn't initialize log configuration. Reason: {}", e.description()))
631634
}
632635

636+
fn install_signal_handlers() -> Result<(), String> {
637+
use signal_hook::consts::signal::{SIGTERM, SIGINT};
638+
use signal_hook::iterator::Signals;
639+
use std::thread;
640+
641+
let mut signals = Signals::new(&[SIGTERM, SIGINT])
642+
.map_err(|e| format!("Failed to register signal handlers: {}", e))?;
643+
644+
thread::spawn(move || {
645+
for sig in signals.forever() {
646+
match sig {
647+
SIGTERM | SIGINT => {
648+
println!("\nReceived shutdown signal, terminating gracefully...");
649+
factotum::shutdown::request_shutdown();
650+
factotum::shutdown::terminate_all_children();
651+
break;
652+
}
653+
_ => unreachable!(),
654+
}
655+
}
656+
});
657+
658+
Ok(())
659+
}
660+
633661
fn main() {
634662
std::process::exit(factotum())
635663
}
@@ -640,6 +668,12 @@ fn factotum() -> i32 {
640668
return PROC_OTHER_ERROR;
641669
}
642670

671+
// Install signal handlers for graceful shutdown
672+
if let Err(e) = install_signal_handlers() {
673+
println!("Failed to install signal handlers: {}", e);
674+
return PROC_OTHER_ERROR;
675+
}
676+
643677
let args: Args = match Docopt::new(USAGE).and_then(|d| d.decode()) {
644678
Ok(a) => a,
645679
Err(e) => {

0 commit comments

Comments
 (0)