Skip to content

Commit 613b107

Browse files
Merge pull request #3 from stakpak/fix/task-manager-graceful-shutdown
fix: add Drop to TaskManagerHandle to kill child processes on unexpec…
2 parents c6a5d42 + 81af689 commit 613b107

File tree

1 file changed

+146
-115
lines changed

1 file changed

+146
-115
lines changed

libs/shared/src/task_manager.rs

Lines changed: 146 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,71 @@ use tokio::{
1111

1212
const START_TASK_WAIT_TIME: Duration = Duration::from_millis(300);
1313

14+
/// Kill a process and its entire process group.
15+
///
16+
/// Uses process group kill (`kill -9 -{pid}`) on Unix and `taskkill /F /T` on
17+
/// Windows to ensure child processes spawned by shells (node, vite, esbuild, etc.)
18+
/// are also terminated.
19+
///
20+
/// This is safe to call even if the process has already exited.
21+
fn terminate_process_group(process_id: u32) {
22+
#[cfg(unix)]
23+
{
24+
use std::process::Command;
25+
// First check if the process exists
26+
let check_result = Command::new("kill")
27+
.arg("-0") // Signal 0 just checks if process exists
28+
.arg(process_id.to_string())
29+
.output();
30+
31+
// Only kill if the process actually exists
32+
if check_result
33+
.map(|output| output.status.success())
34+
.unwrap_or(false)
35+
{
36+
// Kill the entire process group using negative PID
37+
// Since we spawn with .process_group(0), the shell becomes the process group leader
38+
// Using -{pid} kills all processes in that group (shell + children like node/vite/esbuild)
39+
let _ = Command::new("kill")
40+
.arg("-9")
41+
.arg(format!("-{}", process_id))
42+
.output();
43+
44+
// Also try to kill the individual process in case it's not a group leader
45+
let _ = Command::new("kill")
46+
.arg("-9")
47+
.arg(process_id.to_string())
48+
.output();
49+
}
50+
}
51+
52+
#[cfg(windows)]
53+
{
54+
use std::process::Command;
55+
// On Windows, use taskkill with /T flag to kill the process tree
56+
let check_result = Command::new("tasklist")
57+
.arg("/FI")
58+
.arg(format!("PID eq {}", process_id))
59+
.arg("/FO")
60+
.arg("CSV")
61+
.output();
62+
63+
// Only kill if the process actually exists
64+
if let Ok(output) = check_result {
65+
let output_str = String::from_utf8_lossy(&output.stdout);
66+
if output_str.lines().count() > 1 {
67+
// More than just header line - use /T to kill process tree
68+
let _ = Command::new("taskkill")
69+
.arg("/F")
70+
.arg("/T") // Kill process tree
71+
.arg("/PID")
72+
.arg(process_id.to_string())
73+
.output();
74+
}
75+
}
76+
}
77+
}
78+
1479
pub type TaskId = String;
1580

1681
#[derive(Debug, Clone, PartialEq, serde::Serialize)]
@@ -143,7 +208,6 @@ pub struct TaskManager {
143208
tasks: HashMap<TaskId, TaskEntry>,
144209
tx: mpsc::UnboundedSender<TaskMessage>,
145210
rx: mpsc::UnboundedReceiver<TaskMessage>,
146-
#[allow(dead_code)]
147211
shutdown_tx: broadcast::Sender<()>,
148212
shutdown_rx: broadcast::Receiver<()>,
149213
}
@@ -171,6 +235,7 @@ impl TaskManager {
171235
pub fn handle(&self) -> Arc<TaskManagerHandle> {
172236
Arc::new(TaskManagerHandle {
173237
tx: self.tx.clone(),
238+
shutdown_tx: self.shutdown_tx.clone(),
174239
})
175240
}
176241

@@ -185,6 +250,9 @@ impl TaskManager {
185250
}
186251
}
187252
None => {
253+
// All senders (TaskManagerHandles) have been dropped.
254+
// Clean up all running tasks and child processes.
255+
self.shutdown_all_tasks().await;
188256
break;
189257
}
190258
}
@@ -351,63 +419,7 @@ impl TaskManager {
351419
}
352420

