Skip to content

Commit 5f3159f

Browse files
committed
fix: make stdio shutdown more graceful
According to the protocol specifications Signed-off-by: jokemanfire <[email protected]>
1 parent 209dbac commit 5f3159f

File tree

1 file changed

+150
-29
lines changed

1 file changed

+150
-29
lines changed

crates/rmcp/src/transport/child_process.rs

Lines changed: 150 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
use std::process::Stdio;
22

3+
use futures::future::Future;
34
use process_wrap::tokio::{TokioChildWrapper, TokioCommandWrap};
45
use tokio::{
56
io::AsyncRead,
67
process::{ChildStderr, ChildStdin, ChildStdout},
78
};
89

9-
use super::{IntoTransport, Transport};
10-
use crate::service::ServiceRole;
10+
use super::{RxJsonRpcMessage, Transport, TxJsonRpcMessage, async_rw::AsyncRwTransport};
11+
use crate::RoleClient;
1112

13+
const MAX_WAIT_ON_DROP_SECS: u64 = 3;
1214
/// The parts of a child process.
1315
type ChildProcessParts = (
1416
Box<dyn TokioChildWrapper>,
@@ -36,18 +38,23 @@ fn child_process(mut child: Box<dyn TokioChildWrapper>) -> std::io::Result<Child
3638

3739
pub struct TokioChildProcess {
3840
child: ChildWithCleanup,
39-
child_stdin: ChildStdin,
40-
child_stdout: ChildStdout,
41+
transport: AsyncRwTransport<RoleClient, ChildStdout, ChildStdin>,
4142
}
4243

4344
pub struct ChildWithCleanup {
44-
inner: Box<dyn TokioChildWrapper>,
45+
inner: Option<Box<dyn TokioChildWrapper>>,
4546
}
4647

4748
impl Drop for ChildWithCleanup {
4849
fn drop(&mut self) {
49-
if let Err(e) = self.inner.start_kill() {
50-
tracing::warn!("Failed to kill child process: {e}");
50+
// We should not use start_kill(), instead we should use kill() to avoid zombies
51+
if let Some(mut inner) = self.inner.take() {
52+
// We don't care about the result, just try to kill it
53+
tokio::spawn(async move {
54+
if let Err(e) = Box::into_pin(inner.kill()).await {
55+
tracing::warn!("Error killing child process: {}", e);
56+
}
57+
});
5158
}
5259
}
5360
}
@@ -64,7 +71,7 @@ pin_project_lite::pin_project! {
6471
impl TokioChildProcessOut {
6572
/// Get the process ID of the child process.
6673
pub fn id(&self) -> Option<u32> {
67-
self.child.inner.id()
74+
self.child.inner.as_ref()?.id()
6875
}
6976
}
7077

@@ -92,23 +99,51 @@ impl TokioChildProcess {
9299

93100
/// Get the process ID of the child process.
94101
pub fn id(&self) -> Option<u32> {
95-
self.child.inner.id()
102+
self.child.inner.as_ref()?.id()
103+
}
104+
105+
/// Gracefully shutdown the child process
106+
///
107+
/// This will first wait for the child process to exit normally with a timeout.
108+
/// If the child process doesn't exit within the timeout, it will be killed.
109+
pub async fn graceful_shutdown(&mut self) -> std::io::Result<()> {
110+
if let Some(mut child) = self.child.inner.take() {
111+
let wait_fut = Box::into_pin(child.wait());
112+
tokio::select! {
113+
_ = tokio::time::sleep(std::time::Duration::from_secs(MAX_WAIT_ON_DROP_SECS)) => {
114+
if let Err(e) = Box::into_pin(child.kill()).await {
115+
tracing::warn!("Error killing child: {e}");
116+
return Err(e);
117+
}
118+
},
119+
res = wait_fut => {
120+
match res {
121+
Ok(status) => {
122+
tracing::info!("Child exited gracefully {}", status);
123+
}
124+
Err(e) => {
125+
tracing::warn!("Error waiting for child: {e}");
126+
return Err(e);
127+
}
128+
}
129+
}
130+
}
131+
}
132+
Ok(())
133+
}
134+
135+
/// Take ownership of the inner child process
136+
pub fn into_inner(mut self) -> Option<Box<dyn TokioChildWrapper>> {
137+
self.child.inner.take()
96138
}
97139

98140
/// Split this helper into a reader (stdout) and writer (stdin).
141+
#[deprecated(
142+
since = "0.5.0",
143+
note = "use the Transport trait implementation instead"
144+
)]
99145
pub fn split(self) -> (TokioChildProcessOut, ChildStdin) {
100-
let TokioChildProcess {
101-
child,
102-
child_stdin,
103-
child_stdout,
104-
} = self;
105-
(
106-
TokioChildProcessOut {
107-
child,
108-
child_stdout,
109-
},
110-
child_stdin,
111-
)
146+
unimplemented!("This method is deprecated, use the Transport trait implementation instead");
112147
}
113148
}
114149

@@ -156,20 +191,31 @@ impl TokioChildProcessBuilder {
156191

157192
let (child, stdout, stdin, stderr_opt) = child_process(self.cmd.spawn()?)?;
158193

194+
let transport = AsyncRwTransport::new(stdout, stdin);
159195
let proc = TokioChildProcess {
160-
child: ChildWithCleanup { inner: child },
161-
child_stdin: stdin,
162-
child_stdout: stdout,
196+
child: ChildWithCleanup { inner: Some(child) },
197+
transport,
163198
};
164199
Ok((proc, stderr_opt))
165200
}
166201
}
167202

