Skip to content

Commit b926b63

Browse files
committed
Add proper signal handling
The signals being handled are: - SIGTERM - SIGINT - SIGQUIT This is a refactor of how processes are managed. Instead of having a thread for each process which automatically starts over and over again, the main thread spawns processes and waits for a signal to stop the processes and end the main function. To allow for this design, there's now a QueueWorker struct, which holds the queue consumer name and the process that was started for that consumer. The struct also contains the required functions for the worker process lifetime.
1 parent 5ab5deb commit b926b63

File tree

5 files changed

+109
-30
lines changed

5 files changed

+109
-30
lines changed

Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ edition = "2021"
88
[dependencies]
99
clap = { version = "4.2.4", features = ["derive"] }
1010
log = "0.4.17"
11+
signal-hook = "0.3.15"
1112
simple_logger = { version = "4.1.0", features = ["stderr"] }

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ cargo run -- --working-directory /path/to/magento2
2323
```console
2424
$ magento2-worker-daemon
2525
2023-04-28T13:36:12.788Z INFO [magento2_worker_daemon] Found 19 consumers
26-
2023-04-28T13:36:12.793Z INFO [magento2_worker_daemon] Started 19 t
26+
2023-04-28T13:36:12.793Z INFO [magento2_worker_daemon] Started 19 consumers
2727
```
2828

