Skip to content

Commit 77573eb

Browse files
committed
Spawn multiple processes for multi-process consumers
After doing some digging, I found that the --single-thread AND --multi-process options are solely used for making sure that no other process is running. So if you run a single consumer, you add the --single-thread option. With this option, Magento will check if another process has a lock for that consumer. The same thing goes for the --multi-process n option, where Magento will check the lock in combination with specified n.
1 parent 43c28bc commit 77573eb

File tree

2 files changed

+67
-34
lines changed

2 files changed

+67
-34
lines changed

src/main.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,9 +75,7 @@ fn main() {
7575
while !term.load(std::sync::atomic::Ordering::Relaxed) {
7676
// If any of the processes have exited, restart them
7777
for process in &mut processes {
78-
if !process.is_running() {
79-
process.restart(&context);
80-
}
78+
process.ensure_running(&context);
8179
}
8280
thread::sleep(Duration::from_secs(2));
8381
}

src/worker.rs

Lines changed: 66 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -11,42 +11,66 @@ const RABBITMQ_CONSUMER_NAMES: [&str; 1] = ["async.operations.all"];
1111
pub struct WorkerProcess {
1212
// The consumer name
1313
consumer: String,
14-
// The process handle
15-
process: std::process::Child,
14+
// The process handles
15+
processes: Vec<std::process::Child>,
1616
}
1717

1818
impl WorkerProcess {
1919
pub fn terminate(&mut self) {
2020
log::debug!("Terminating consumer: {}", self.consumer);
21-
util::terminate_process_child(&self.process).unwrap();
22-
if self.process.try_wait().unwrap().is_none() {
23-
self.process.kill().unwrap();
21+
for p in self.processes.iter_mut() {
22+
util::terminate_process_child(&p).unwrap();
23+
if p.is_running() {
24+
log::debug!("Force killing consumer: {}", self.consumer);
25+
p.kill().unwrap();
26+
}
2427
}
2528
}
2629

27-
pub fn is_running(&mut self) -> bool {
28-
match self.process.try_wait() {
29-
Ok(Some(_)) => false,
30-
Ok(None) => true,
31-
Err(_) => false,
30+
pub fn ensure_running(&mut self, context: &DaemonContext) {
31+
if !self.processes.is_running() {
32+
self.restart(context);
3233
}
3334
}
3435

3536
pub fn restart(&mut self, context: &DaemonContext) {
36-
if self.is_running() {
37-
self.terminate();
37+
self.terminate();
38+
self.processes = run_worker(&context, &self.consumer).processes;
39+
}
40+
}
41+
42+
trait WorkerChildProcess {
43+
fn is_running(&mut self) -> bool;
44+
}
45+
46+
impl WorkerChildProcess for std::process::Child {
47+
fn is_running(&mut self) -> bool {
48+
match self.try_wait() {
49+
Ok(Some(status)) => {
50+
log::debug!("Process exited with status {:?}", status);
51+
return false;
52+
}
53+
Ok(None) => true,
54+
Err(err) => {
55+
log::debug!("Process has error {:?}", err);
56+
return false;
57+
}
3858
}
39-
self.process = run_worker(&context, &self.consumer).process;
59+
}
60+
}
61+
62+
impl WorkerChildProcess for Vec<std::process::Child> {
63+
fn is_running(&mut self) -> bool {
64+
self.iter_mut().all(|p| p.is_running())
4065
}
4166
}
4267

4368
pub fn read_consumer_list(config: &DaemonConfig) -> Vec<String> {
44-
// Read consumer list by running bin/magento queue:consumers:list
4569
let output = Command::new("bin/magento")
4670
.current_dir(&config.magento_dir)
4771
.arg("queue:consumers:list")
4872
.output()
49-
.expect("failed to run bin/magento queue:consumers:list");
73+
.expect("Failed to run bin/magento queue:consumers:list");
5074

5175
// Split output by newline and convert from u8 sequences to String
5276
output
@@ -67,30 +91,41 @@ pub fn read_consumer_list(config: &DaemonConfig) -> Vec<String> {
6791

6892
pub fn run_worker(context: &DaemonContext, consumer: &String) -> WorkerProcess {
6993
log::debug!("Running consumer: {}", consumer);
70-
let mut command = Command::new("bin/magento");
71-
let command = command
72-
.current_dir(&context.daemon_config.magento_dir)
73-
.arg("queue:consumers:start")
74-
.arg(consumer)
75-
.arg("--max-messages")
76-
.arg(context.consumer_config.max_messages.to_string());
7794

95+
let mut number_of_processes = 1;
7896
if let Some(processes) = context.consumer_config.multiple_processes.get(consumer) {
79-
if *processes > 1 {
80-
command.arg("--multi-process").arg(processes.to_string());
97+
number_of_processes = *processes;
98+
}
99+
100+
let mut processes = Vec::<std::process::Child>::new();
101+
102+
for i in 0..number_of_processes {
103+
let mut command = Command::new("bin/magento");
104+
let command = command
105+
.current_dir(&context.daemon_config.magento_dir)
106+
.arg("queue:consumers:start")
107+
.arg(consumer)
108+
.arg("--max-messages")
109+
.arg(context.consumer_config.max_messages.to_string());
110+
111+
// We could disable the --multi-process or --single-thread options with a --no-strict-mode flag,
112+
// but not sure if users need that, so this is the default for now.
113+
if number_of_processes > 1 {
114+
command.arg("--multi-process");
115+
command.arg(i.to_string());
81116
} else {
82117
command.arg("--single-thread");
83118
}
84-
} else {
85-
command.arg("--single-thread");
86-
}
87119

88-
let process = command
89-
.spawn()
90-
.expect("failed to run bin/magento queue:consumers:start");
120+
let process = command
121+
.spawn()
122+
.expect("Failed to run bin/magento queue:consumers:start");
123+
124+
processes.push(process);
125+
}
91126

92127
WorkerProcess {
93128
consumer: consumer.clone(),
94-
process,
129+
processes,
95130
}
96131
}

0 commit comments

Comments
 (0)