Skip to content

Commit 5098d61

Browse files
committed
worker: Add graceful process termination and process cleanup
In the previous implementation, the processes were first signaled with SIGTERM and almost immediately signaled with SIGKILL, not allowing the process to stop by itself and causing potential locking problems in the application (although not experienced as of yet). By first signaling SIGTERM and waiting for a certain graceful period, the process has some time to stop itself before it's forcefully killed. This also fixes the situation where the program may have subprocesses lingering in the Z (zombie) state, which means that a process has exited but hasn't been waited on (with waitpid) yet.
1 parent 77573eb commit 5098d61

File tree

1 file changed

+36
-13
lines changed

1 file changed

+36
-13
lines changed

src/worker.rs

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
use std::process::Command;
1+
use std::{process::Command, time::Duration};
22

33
use crate::{
44
config::{DaemonConfig, DaemonContext},
5-
util,
5+
util::terminate_process_child,
66
};
77

88
const RABBITMQ_CONSUMER_NAMES: [&str; 1] = ["async.operations.all"];
9+
const PROCESS_GRACEFUL_KILL_PERIOD: Duration = Duration::from_millis(500);
10+
const PROCESS_GRACEFUL_POLL_RESOLUTION: Duration = Duration::from_millis(20);
911

1012
#[derive(Debug)]
1113
pub struct WorkerProcess {
@@ -19,16 +21,13 @@ impl WorkerProcess {
1921
pub fn terminate(&mut self) {
2022
log::debug!("Terminating consumer: {}", self.consumer);
2123
for p in self.processes.iter_mut() {
22-
util::terminate_process_child(&p).unwrap();
23-
if p.is_running() {
24-
log::debug!("Force killing consumer: {}", self.consumer);
25-
p.kill().unwrap();
26-
}
24+
p.try_stop_gracefully(PROCESS_GRACEFUL_KILL_PERIOD);
2725
}
2826
}
2927

3028
pub fn ensure_running(&mut self, context: &DaemonContext) {
31-
if !self.processes.is_running() {
29+
let is_running = self.processes.iter_mut().all(|p| p.is_running());
30+
if !is_running {
3231
self.restart(context);
3332
}
3433
}
@@ -41,13 +40,17 @@ impl WorkerProcess {
4140

4241
trait WorkerChildProcess {
4342
fn is_running(&mut self) -> bool;
43+
fn try_stop_gracefully(&mut self, grace_period: Duration);
4444
}
4545

4646
impl WorkerChildProcess for std::process::Child {
4747
fn is_running(&mut self) -> bool {
4848
match self.try_wait() {
4949
Ok(Some(status)) => {
50-
log::debug!("Process exited with status {:?}", status);
50+
match status.code() {
51+
Some(code) => log::debug!("Process {} exited with status {}", self.id(), code),
52+
None => log::debug!("Process {} was terminated", self.id()),
53+
}
5154
return false;
5255
}
5356
Ok(None) => true,
@@ -57,11 +60,31 @@ impl WorkerChildProcess for std::process::Child {
5760
}
5861
}
5962
}
60-
}
6163

62-
impl WorkerChildProcess for Vec<std::process::Child> {
63-
fn is_running(&mut self) -> bool {
64-
self.iter_mut().all(|p| p.is_running())
64+
fn try_stop_gracefully(&mut self, grace_period: Duration) {
65+
if !self.is_running() {
66+
return;
67+
}
68+
69+
let terminate_result = terminate_process_child(self);
70+
if terminate_result.is_err() {
71+
log::error!("Failed to SIGTERM process");
72+
}
73+
74+
let mut waiting_time = 0;
75+
while self.is_running() {
76+
if waiting_time >= grace_period.as_millis() {
77+
self.kill().unwrap();
78+
log::debug!("Force killing process");
79+
break;
80+
}
81+
std::thread::sleep(PROCESS_GRACEFUL_POLL_RESOLUTION);
82+
waiting_time += PROCESS_GRACEFUL_POLL_RESOLUTION.as_millis();
83+
}
84+
85+
// After it's killed, we need to call wait for the process to be removed from the process
86+
// table. For more information, see NOTES in man waitpid(2).
87+
self.wait().expect("Failed to wait for process to exit");
6588
}
6689
}
6790

0 commit comments

Comments
 (0)