2929
### Command line options
@@ -35,7 +35,6 @@ Usage: magento2-worker-daemon [OPTIONS]
3535
Options:
3636
-v, --verbose
3737
-w, --working-directory <WORKING_DIRECTORY>
38-
-s, --sleep-time <SLEEP_TIME> [default: 5]
3938
-h, --help Print help
4039
-V, --version Print version
4140
```

src/main.rs

Lines changed: 77 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
1-
use std::{env, path::Path, process::Command, sync::Arc, thread, time::Duration};
1+
mod util;
2+
3+
use std::{
4+
env,
5+
path::Path,
6+
process::Command,
7+
sync::{atomic::AtomicBool, Arc},
8+
thread,
9+
time::Duration,
10+
};
211

312
use clap::Parser;
13+
use signal_hook::consts::TERM_SIGNALS;
414

515
//
616
// Magento 2 Worker Daemon
@@ -17,7 +27,6 @@ use clap::Parser;
1727
//
1828
// TODO:
1929
// - Add unit tests
20-
// - Add proper signal handling with signal_hook
2130
//
2231

2332
#[derive(Parser, Debug)]
@@ -27,18 +36,47 @@ struct Args {
2736
verbose: bool,
2837
#[arg(short, long)]
2938
working_directory: Option<std::path::PathBuf>,
30-
#[arg(short, long, default_value_t = 5)]
31-
sleep_time: u32,
3239
}
3340

3441
#[derive(Clone, Debug)]
3542
struct DaemonConfig {
36-
// The amount of time to sleep between each iteration of the consumer loop.
37-
sleep_time: u32,
3843
// The working directory of the Magento 2 installation.
3944
magento_dir: String,
4045
}
4146

47+
#[derive(Debug)]
48+
struct WorkerProcess {
49+
// The consumer name
50+
consumer: String,
51+
// The process handle
52+
process: std::process::Child,
53+
}
54+
55+
impl WorkerProcess {
56+
fn terminate(&mut self) {
57+
log::debug!("Terminating consumer: {}", self.consumer);
58+
util::terminate_process_child(&self.process).unwrap();
59+
if self.process.try_wait().unwrap().is_none() {
60+
self.process.kill().unwrap();
61+
}
62+
}
63+
64+
fn is_running(&mut self) -> bool {
65+
match self.process.try_wait() {
66+
Ok(Some(_)) => false,
67+
Ok(None) => true,
68+
Err(_) => false,
69+
}
70+
}
71+
72+
fn restart(&mut self, config: &DaemonConfig) {
73+
if self.is_running() {
74+
self.terminate();
75+
}
76+
self.process = run_consumer(config, &self.consumer).process;
77+
}
78+
}
79+
4280
enum EnvironmentError {
4381
MagentoDirNotFound,
4482
MagentoBinNotFound,
@@ -47,7 +85,6 @@ enum EnvironmentError {
4785

4886
fn read_config(args: &Args) -> DaemonConfig {
4987
DaemonConfig {
50-
sleep_time: args.sleep_time,
5188
magento_dir: match args.working_directory {
5289
Some(ref path) => path.to_str().unwrap().to_string(),
5390
None => env::current_dir().unwrap().to_str().unwrap().to_string(),
@@ -82,6 +119,7 @@ fn validate_config(config: &DaemonConfig) -> Result<(), EnvironmentError> {
82119
return Err(EnvironmentError::MagentoBinNotFound);
83120
}
84121

122+
// Check if cron worker spawner is disabled
85123
if !magento_cron_worker_is_disabled(&config) {
86124
return Err(EnvironmentError::MagentoCronWorkerEnabled);
87125
}
@@ -106,21 +144,18 @@ fn read_consumer_list(config: &DaemonConfig) -> Vec<String> {
106144
.collect()
107145
}
108146

109-
fn run_consumer(config: &DaemonConfig, consumer: &String) {
147+
fn run_consumer(config: &DaemonConfig, consumer: &String) -> WorkerProcess {
110148
log::debug!("Running consumer: {}", consumer);
111-
Command::new("bin/magento")
149+
let process = Command::new("bin/magento")
112150
.current_dir(&config.magento_dir)
113151
.arg("queue:consumers:start")
114152
.arg(consumer)
115-
.output()
153+
.spawn()
116154
.expect("failed to run bin/magento queue:consumers:start");
117-
}
118-
119-
fn run_consumer_thread(config: Arc<DaemonConfig>, consumer: String) -> thread::JoinHandle<()> {
120-
thread::spawn(move || loop {
121-
run_consumer(&config, &consumer);
122-
thread::sleep(Duration::from_secs(config.sleep_time as u64));
123-
})
155+
WorkerProcess {
156+
consumer: consumer.clone(),
157+
process,
158+
}
124159
}
125160

126161
fn configure_logging(args: &Args) {
@@ -140,9 +175,9 @@ fn main() {
140175
Ok(_) => {}
141176
Err(err) => {
142177
match err {
143-
EnvironmentError::MagentoDirNotFound => log::error!("error: Magento directory not found"),
144-
EnvironmentError::MagentoBinNotFound => log::error!("error: Magento bin not found"),
145-
EnvironmentError::MagentoCronWorkerEnabled => log::error!("error: Magento cron worker is enabled. Please see https://devdocs.magento.com/guides/v2.3/config-guide/mq/manage-message-queues.html#configuration to see how to disable the cron_run variable."),
178+
EnvironmentError::MagentoDirNotFound => log::error!("Magento directory not found"),
179+
EnvironmentError::MagentoBinNotFound => log::error!("Magento bin not found"),
180+
EnvironmentError::MagentoCronWorkerEnabled => log::error!("Magento cron worker is enabled. Please see https://devdocs.magento.com/guides/v2.3/config-guide/mq/manage-message-queues.html#configuration to see how to disable the cron_run variable."),
146181
}
147182
std::process::exit(1);
148183
}
@@ -152,15 +187,29 @@ fn main() {
152187
let consumers = read_consumer_list(&config);
153188
log::info!("Found {} consumers", consumers.len());
154189

155-
let mut threads: Vec<thread::JoinHandle<()>> = Vec::new();
156-
for consumer in consumers {
157-
let config = Arc::new(config.clone());
158-
let thread = run_consumer_thread(config, consumer);
159-
threads.push(thread);
190+
let mut processes: Vec<WorkerProcess> = consumers
191+
.iter()
192+
.map(|consumer| run_consumer(&config, &consumer))
193+
.collect();
194+
log::info!("Started {} consumers", processes.len());
195+
196+
let term = Arc::new(AtomicBool::new(false));
197+
for sig in TERM_SIGNALS {
198+
signal_hook::flag::register(*sig, Arc::clone(&term)).unwrap();
199+
}
200+
201+
while !term.load(std::sync::atomic::Ordering::Relaxed) {
202+
// If any of the processes have exited, restart them
203+
for process in &mut processes {
204+
if !process.is_running() {
205+
process.restart(&config);
206+
}
207+
}
208+
thread::sleep(Duration::from_secs(2));
160209
}
161-
log::info!("Started {} threads", threads.len());
162210

163-
for thread in threads {
164-
thread.join().unwrap();
211+
log::info!("Stopping {} consumers", processes.len());
212+
for mut process in processes {
213+
process.terminate();
165214
}
166215
}

src/util.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
use std::process::{Command, ExitStatus};
2+
3+
pub fn terminate_process_child(process: &std::process::Child) -> std::io::Result<ExitStatus> {
4+
Command::new("kill")
5+
.arg("-SIGTERM")
6+
.arg(process.id().to_string())
7+
.spawn()
8+
.expect("failed to kill process")
9+
.wait()
10+
}

0 commit comments

Comments
 (0)