353421
if let Some(process_id) = entry.process_id {
354-
// Kill the entire process group to ensure all child processes are terminated
355-
// This is important for tools like vite/node that spawn child processes
356-
#[cfg(unix)]
357-
{
358-
use std::process::Command;
359-
// First check if the process exists
360-
let check_result = Command::new("kill")
361-
.arg("-0") // Signal 0 just checks if process exists
362-
.arg(process_id.to_string())
363-
.output();
364-
365-
// Only kill if the process actually exists
366-
if check_result
367-
.map(|output| output.status.success())
368-
.unwrap_or(false)
369-
{
370-
// Kill the entire process group using negative PID
371-
// Since we spawn with .process_group(0), the shell becomes the process group leader
372-
// Using -{pid} kills all processes in that group (shell + children like node/vite/esbuild)
373-
let _ = Command::new("kill")
374-
.arg("-9")
375-
.arg(format!("-{}", process_id))
376-
.output();
377-
378-
// Also try to kill the individual process in case it's not a group leader
379-
let _ = Command::new("kill")
380-
.arg("-9")
381-
.arg(process_id.to_string())
382-
.output();
383-
}
384-
}
385-
386-
#[cfg(windows)]
387-
{
388-
use std::process::Command;
389-
// On Windows, use taskkill with /T flag to kill the process tree
390-
let check_result = Command::new("tasklist")
391-
.arg("/FI")
392-
.arg(format!("PID eq {}", process_id))
393-
.arg("/FO")
394-
.arg("CSV")
395-
.output();
396-
397-
// Only kill if the process actually exists
398-
if let Ok(output) = check_result {
399-
let output_str = String::from_utf8_lossy(&output.stdout);
400-
if output_str.lines().count() > 1 {
401-
// More than just header line - use /T to kill process tree
402-
let _ = Command::new("taskkill")
403-
.arg("/F")
404-
.arg("/T") // Kill process tree
405-
.arg("/PID")
406-
.arg(process_id.to_string())
407-
.output();
408-
}
409-
}
410-
}
422+
terminate_process_group(process_id);
411423
}
412424

413425
entry.handle.abort();
@@ -700,63 +712,7 @@ impl TaskManager {
700712
}
701713

702714
if let Some(process_id) = entry.process_id {
703-
// Kill the entire process group to ensure all child processes are terminated
704-
// This is important for tools like vite/node that spawn child processes
705-
#[cfg(unix)]
706-
{
707-
use std::process::Command;
708-
// First check if the process exists
709-
let check_result = Command::new("kill")
710-
.arg("-0") // Signal 0 just checks if process exists
711-
.arg(process_id.to_string())
712-
.output();
713-
714-
// Only kill if the process actually exists
715-
if check_result
716-
.map(|output| output.status.success())
717-
.unwrap_or(false)
718-
{
719-
// Kill the entire process group using negative PID
720-
// Since we spawn with .process_group(0), the shell becomes the process group leader
721-
// Using -{pid} kills all processes in that group (shell + children like node/vite/esbuild)
722-
let _ = Command::new("kill")
723-
.arg("-9")
724-
.arg(format!("-{}", process_id))
725-
.output();
726-
727-
// Also try to kill the individual process in case it's not a group leader
728-
let _ = Command::new("kill")
729-
.arg("-9")
730-
.arg(process_id.to_string())
731-
.output();
732-
}
733-
}
734-
735-
#[cfg(windows)]
736-
{
737-
use std::process::Command;
738-
// On Windows, use taskkill with /T flag to kill the process tree
739-
let check_result = Command::new("tasklist")
740-
.arg("/FI")
741-
.arg(format!("PID eq {}", process_id))
742-
.arg("/FO")
743-
.arg("CSV")
744-
.output();
745-
746-
// Only kill if the process actually exists
747-
if let Ok(output) = check_result {
748-
let output_str = String::from_utf8_lossy(&output.stdout);
749-
if output_str.lines().count() > 1 {
750-
// More than just header line - use /T to kill process tree
751-
let _ = Command::new("taskkill")
752-
.arg("/F")
753-
.arg("/T") // Kill process tree
754-
.arg("/PID")
755-
.arg(process_id.to_string())
756-
.output();
757-
}
758-
}
759-
}
715+
terminate_process_group(process_id);
760716
}
761717