168-
impl<R: ServiceRole> IntoTransport<R, std::io::Error, ()> for TokioChildProcess {
169-
fn into_transport(self) -> impl Transport<R, Error = std::io::Error> + 'static {
170-
IntoTransport::<R, std::io::Error, super::async_rw::TransportAdapterAsyncRW>::into_transport(
171-
self.split(),
172-
)
203+
impl Transport<RoleClient> for TokioChildProcess {
204+
type Error = std::io::Error;
205+
206+
fn send(
207+
&mut self,
208+
item: TxJsonRpcMessage<RoleClient>,
209+
) -> impl Future<Output = Result<(), Self::Error>> + Send + 'static {
210+
self.transport.send(item)
211+
}
212+
213+
fn receive(&mut self) -> impl Future<Output = Option<RxJsonRpcMessage<RoleClient>>> + Send {
214+
self.transport.receive()
215+
}
216+
217+
fn close(&mut self) -> impl Future<Output = Result<(), Self::Error>> + Send {
218+
self.graceful_shutdown()
173219
}
174220
}
175221

@@ -183,3 +229,78 @@ impl ConfigureCommandExt for tokio::process::Command {
183229
self
184230
}
185231
}
232+
233+
#[cfg(unix)]
234+
#[cfg(test)]
235+
mod tests {
236+
use tokio::process::Command;
237+
238+
use super::*;
239+
240+
#[tokio::test]
241+
async fn test_tokio_child_process_drop() {
242+
let r = TokioChildProcess::new(Command::new("sleep").configure(|cmd| {
243+
cmd.arg("30");
244+
}));
245+
assert!(r.is_ok());
246+
let child_process = r.unwrap();
247+
let id = child_process.id();
248+
assert!(id.is_some());
249+
let id = id.unwrap();
250+
// Drop the child process
251+
drop(child_process);
252+
// Wait a moment to allow the cleanup task to run
253+
tokio::time::sleep(std::time::Duration::from_secs(MAX_WAIT_ON_DROP_SECS + 1)).await;
254+
// Check if the process is still running
255+
let status = Command::new("ps")
256+
.arg("-p")
257+
.arg(id.to_string())
258+
.status()
259+
.await;
260+
match status {
261+
Ok(status) => {
262+
assert!(
263+
!status.success(),
264+
"Process with PID {} is still running",
265+
id
266+
);
267+
}
268+
Err(e) => {
269+
panic!("Failed to check process status: {}", e);
270+
}
271+
}
272+
}
273+
274+
#[tokio::test]
275+
async fn test_tokio_child_process_graceful_shutdown() {
276+
let r = TokioChildProcess::new(Command::new("sleep").configure(|cmd| {
277+
cmd.arg("30");
278+
}));
279+
assert!(r.is_ok());
280+
let mut child_process = r.unwrap();
281+
let id = child_process.id();
282+
assert!(id.is_some());
283+
let id = id.unwrap();
284+
child_process.graceful_shutdown().await.unwrap();
285+
// Wait a moment to allow the cleanup task to run
286+
tokio::time::sleep(std::time::Duration::from_secs(MAX_WAIT_ON_DROP_SECS + 1)).await;
287+
// Check if the process is still running
288+
let status = Command::new("ps")
289+
.arg("-p")
290+
.arg(id.to_string())
291+
.status()
292+
.await;
293+
match status {
294+
Ok(status) => {
295+
assert!(
296+
!status.success(),
297+
"Process with PID {} is still running",
298+
id
299+
);
300+
}
301+
Err(e) => {
302+
panic!("Failed to check process status: {}", e);
303+
}
304+
}
305+
}
306+
}

0 commit comments

Comments
 (0)