Skip to content

Commit 994bd4a

Browse files
committed
Process-group cleanup for stdio MCP servers to prevent orphan process storms
This PR changes stdio MCP child processes to run in their own process group (process_group(0) on Unix). * Add guarded teardown in codex-rmcp-client: send SIGTERM to the group first, then SIGKILL after a short grace period. * Add terminate_process_group helper in process_group.rs. * Add Unix regression test in process_group_cleanup.rs to verify wrapper + grandchild are reaped on client drop. Addresses reported MCP process/thread storm: #10581
1 parent 224c9f7 commit 994bd4a

File tree

5 files changed

+170
-5
lines changed

5 files changed

+170
-5
lines changed

codex-rs/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

codex-rs/rmcp-client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ axum = { workspace = true, default-features = false, features = [
1515
] }
1616
codex-keyring-store = { workspace = true }
1717
codex-protocol = { workspace = true }
18+
codex-utils-pty = { workspace = true }
1819
codex-utils-home-dir = { workspace = true }
1920
futures = { workspace = true, default-features = false, features = ["std"] }
2021
keyring = { workspace = true, features = ["crypto-rust"] }

codex-rs/rmcp-client/src/rmcp_client.rs

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ use crate::utils::create_env_for_mcp_server;
6060
use crate::utils::run_with_timeout;
6161

6262
enum PendingTransport {
63-
ChildProcess(TokioChildProcess),
63+
ChildProcess {
64+
transport: TokioChildProcess,
65+
process_group_guard: Option<ProcessGroupGuard>,
66+
},
6467
StreamableHttp {
6568
transport: StreamableHttpClientTransport<reqwest::Client>,
6669
},
@@ -75,11 +78,47 @@ enum ClientState {
7578
transport: Option<PendingTransport>,
7679
},
7780
Ready {
81+
_process_group_guard: Option<ProcessGroupGuard>,
7882
service: Arc<RunningService<RoleClient, LoggingClientHandler>>,
7983
oauth: Option<OAuthPersistor>,
8084
},
8185
}
8286

87+
const PROCESS_GROUP_TERM_GRACE_PERIOD: Duration = Duration::from_secs(2);
88+
89+
struct ProcessGroupGuard {
90+
process_group_id: u32,
91+
}
92+
93+
impl ProcessGroupGuard {
94+
fn new(process_group_id: u32) -> Self {
95+
Self { process_group_id }
96+
}
97+
}
98+
99+
impl Drop for ProcessGroupGuard {
100+
fn drop(&mut self) {
101+
#[cfg(unix)]
102+
{
103+
let process_group_id = self.process_group_id;
104+
if let Err(error) =
105+
codex_utils_pty::process_group::terminate_process_group(process_group_id)
106+
{
107+
warn!("Failed to terminate MCP process group {process_group_id}: {error}");
108+
}
109+
110+
std::thread::spawn(move || {
111+
std::thread::sleep(PROCESS_GROUP_TERM_GRACE_PERIOD);
112+
if let Err(error) =
113+
codex_utils_pty::process_group::kill_process_group(process_group_id)
114+
{
115+
warn!("Failed to kill MCP process group {process_group_id}: {error}");
116+
}
117+
});
118+
}
119+
}
120+
}
121+
83122
pub type Elicitation = CreateElicitationRequestParam;
84123
pub type ElicitationResponse = CreateElicitationResult;
85124

