Skip to content

Commit 8159d42

Browse files
committed
fix: make stdio shoutdown more graceful
According to the protocol specifications Signed-off-by: jokemanfire <[email protected]>
1 parent 209dbac commit 8159d42

File tree

1 file changed

+75
-7
lines changed

1 file changed

+75
-7
lines changed

crates/rmcp/src/transport/child_process.rs

Lines changed: 75 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::process::Stdio;
1+
use std::{
2+
process::Stdio,
3+
sync::{Arc, Mutex},
4+
};
25

36
use process_wrap::tokio::{TokioChildWrapper, TokioCommandWrap};
47
use tokio::{
@@ -9,6 +12,7 @@ use tokio::{
912
use super::{IntoTransport, Transport};
1013
use crate::service::ServiceRole;
1114

15+
const MAX_WAIT_ON_DROP_SECS: u64 = 3;
1216
/// The parts of a child process.
1317
type ChildProcessParts = (
1418
Box<dyn TokioChildWrapper>,
@@ -41,13 +45,35 @@ pub struct TokioChildProcess {
4145
}
4246

4347
pub struct ChildWithCleanup {
44-
inner: Box<dyn TokioChildWrapper>,
48+
inner: Option<Box<dyn TokioChildWrapper>>,
4549
}
4650

4751
impl Drop for ChildWithCleanup {
4852
fn drop(&mut self) {
49-
if let Err(e) = self.inner.start_kill() {
50-
tracing::warn!("Failed to kill child process: {e}");
53+
// Close child more graceful
54+
if let Some(mut child) = self.inner.take() {
55+
// Spawn a background task to clean up
56+
tokio::spawn(async move {
57+
// Wait will drop the stdin
58+
let wait_fut = Box::into_pin(child.wait());
59+
tokio::select! {
60+
_ = tokio::time::sleep(std::time::Duration::from_secs(MAX_WAIT_ON_DROP_SECS)) => {
61+
if let Err(e) = Box::into_pin(child.kill()).await{
62+
tracing::warn!("Error killing child: {e}");
63+
}
64+
},
65+
res = wait_fut => {
66+
match res {
67+
Ok(status) => {
68+
tracing::info!("Child exited gracefully {}", status);
69+
}
70+
Err(e) => {
71+
tracing::warn!("Error waiting for child: {e}");
72+
}
73+
}
74+
}
75+
}
76+
});
5177
}
5278
}
5379
}
@@ -64,7 +90,7 @@ pin_project_lite::pin_project! {
6490
impl TokioChildProcessOut {
6591
/// Get the process ID of the child process.
6692
pub fn id(&self) -> Option<u32> {
67-
self.child.inner.id()
93+
self.child.inner.as_ref()?.id()
6894
}
6995
}
7096

@@ -92,7 +118,7 @@ impl TokioChildProcess {
92118

93119
/// Get the process ID of the child process.
94120
pub fn id(&self) -> Option<u32> {
95-
self.child.inner.id()
121+
self.child.inner.as_ref()?.id()
96122
}
97123

98124
/// Split this helper into a reader (stdout) and writer (stdin).
@@ -157,7 +183,7 @@ impl TokioChildProcessBuilder {
157183
let (child, stdout, stdin, stderr_opt) = child_process(self.cmd.spawn()?)?;
158184

159185
let proc = TokioChildProcess {
160-
child: ChildWithCleanup { inner: child },
186+
child: ChildWithCleanup { inner: Some(child) },
161187
child_stdin: stdin,
162188
child_stdout: stdout,
163189
};
@@ -183,3 +209,45 @@ impl ConfigureCommandExt for tokio::process::Command {
183209
self
184210
}
185211
}
212+
213+
#[cfg(unix)]
214+
#[cfg(test)]
215+
mod tests {
216+
use tokio::process::Command;
217+
218+
use super::*;
219+
220+
#[tokio::test]
221+
async fn test_tokio_child_process_drop() {
222+
let r = TokioChildProcess::new(Command::new("sleep").configure(|cmd| {
223+
cmd.arg("30");
224+
}));
225+
assert!(r.is_ok());
226+
let child_process = r.unwrap();
227+
let id = child_process.id();
228+
assert!(id.is_some());
229+
let id = id.unwrap();
230+
// Drop the child process
231+
drop(child_process);
232+
// Wait a moment to allow the cleanup task to run
233+
tokio::time::sleep(std::time::Duration::from_secs(MAX_WAIT_ON_DROP_SECS + 1)).await;
234+
// Check if the process is still running
235+
let status = Command::new("ps")
236+
.arg("-p")
237+
.arg(id.to_string())
238+
.status()
239+
.await;
240+
match status {
241+
Ok(status) => {
242+
assert!(
243+
!status.success(),
244+
"Process with PID {} is still running",
245+
id
246+
);
247+
}
248+
Err(e) => {
249+
panic!("Failed to check process status: {}", e);
250+
}
251+
}
252+
}
253+
}

0 commit comments

Comments
 (0)