762718
entry.handle.abort();
@@ -766,6 +722,21 @@ impl TaskManager {
766722

767723
pub struct TaskManagerHandle {
768724
tx: mpsc::UnboundedSender<TaskMessage>,
725+
shutdown_tx: broadcast::Sender<()>,
726+
}
727+
728+
impl Drop for TaskManagerHandle {
729+
fn drop(&mut self) {
730+
// Signal the TaskManager to shut down all tasks and kill child processes.
731+
// This fires on the broadcast channel that TaskManager::run() listens on,
732+
// triggering shutdown_all_tasks() which kills every process group.
733+
//
734+
// This is a last-resort safety net — callers should prefer calling
735+
// handle.shutdown().await for a clean async shutdown. But if the handle
736+
// is dropped without that (e.g., panic, std::process::exit, unexpected
737+
// scope exit), this ensures child processes don't leak.
738+
let _ = self.shutdown_tx.send(());
739+
}
769740
}
770741

771742
impl TaskManagerHandle {
@@ -1029,6 +1000,66 @@ mod tests {
10291000
.expect("Failed to shutdown task manager");
10301001
}
10311002

1003+
#[tokio::test]
1004+
async fn test_task_manager_handle_drop_triggers_shutdown() {
1005+
let task_manager = TaskManager::new();
1006+
let handle = task_manager.handle();
1007+
1008+
let manager_handle = tokio::spawn(async move {
1009+
task_manager.run().await;
1010+
});
1011+
1012+
// Start a long-running task
1013+
let _task_info = handle
1014+
.start_task("sleep 30".to_string(), None, None)
1015+
.await
1016+
.expect("Failed to start task");
1017+
1018+
// Drop the handle WITHOUT calling shutdown()
1019+
drop(handle);
1020+
1021+
// The Drop impl sends on the broadcast shutdown channel,
1022+
// which causes TaskManager::run() to call shutdown_all_tasks() and exit.
1023+
// Give it a moment to process.
1024+
sleep(Duration::from_millis(500)).await;
1025+
1026+
assert!(
1027+
manager_handle.is_finished(),
1028+
"TaskManager::run() should have exited after handle was dropped"
1029+
);
1030+
}
1031+
1032+
#[tokio::test]
1033+
async fn test_task_manager_handle_drop_kills_child_processes() {
1034+
let task_manager = TaskManager::new();
1035+
let handle = task_manager.handle();
1036+
1037+
let _manager_handle = tokio::spawn(async move {
1038+
task_manager.run().await;
1039+
});
1040+
1041+
// Start a task that writes a marker file while running
1042+
let marker = format!("/tmp/stakpak_test_drop_{}", std::process::id());
1043+
let task_info = handle
1044+
.start_task(format!("touch {} && sleep 30", marker), None, None)
1045+
.await
1046+
.expect("Failed to start task");
1047+
1048+
// Verify task is running
1049+
let status = handle
1050+
.get_task_status(task_info.id.clone())
1051+
.await
1052+
.expect("Failed to get status");
1053+
assert_eq!(status, Some(TaskStatus::Running));
1054+
1055+
// Drop handle without explicit shutdown — Drop should kill the process
1056+
drop(handle);
1057+
sleep(Duration::from_millis(500)).await;
1058+
1059+
// Clean up marker file
1060+
let _ = std::fs::remove_file(&marker);
1061+
}
1062+
10321063
#[tokio::test]
10331064
async fn test_task_manager_detects_immediate_exit_code_failure() {
10341065
let task_manager = TaskManager::new();

0 commit comments

Comments
 (0)