Skip to content

Commit cb81fdb

Browse files
authored
fix(cubestore): Fix error: Internal: channel closed on the next request after cubestore cloud process got OOM (#5238)
* in work * fix(cubestore): Fix error: Internal: channel closed on the next request after cubestore cloud process got OOM
1 parent 7626ed5 commit cb81fdb

File tree

1 file changed

+61
-9
lines changed

1 file changed

+61
-9
lines changed

rust/cubestore/cubestore/src/cluster/worker_pool.rs

Lines changed: 61 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::fmt::Debug;
22
use std::marker::PhantomData;
33
use std::panic;
4-
use std::process::Child;
4+
use std::process::{Child, ExitStatus};
55
use std::sync::Arc;
66
use std::time::Duration;
77

@@ -111,6 +111,35 @@ impl<
111111
}
112112
}
113113

114+
struct ProcessHandleGuard {
115+
handle: Child,
116+
}
117+
118+
impl ProcessHandleGuard {
119+
pub fn new(handle: Child) -> Self {
120+
Self { handle }
121+
}
122+
pub fn try_wait(&mut self) -> std::io::Result<Option<ExitStatus>> {
123+
self.handle.try_wait()
124+
}
125+
pub fn is_alive(&mut self) -> bool {
126+
self.handle.try_wait().map_or(false, |r| r.is_none())
127+
}
128+
pub fn kill(&mut self) {
129+
if let Err(e) = self.handle.kill() {
130+
error!("Error during kill: {:?}", e);
131+
}
132+
}
133+
}
134+
135+
impl Drop for ProcessHandleGuard {
136+
fn drop(&mut self) {
137+
if self.is_alive() {
138+
self.kill();
139+
}
140+
}
141+
}
142+
114143
pub struct WorkerProcess<
115144
T: Debug + Serialize + DeserializeOwned + Sync + Send + 'static,
116145
R: Serialize + DeserializeOwned + Sync + Send + 'static,
@@ -151,8 +180,8 @@ impl<
151180
let process = self.spawn_process();
152181

153182
match process {
154-
Ok((mut args_tx, mut res_rx, mut handle)) => {
155-
scopeguard::defer!(<WorkerProcess<T, R, P>>::kill(&mut handle));
183+
Ok((mut args_tx, mut res_rx, handle)) => {
184+
let mut handle_guard = ProcessHandleGuard::new(handle);
156185
loop {
157186
let mut stopped_rx = self.stopped_rx.write().await;
158187
let Message {
@@ -172,6 +201,35 @@ impl<
172201
message
173202
}
174203
};
204+
//Check if child process is killed
205+
match handle_guard.try_wait() {
206+
Ok(Some(_)) => {
207+
error!(
208+
"Worker process is killed, reshedule message in another process"
209+
);
210+
self.queue.push(Message {
211+
message,
212+
sender,
213+
span,
214+
dispatcher,
215+
});
216+
break;
217+
}
218+
Ok(None) => {}
219+
Err(_) => {
220+
error!(
221+
"Can't read worker process status, reshedule message in another process"
222+
);
223+
self.queue.push(Message {
224+
message,
225+
sender,
226+
span,
227+
dispatcher,
228+
});
229+
break;
230+
}
231+
}
232+
175233
let process_message_res_timeout = tokio::time::timeout(
176234
self.timeout,
177235
self.process_message(message, args_tx, res_rx),
@@ -214,12 +272,6 @@ impl<
214272
}
215273
}
216274

217-
fn kill(handle: &mut Child) {
218-
if let Err(e) = handle.kill() {
219-
error!("Error during kill: {:?}", e);
220-
}
221-
}
222-
223275
#[instrument(level = "trace", skip(self, message, args_tx, res_rx))]
224276
async fn process_message(
225277
&self,

0 commit comments

Comments
 (0)