Skip to content

Commit 8dd9297

Browse files
committed
fix: make stdio shoutdown more graceful
According to the protocol specifications
1 parent 209dbac commit 8dd9297

File tree

1 file changed

+76
-7
lines changed

1 file changed

+76
-7
lines changed

crates/rmcp/src/transport/child_process.rs

Lines changed: 76 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
use std::process::Stdio;
1+
use std::{
2+
process::Stdio,
3+
sync::{Arc, Mutex},
4+
};
25

36
use process_wrap::tokio::{TokioChildWrapper, TokioCommandWrap};
47
use tokio::{
@@ -9,6 +12,7 @@ use tokio::{
912
use super::{IntoTransport, Transport};
1013
use crate::service::ServiceRole;
1114

15+
const MAX_WAIT_ON_DROP_SECS: u64 = 3;
1216
/// The parts of a child process.
1317
type ChildProcessParts = (
1418
Box<dyn TokioChildWrapper>,
@@ -41,13 +45,35 @@ pub struct TokioChildProcess {
4145
}
4246

4347
pub struct ChildWithCleanup {
44-
inner: Box<dyn TokioChildWrapper>,
48+
inner: Arc<Mutex<Option<Box<dyn TokioChildWrapper>>>>,
4549
}
4650

4751
impl Drop for ChildWithCleanup {
4852
fn drop(&mut self) {
49-
if let Err(e) = self.inner.start_kill() {
50-
tracing::warn!("Failed to kill child process: {e}");
53+
// Close child more graceful
54+
if let Some(mut child) = self.inner.lock().unwrap().take() {
55+
// Spawn a background task to clean up
56+
tokio::spawn(async move {
57+
// Wait will drop the stdin
58+
let wait_fut = Box::into_pin(child.wait());
59+
tokio::select! {
60+
_ = tokio::time::sleep(std::time::Duration::from_secs(MAX_WAIT_ON_DROP_SECS)) => {
61+
if let Err(e) = Box::into_pin(child.kill()).await{
62+
tracing::warn!("Error killing child: {e}");
63+
}
64+
},
65+
res = wait_fut => {
66+
match res {
67+
Ok(status) => {
68+
tracing::info!("Child exited gracefully {}", status);
69+
}
70+
Err(e) => {
71+
tracing::warn!("Error waiting for child: {e}");
72+
}
73+
}
74+
}
75+
}
76+
});
5177
}
5278
}
5379
}
@@ -64,7 +90,7 @@ pin_project_lite::pin_project! {
6490
impl TokioChildProcessOut {
6591
/// Get the process ID of the child process.
6692
pub fn id(&self) -> Option<u32> {
67-
self.child.inner.id()
93+
self.child.inner.lock().unwrap().as_ref()?.inner().id()
6894
}
6995
}
7096

@@ -92,7 +118,7 @@ impl TokioChildProcess {
92118

93119
/// Get the process ID of the child process.
94120
pub fn id(&self) -> Option<u32> {
95-
self.child.inner.id()
121+
self.child.inner.lock().unwrap().as_ref()?.inner().id()
96122
}
97123

98124
/// Split this helper into a reader (stdout) and writer (stdin).
@@ -157,7 +183,9 @@ impl TokioChildProcessBuilder {
157183
let (child, stdout, stdin, stderr_opt) = child_process(self.cmd.spawn()?)?;
158184

159185
let proc = TokioChildProcess {
160-
child: ChildWithCleanup { inner: child },
186+
child: ChildWithCleanup {
187+
inner: Arc::new(Mutex::new(Some(child))),
188+
},
161189
child_stdin: stdin,
162190
child_stdout: stdout,
163191
};
@@ -183,3 +211,44 @@ impl ConfigureCommandExt for tokio::process::Command {
183211
self
184212
}
185213
}
214+
215+
#[cfg(unix)]
216+
#[cfg(test)]
217+
mod tests {
218+
use super::*;
219+
use tokio::process::Command;
220+
221+
#[tokio::test]
222+
async fn test_tokio_child_process_drop() {
223+
let r = TokioChildProcess::new(Command::new("sleep").configure(|cmd| {
224+
cmd.arg("30");
225+
}));
226+
assert!(r.is_ok());
227+
let child_process = r.unwrap();
228+
let id = child_process.id();
229+
assert!(id.is_some());
230+
let id = id.unwrap();
231+
// Drop the child process
232+
drop(child_process);
233+
// Wait a moment to allow the cleanup task to run
234+
tokio::time::sleep(std::time::Duration::from_secs(MAX_WAIT_ON_DROP_SECS+1)).await;
235+
// Check if the process is still running
236+
let status = Command::new("ps")
237+
.arg("-p")
238+
.arg(id.to_string())
239+
.status()
240+
.await;
241+
match status {
242+
Ok(status) => {
243+
assert!(
244+
!status.success(),
245+
"Process with PID {} is still running",
246+
id
247+
);
248+
}
249+
Err(e) => {
250+
panic!("Failed to check process status: {}", e);
251+
}
252+
}
253+
}
254+
}

0 commit comments

Comments
 (0)