@@ -129,13 +168,16 @@ impl RmcpClient {
129168
.env_clear()
130169
.envs(envs)
131170
.args(&args);
171+
#[cfg(unix)]
172+
command.process_group(0);
132173
if let Some(cwd) = cwd {
133174
command.current_dir(cwd);
134175
}
135176

136177
let (transport, stderr) = TokioChildProcess::builder(command)
137178
.stderr(Stdio::piped())
138179
.spawn()?;
180+
let process_group_guard = transport.id().map(ProcessGroupGuard::new);
139181

140182
if let Some(stderr) = stderr {
141183
tokio::spawn(async move {
@@ -157,7 +199,10 @@ impl RmcpClient {
157199

158200
Ok(Self {
159201
state: Mutex::new(ClientState::Connecting {
160-
transport: Some(PendingTransport::ChildProcess(transport)),
202+
transport: Some(PendingTransport::ChildProcess {
203+
transport,
204+
process_group_guard,
205+
}),
161206
}),
162207
})
163208
}
@@ -226,24 +271,30 @@ impl RmcpClient {
226271
) -> Result<InitializeResult> {
227272
let client_handler = LoggingClientHandler::new(params.clone(), send_elicitation);
228273

229-
let (transport, oauth_persistor) = {
274+
let (transport, oauth_persistor, process_group_guard) = {
230275
let mut guard = self.state.lock().await;
231276
match &mut *guard {
232277
ClientState::Connecting { transport } => match transport.take() {
233-
Some(PendingTransport::ChildProcess(transport)) => (
278+
Some(PendingTransport::ChildProcess {
279+
transport,
280+
process_group_guard,
281+
}) => (
234282
service::serve_client(client_handler.clone(), transport).boxed(),
235283
None,
284+
process_group_guard,
236285
),
237286
Some(PendingTransport::StreamableHttp { transport }) => (
238287
service::serve_client(client_handler.clone(), transport).boxed(),
239288
None,
289+
None,
240290
),
241291
Some(PendingTransport::StreamableHttpWithOAuth {
242292
transport,
243293
oauth_persistor,
244294
}) => (
245295
service::serve_client(client_handler.clone(), transport).boxed(),
246296
Some(oauth_persistor),
297+
None,
247298
),
248299
None => return Err(anyhow!("client already initializing")),
249300
},
@@ -270,6 +321,7 @@ impl RmcpClient {
270321
{
271322
let mut guard = self.state.lock().await;
272323
*guard = ClientState::Ready {
324+
_process_group_guard: process_group_guard,
273325
service: Arc::new(service),
274326
oauth: oauth_persistor.clone(),
275327
};
@@ -448,7 +500,7 @@ impl RmcpClient {
448500
match &*guard {
449501
ClientState::Ready {
450502
oauth: Some(runtime),
451-
service: _,
503+
..
452504
} => Some(runtime.clone()),
453505
_ => None,
454506
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
#![cfg(unix)]
2+
3+
use std::collections::HashMap;
4+
use std::ffi::OsString;
5+
use std::fs;
6+
use std::path::Path;
7+
use std::time::Duration;
8+
9+
use anyhow::Context;
10+
use anyhow::Result;
11+
use codex_rmcp_client::RmcpClient;
12+
13+
fn process_exists(pid: u32) -> bool {
14+
std::process::Command::new("kill")
15+
.arg("-0")
16+
.arg(pid.to_string())
17+
.stderr(std::process::Stdio::null())
18+
.status()
19+
.map(|status| status.success())
20+
.unwrap_or(false)
21+
}
22+
23+
async fn wait_for_pid_file(path: &Path) -> Result<u32> {
24+
for _ in 0..50 {
25+
match fs::read_to_string(path) {
26+
Ok(content) => {
27+
let pid = content
28+
.trim()
29+
.parse::<u32>()
30+
.with_context(|| format!("failed to parse pid from {}", path.display()))?;
31+
return Ok(pid);
32+
}
33+
Err(error) if error.kind() == std::io::ErrorKind::NotFound => {
34+
tokio::time::sleep(Duration::from_millis(100)).await;
35+
}
36+
Err(error) => {
37+
return Err(error).with_context(|| format!("failed to read {}", path.display()));
38+
}
39+
}
40+
}
41+
42+
anyhow::bail!("timed out waiting for child pid file at {}", path.display());
43+
}
44+
45+
async fn wait_for_process_exit(pid: u32) -> Result<()> {
46+
for _ in 0..50 {
47+
if !process_exists(pid) {
48+
return Ok(());
49+
}
50+
tokio::time::sleep(Duration::from_millis(100)).await;
51+
}
52+
53+
anyhow::bail!("process {pid} still running after timeout");
54+
}
55+
56+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
57+
async fn drop_kills_wrapper_process_group() -> Result<()> {
58+
let temp_dir = tempfile::tempdir()?;
59+
let child_pid_file = temp_dir.path().join("child.pid");
60+
let child_pid_file_str = child_pid_file.to_string_lossy().into_owned();
61+
62+
let client = RmcpClient::new_stdio_client(
63+
OsString::from("/bin/sh"),
64+
vec![
65+
OsString::from("-c"),
66+
OsString::from(
67+
"sleep 300 & child_pid=$!; echo \"$child_pid\" > \"$CHILD_PID_FILE\"; cat >/dev/null",
68+
),
69+
],
70+
Some(HashMap::from([(
71+
"CHILD_PID_FILE".to_string(),
72+
child_pid_file_str,
73+
)])),
74+
&[],
75+
None,
76+
)
77+
.await?;
78+
79+
let grandchild_pid = wait_for_pid_file(&child_pid_file).await?;
80+
assert!(
81+
process_exists(grandchild_pid),
82+
"expected grandchild process {grandchild_pid} to be running before dropping client"
83+
);
84+
85+
drop(client);
86+
87+
wait_for_process_exit(grandchild_pid).await
88+
}

codex-rs/utils/pty/src/process_group.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,29 @@ pub fn kill_process_group_by_pid(_pid: u32) -> io::Result<()> {
117117
Ok(())
118118
}
119119

120+
#[cfg(unix)]
121+
/// Send SIGTERM to a specific process group ID (best-effort).
122+
pub fn terminate_process_group(process_group_id: u32) -> io::Result<()> {
123+
use std::io::ErrorKind;
124+
125+
let pgid = process_group_id as libc::pid_t;
126+
let result = unsafe { libc::killpg(pgid, libc::SIGTERM) };
127+
if result == -1 {
128+
let err = io::Error::last_os_error();
129+
if err.kind() != ErrorKind::NotFound {
130+
return Err(err);
131+
}
132+
}
133+
134+
Ok(())
135+
}
136+
137+
#[cfg(not(unix))]
138+
/// No-op on non-Unix platforms.
139+
pub fn terminate_process_group(_process_group_id: u32) -> io::Result<()> {
140+
Ok(())
141+
}
142+
120143
#[cfg(unix)]
121144
/// Kill a specific process group ID (best-effort).
122145
pub fn kill_process_group(process_group_id: u32) -> io::Result<()> {

0 commit comments

Comments
 (0)