Skip to content

Commit 64995cc

Browse files
fix: cleanup zombie processes for child process client (#156)
* feat(client): cleanup of zombie processes in child process client Using process wrap library, Process Group for Unix and Job Object for Windows * fix: install extra dep based on feature, update examples take owned command Install process-wrap if transport-child-process feature is enabled. Update examples to take ownership of the command instead of mutable reference * fix: update other examples, comments and readme to take ownership of command Updated more examples, comments and readme to take command instead of mutable ref. Also small fix in cargo toml of example to use local path. * refactor: add configure command ext Added configure command ext to tokio process command so that you can use .configure() to use inline commands for mcp stdio client. Added warning if start kill process fails
1 parent 68ddea3 commit 64995cc

File tree

13 files changed

+115
-77
lines changed

13 files changed

+115
-77
lines changed

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,17 +22,17 @@ rmcp = { git = "https://github.com/modelcontextprotocol/rust-sdk", branch = "mai
2222

2323
### Quick start
2424

25-
Start a client in one line:
25+
Start a client:
2626

2727
```rust, ignore
28-
use rmcp::{ServiceExt, transport::TokioChildProcess};
28+
use rmcp::{ServiceExt, transport::{TokioChildProcess, ConfigureCommandExt}};
2929
use tokio::process::Command;
3030
3131
#[tokio::main]
3232
async fn main() -> Result<(), Box<dyn std::error::Error>> {
33-
let client = ().serve(
34-
TokioChildProcess::new(Command::new("npx").arg("-y").arg("@modelcontextprotocol/server-everything"))?
35-
).await?;
33+
let client = ().serve(TokioChildProcess::new(Command::new("npx").configure(|cmd| {
34+
cmd.arg("-y").arg("@modelcontextprotocol/server-everything");
35+
}))?).await?;
3636
Ok(())
3737
}
3838
```
@@ -92,7 +92,7 @@ let server = service.serve(transport).await?;
9292
Once the server is initialized, you can send requests or notifications:
9393

9494
```rust, ignore
95-
// request
95+
// request
9696
let roots = server.list_roots().await?;
9797
9898
// or send notification

crates/rmcp/Cargo.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ url = { version = "2.4", optional = true }
4545
# For tower compatibility
4646
tower-service = { version = "0.3", optional = true }
4747

48+
# for child process transport
49+
process-wrap = { version = "8.2", features = ["tokio1"], optional = true}
4850

4951
# for ws transport
5052
# tokio-tungstenite ={ version = "0.26", optional = true }
@@ -89,7 +91,11 @@ transport-streamable-http-client = [
8991

9092
transport-async-rw = ["tokio/io-util", "tokio-util/codec"]
9193
transport-io = ["transport-async-rw", "tokio/io-std"]
92-
transport-child-process = ["transport-async-rw", "tokio/process"]
94+
transport-child-process = [
95+
"transport-async-rw",
96+
"tokio/process",
97+
"dep:process-wrap",
98+
]
9399
transport-sse-server = [
94100
"transport-async-rw",
95101
"axum",

crates/rmcp/src/lib.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,13 @@
5858
//!
5959
//! ```rust
6060
//! use anyhow::Result;
61-
//! use rmcp::{model::CallToolRequestParam, service::ServiceExt, transport::TokioChildProcess};
61+
//! use rmcp::{model::CallToolRequestParam, service::ServiceExt, transport::{TokioChildProcess, ConfigureCommandExt}};
6262
//! use tokio::process::Command;
6363
//!
6464
//! async fn client() -> Result<()> {
65-
//! let service = ()
66-
//! .serve(TokioChildProcess::new(
67-
//! Command::new("uvx").arg("mcp-server-git"),
68-
//! )?)
69-
//! .await?;
65+
//! let service = ().serve(TokioChildProcess::new(Command::new("uvx").configure(|cmd| {
66+
//! cmd.arg("mcp-server-git");
67+
//! }))?).await?;
7068
//!
7169
//! // Initialize
7270
//! let server_info = service.peer_info();

crates/rmcp/src/transport.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ pub use worker::WorkerTransport;
8484
pub mod child_process;
8585
#[cfg(feature = "transport-child-process")]
8686
#[cfg_attr(docsrs, doc(cfg(feature = "transport-child-process")))]
87-
pub use child_process::TokioChildProcess;
87+
pub use child_process::{ConfigureCommandExt, TokioChildProcess};
8888

8989
#[cfg(feature = "transport-io")]
9090
#[cfg_attr(docsrs, doc(cfg(feature = "transport-io")))]

crates/rmcp/src/transport/child_process.rs

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use process_wrap::tokio::{TokioChildWrapper, TokioCommandWrap};
12
use tokio::{
23
io::AsyncRead,
34
process::{ChildStdin, ChildStdout},
@@ -7,29 +8,41 @@ use super::{IntoTransport, Transport};
78
use crate::service::ServiceRole;
89

910
pub(crate) fn child_process(
10-
mut child: tokio::process::Child,
11-
) -> std::io::Result<(tokio::process::Child, (ChildStdout, ChildStdin))> {
12-
if child.stdin.is_none() {
13-
return Err(std::io::Error::other("std in was taken"));
14-
}
15-
if child.stdout.is_none() {
16-
return Err(std::io::Error::other("std out was taken"));
17-
}
18-
let child_stdin = child.stdin.take().expect("already checked");
19-
let child_stdout = child.stdout.take().expect("already checked");
11+
mut child: Box<dyn TokioChildWrapper>,
12+
) -> std::io::Result<(Box<dyn TokioChildWrapper>, (ChildStdout, ChildStdin))> {
13+
let child_stdin = match child.inner_mut().stdin().take() {
14+
Some(stdin) => stdin,
15+
None => return Err(std::io::Error::other("std in was taken")),
16+
};
17+
let child_stdout = match child.inner_mut().stdout().take() {
18+
Some(stdout) => stdout,
19+
None => return Err(std::io::Error::other("std out was taken")),
20+
};
2021
Ok((child, (child_stdout, child_stdin)))
2122
}
2223

2324
pub struct TokioChildProcess {
24-
child: tokio::process::Child,
25+
child: ChildWithCleanup,
2526
child_stdin: ChildStdin,
2627
child_stdout: ChildStdout,
2728
}
2829

30+
pub struct ChildWithCleanup {
31+
inner: Box<dyn TokioChildWrapper>,
32+
}
33+
34+
impl Drop for ChildWithCleanup {
35+
fn drop(&mut self) {
36+
if let Err(e) = self.inner.start_kill() {
37+
tracing::warn!("Failed to kill child process: {e}");
38+
}
39+
}
40+
}
41+
2942
// we hold the child process with stdout, for it's easier to implement AsyncRead
3043
pin_project_lite::pin_project! {
3144
pub struct TokioChildProcessOut {
32-
child: tokio::process::Child,
45+
child: ChildWithCleanup,
3346
#[pin]
3447
child_stdout: ChildStdout,
3548
}
@@ -46,14 +59,18 @@ impl AsyncRead for TokioChildProcessOut {
4659
}
4760

4861
impl TokioChildProcess {
49-
pub fn new(child: &mut tokio::process::Command) -> std::io::Result<Self> {
50-
child
51-
.kill_on_drop(true)
62+
pub fn new(mut command: tokio::process::Command) -> std::io::Result<Self> {
63+
command
5264
.stdin(std::process::Stdio::piped())
5365
.stdout(std::process::Stdio::piped());
54-
let (child, (child_stdout, child_stdin)) = child_process(child.spawn()?)?;
66+
let mut command_wrap = TokioCommandWrap::from(command);
67+
#[cfg(unix)]
68+
command_wrap.wrap(process_wrap::tokio::ProcessGroup::leader());
69+
#[cfg(windows)]
70+
command_wrap.wrap(process_wrap::tokio::JobObject);
71+
let (child, (child_stdout, child_stdin)) = child_process(command_wrap.spawn()?)?;
5572
Ok(Self {
56-
child,
73+
child: ChildWithCleanup { inner: child },
5774
child_stdin,
5875
child_stdout,
5976
})
@@ -82,3 +99,14 @@ impl<R: ServiceRole> IntoTransport<R, std::io::Error, ()> for TokioChildProcess
8299
)
83100
}
84101
}
102+
103+
pub trait ConfigureCommandExt {
104+
fn configure(self, f: impl FnOnce(&mut Self)) -> Self;
105+
}
106+
107+
impl ConfigureCommandExt for tokio::process::Command {
108+
fn configure(mut self, f: impl FnOnce(&mut Self)) -> Self {
109+
f(&mut self);
110+
self
111+
}
112+
}

crates/rmcp/tests/test_with_js.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use rmcp::{
22
ServiceExt,
33
service::QuitReason,
44
transport::{
5-
SseServer, StreamableHttpClientTransport, TokioChildProcess,
5+
ConfigureCommandExt, SseServer, StreamableHttpClientTransport, TokioChildProcess,
66
streamable_http_server::axum::StreamableHttpServer,
77
},
88
};
@@ -59,9 +59,10 @@ async fn test_with_js_server() -> anyhow::Result<()> {
5959
.spawn()?
6060
.wait()
6161
.await?;
62-
let transport = TokioChildProcess::new(
63-
tokio::process::Command::new("node").arg("tests/test_with_js/server.js"),
64-
)?;
62+
let transport =
63+
TokioChildProcess::new(tokio::process::Command::new("node").configure(|cmd| {
64+
cmd.arg("tests/test_with_js/server.js");
65+
}))?;
6566

6667
let client = ().serve(transport).await?;
6768
let resources = client.list_all_resources().await?;

crates/rmcp/tests/test_with_python.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use rmcp::{
22
ServiceExt,
3-
transport::{SseServer, TokioChildProcess},
3+
transport::{ConfigureCommandExt, SseServer, TokioChildProcess},
44
};
55
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
66
mod common;
@@ -54,11 +54,9 @@ async fn test_with_python_server() -> anyhow::Result<()> {
5454
.spawn()?
5555
.wait()
5656
.await?;
57-
let transport = TokioChildProcess::new(
58-
tokio::process::Command::new("uv")
59-
.arg("run")
60-
.arg("tests/test_with_python/server.py"),
61-
)?;
57+
let transport = TokioChildProcess::new(tokio::process::Command::new("uv").configure(|cmd| {
58+
cmd.arg("run").arg("tests/test_with_python/server.py");
59+
}))?;
6260

6361
let client = ().serve(transport).await?;
6462
let resources = client.list_all_resources().await?;

docs/readme/README.zh-cn.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ rmcp = { git = "https://github.com/modelcontextprotocol/rust-sdk", branch = "mai
1717
### 快速上手
1818
一行代码启动客户端:
1919
```rust
20-
use rmcp::{ServiceExt, transport::TokioChildProcess};
20+
use rmcp::{ServiceExt, transport::{TokioChildProcess, ConfigureCommandExt}};
2121
use tokio::process::Command;
2222

23-
let client = ().serve(
24-
TokioChildProcess::new(Command::new("npx").arg("-y").arg("@modelcontextprotocol/server-everything"))?
25-
).await?;
23+
let client = ().serve(TokioChildProcess::new(Command::new("npx").configure(|cmd| {
24+
cmd.arg("-y").arg("@modelcontextprotocol/server-everything");
25+
}))?).await?;
2626
```
2727

2828
#### 1. 构建传输层
@@ -63,7 +63,7 @@ let server = service.serve(transport).await?;
6363
一旦服务初始化完成,你可以发送请求或通知:
6464

6565
```rust, ignore
66-
// 请求
66+
// 请求
6767
let roots = server.list_roots().await?;
6868
6969
// 或发送通知

examples/clients/src/collection.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
use std::collections::HashMap;
22

33
use anyhow::Result;
4-
use rmcp::{model::CallToolRequestParam, service::ServiceExt, transport::TokioChildProcess};
4+
use rmcp::{
5+
model::CallToolRequestParam,
6+
service::ServiceExt,
7+
transport::{ConfigureCommandExt, TokioChildProcess},
8+
};
59
use tokio::process::Command;
610
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
711

@@ -20,9 +24,11 @@ async fn main() -> Result<()> {
2024
for idx in 0..10 {
2125
let service = ()
2226
.into_dyn()
23-
.serve(TokioChildProcess::new(
24-
Command::new("uvx").arg("mcp-server-git"),
25-
)?)
27+
.serve(TokioChildProcess::new(Command::new("uvx").configure(
28+
|cmd| {
29+
cmd.arg("mcp-client-git");
30+
},
31+
))?)
2632
.await?;
2733
client_list.insert(idx, service);
2834
}

examples/clients/src/everything_stdio.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use rmcp::{
33
ServiceExt,
44
model::{CallToolRequestParam, GetPromptRequestParam, ReadResourceRequestParam},
55
object,
6-
transport::TokioChildProcess,
6+
transport::{ConfigureCommandExt, TokioChildProcess},
77
};
88
use tokio::process::Command;
99
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -21,11 +21,11 @@ async fn main() -> Result<()> {
2121

2222
// Start server
2323
let service = ()
24-
.serve(TokioChildProcess::new(
25-
Command::new("npx")
26-
.arg("-y")
27-
.arg("@modelcontextprotocol/server-everything"),
28-
)?)
24+
.serve(TokioChildProcess::new(Command::new("npx").configure(
25+
|cmd| {
26+
cmd.arg("-y").arg("@modelcontextprotocol/server-everything");
27+
},
28+
))?)
2929
.await?;
3030

3131
// Initialize

0 commit comments

Comments
 (0)