Skip to content

Commit da87299

Browse files
committed
fix: make stdio shutdown more graceful
According to the protocol specifications Signed-off-by: jokemanfire <[email protected]>
1 parent 209dbac commit da87299

File tree

1 file changed

+71
-6
lines changed

1 file changed

+71
-6
lines changed

crates/rmcp/src/transport/child_process.rs

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use tokio::{
99
use super::{IntoTransport, Transport};
1010
use crate::service::ServiceRole;
1111

12+
const MAX_WAIT_ON_DROP_SECS: u64 = 3;
1213
/// The parts of a child process.
1314
type ChildProcessParts = (
1415
Box<dyn TokioChildWrapper>,
@@ -41,13 +42,35 @@ pub struct TokioChildProcess {
4142
}
4243

4344
pub struct ChildWithCleanup {
44-
inner: Box<dyn TokioChildWrapper>,
45+
inner: Option<Box<dyn TokioChildWrapper>>,
4546
}
4647

4748
impl Drop for ChildWithCleanup {
4849
fn drop(&mut self) {
49-
if let Err(e) = self.inner.start_kill() {
50-
tracing::warn!("Failed to kill child process: {e}");
50+
// Close child more graceful
51+
if let Some(mut child) = self.inner.take() {
52+
// Spawn a background task to clean up
53+
tokio::spawn(async move {
54+
// Wait will drop the stdin
55+
let wait_fut = Box::into_pin(child.wait());
56+
tokio::select! {
57+
_ = tokio::time::sleep(std::time::Duration::from_secs(MAX_WAIT_ON_DROP_SECS)) => {
58+
if let Err(e) = Box::into_pin(child.kill()).await{
59+
tracing::warn!("Error killing child: {e}");
60+
}
61+
},
62+
res = wait_fut => {
63+
match res {
64+
Ok(status) => {
65+
tracing::info!("Child exited gracefully {}", status);
66+
}
67+
Err(e) => {
68+
tracing::warn!("Error waiting for child: {e}");
69+
}
70+
}
71+
}
72+
}
73+
});
5174
}
5275
}
5376
}
@@ -64,7 +87,7 @@ pin_project_lite::pin_project! {
6487
impl TokioChildProcessOut {
6588
/// Get the process ID of the child process.
6689
pub fn id(&self) -> Option<u32> {
67-
self.child.inner.id()
90+
self.child.inner.as_ref()?.id()
6891
}
6992
}
7093

@@ -92,7 +115,7 @@ impl TokioChildProcess {
92115

93116
/// Get the process ID of the child process.
94117
pub fn id(&self) -> Option<u32> {
95-
self.child.inner.id()
118+
self.child.inner.as_ref()?.id()
96119
}
97120

98121
/// Split this helper into a reader (stdout) and writer (stdin).
@@ -157,7 +180,7 @@ impl TokioChildProcessBuilder {
157180
let (child, stdout, stdin, stderr_opt) = child_process(self.cmd.spawn()?)?;
158181

159182
let proc = TokioChildProcess {
160-
child: ChildWithCleanup { inner: child },
183+
child: ChildWithCleanup { inner: Some(child) },
161184
child_stdin: stdin,
162185
child_stdout: stdout,
163186
};
@@ -183,3 +206,45 @@ impl ConfigureCommandExt for tokio::process::Command {
183206
self
184207
}
185208
}
209+
210+
#[cfg(unix)]
211+
#[cfg(test)]
212+
mod tests {
213+
use tokio::process::Command;
214+
215+
use super::*;
216+
217+
#[tokio::test]
218+
async fn test_tokio_child_process_drop() {
219+
let r = TokioChildProcess::new(Command::new("sleep").configure(|cmd| {
220+
cmd.arg("30");
221+
}));
222+
assert!(r.is_ok());
223+
let child_process = r.unwrap();
224+
let id = child_process.id();
225+
assert!(id.is_some());
226+
let id = id.unwrap();
227+
// Drop the child process
228+
drop(child_process);
229+
// Wait a moment to allow the cleanup task to run
230+
tokio::time::sleep(std::time::Duration::from_secs(MAX_WAIT_ON_DROP_SECS + 1)).await;
231+
// Check if the process is still running
232+
let status = Command::new("ps")
233+
.arg("-p")
234+
.arg(id.to_string())
235+
.status()
236+
.await;
237+
match status {
238+
Ok(status) => {
239+
assert!(
240+
!status.success(),
241+
"Process with PID {} is still running",
242+
id
243+
);
244+
}
245+
Err(e) => {
246+
panic!("Failed to check process status: {}", e);
247+
}
248+
}
249+
}
250+
}

0 commit comments

Comments
 (0)