Skip to content

Improve child process termination on POSIX & Windows #1078

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ dependencies = [
"pydantic-settings>=2.5.2",
"uvicorn>=0.23.1; sys_platform != 'emscripten'",
"jsonschema>=4.20.0",
"pywin32>=310; sys_platform == 'win32'",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -125,4 +126,6 @@ filterwarnings = [
"ignore::DeprecationWarning:websockets",
"ignore:websockets.server.WebSocketServerProtocol is deprecated:DeprecationWarning",
"ignore:Returning str or bytes.*:DeprecationWarning:mcp.server.lowlevel",
# pywin32 internal deprecation warning
"ignore:getargs.*The 'u' format is deprecated:DeprecationWarning"
]
45 changes: 39 additions & 6 deletions src/mcp/client/stdio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
import sys
from contextlib import asynccontextmanager
Expand All @@ -6,17 +7,22 @@

import anyio
import anyio.lowlevel
from anyio.abc import Process
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
from anyio.streams.text import TextReceiveStream
from pydantic import BaseModel, Field

import mcp.types as types
from mcp.shared.message import SessionMessage

from .win32 import (
from mcp.os.posix.utilities import terminate_posix_process_tree
from mcp.os.win32.utilities import (
FallbackProcess,
create_windows_process,
get_windows_executable_command,
terminate_windows_process_tree,
)
from mcp.shared.message import SessionMessage

logger = logging.getLogger(__name__)

# Environment variables to inherit by default
DEFAULT_INHERITED_ENV_VARS = (
Expand Down Expand Up @@ -187,7 +193,7 @@ async def stdin_writer():
await process.wait()
except TimeoutError:
# If process doesn't terminate in time, force kill it
process.kill()
await _terminate_process_tree(process)
except ProcessLookupError:
# Process already exited, which is fine
pass
Expand Down Expand Up @@ -222,11 +228,38 @@ async def _create_platform_compatible_process(
):
"""
Creates a subprocess in a platform-compatible way.
Returns a process handle.
Unix: Creates process in a new session/process group for killpg support
Windows: Creates process in a Job Object for reliable child termination
"""
if sys.platform == "win32":
process = await create_windows_process(command, args, env, errlog, cwd)
else:
process = await anyio.open_process([command, *args], env=env, stderr=errlog, cwd=cwd)
process = await anyio.open_process(
[command, *args],
env=env,
stderr=errlog,
cwd=cwd,
start_new_session=True,
)

return process


async def _terminate_process_tree(process: Process | FallbackProcess, timeout_seconds: float = 2.0) -> None:
"""
Terminate a process and all its children using platform-specific methods.
Unix: Uses os.killpg() for atomic process group termination
Windows: Uses Job Objects via pywin32 for reliable child process cleanup
Args:
process: The process to terminate
timeout_seconds: Timeout in seconds before force killing (default: 2.0)
"""
if sys.platform == "win32":
await terminate_windows_process_tree(process, timeout_seconds)
else:
# FallbackProcess should only be used for Windows compatibility
assert isinstance(process, Process)
await terminate_posix_process_tree(process, timeout_seconds)
1 change: 1 addition & 0 deletions src/mcp/os/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Platform-specific utilities for MCP."""
1 change: 1 addition & 0 deletions src/mcp/os/posix/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""POSIX-specific utilities for MCP."""
60 changes: 60 additions & 0 deletions src/mcp/os/posix/utilities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""
POSIX-specific functionality for stdio client operations.
"""

import logging
import os
import signal

import anyio
from anyio.abc import Process

logger = logging.getLogger(__name__)


async def terminate_posix_process_tree(process: Process, timeout_seconds: float = 2.0) -> None:
"""
Terminate a process and all its children on POSIX systems.
Uses os.killpg() for atomic process group termination.
Args:
process: The process to terminate
timeout_seconds: Timeout in seconds before force killing (default: 2.0)
"""
pid = getattr(process, "pid", None) or getattr(getattr(process, "popen", None), "pid", None)
if not pid:
# No PID means there's no process to terminate - it either never started,
# already exited, or we have an invalid process object
return

try:
pgid = os.getpgid(pid)
os.killpg(pgid, signal.SIGTERM)

with anyio.move_on_after(timeout_seconds):
while True:
try:
# Check if process group still exists (signal 0 = check only)
os.killpg(pgid, 0)
await anyio.sleep(0.1)
except ProcessLookupError:
return

try:
os.killpg(pgid, signal.SIGKILL)
except ProcessLookupError:
pass

except (ProcessLookupError, PermissionError, OSError) as e:
logger.warning(f"Process group termination failed for PID {pid}: {e}, falling back to simple terminate")
try:
process.terminate()
with anyio.fail_after(timeout_seconds):
await process.wait()
except Exception as term_error:
logger.warning(f"Process termination failed for PID {pid}: {term_error}, attempting force kill")
try:
process.kill()
except Exception as kill_error:
logger.error(f"Failed to kill process {pid}: {kill_error}")
1 change: 1 addition & 0 deletions src/mcp/os/win32/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Windows-specific utilities for MCP."""
129 changes: 120 additions & 9 deletions src/mcp/client/stdio/win32.py → src/mcp/os/win32/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Windows-specific functionality for stdio client operations.
"""

import logging
import shutil
import subprocess
import sys
Expand All @@ -14,6 +15,23 @@
from anyio.streams.file import FileReadStream, FileWriteStream
from typing_extensions import deprecated

logger = logging.getLogger("client.stdio.win32")

# Windows-specific imports for Job Objects
if sys.platform == "win32":
import pywintypes
import win32api
import win32con
import win32job
else:
# Type stubs for non-Windows platforms
win32api = None
win32con = None
win32job = None
pywintypes = None

JobHandle = int


def get_windows_executable_command(command: str) -> str:
"""
Expand Down Expand Up @@ -104,6 +122,11 @@ def kill(self) -> None:
"""Kill the subprocess immediately (alias for terminate)."""
self.terminate()

@property
def pid(self) -> int:
"""Return the process ID."""
return self.popen.pid


# ------------------------
# Updated function
Expand All @@ -118,13 +141,16 @@ async def create_windows_process(
cwd: Path | str | None = None,
) -> Process | FallbackProcess:
"""
Creates a subprocess in a Windows-compatible way.
Creates a subprocess in a Windows-compatible way with Job Object support.
Attempt to use anyio's open_process for async subprocess creation.
In some cases this will throw NotImplementedError on Windows, e.g.
when using the SelectorEventLoop which does not support async subprocesses.
In that case, we fall back to using subprocess.Popen.
The process is automatically added to a Job Object to ensure all child
processes are terminated when the parent is terminated.
Args:
command (str): The executable to run
args (list[str]): List of command line arguments
Expand All @@ -133,8 +159,11 @@ async def create_windows_process(
cwd (Path | str | None): Working directory for the subprocess
Returns:
FallbackProcess: Async-compatible subprocess with stdin and stdout streams
Process | FallbackProcess: Async-compatible subprocess with stdin and stdout streams
"""
job = _create_job_object()
process = None

try:
# First try using anyio with Windows-specific flags to hide console window
process = await anyio.open_process(
Expand All @@ -147,10 +176,9 @@ async def create_windows_process(
stderr=errlog,
cwd=cwd,
)
return process
except NotImplementedError:
# Windows often doesn't support async subprocess creation, use fallback
return await _create_windows_fallback_process(command, args, env, errlog, cwd)
# If Windows doesn't support async subprocess creation, use fallback
process = await _create_windows_fallback_process(command, args, env, errlog, cwd)
except Exception:
# Try again without creation flags
process = await anyio.open_process(
Expand All @@ -159,7 +187,9 @@ async def create_windows_process(
stderr=errlog,
cwd=cwd,
)
return process

_maybe_assign_process_to_job(process, job)
return process


async def _create_windows_fallback_process(
Expand All @@ -186,8 +216,6 @@ async def _create_windows_fallback_process(
bufsize=0, # Unbuffered output
creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0),
)
return FallbackProcess(popen_obj)

except Exception:
# If creationflags failed, fallback without them
popen_obj = subprocess.Popen(
Expand All @@ -199,7 +227,90 @@ async def _create_windows_fallback_process(
cwd=cwd,
bufsize=0,
)
return FallbackProcess(popen_obj)
return FallbackProcess(popen_obj)


def _create_job_object() -> int | None:
"""
Create a Windows Job Object configured to terminate all processes when closed.
"""
if sys.platform != "win32" or not win32job:
return None

try:
job = win32job.CreateJobObject(None, "")
extended_info = win32job.QueryInformationJobObject(job, win32job.JobObjectExtendedLimitInformation)

extended_info["BasicLimitInformation"]["LimitFlags"] |= win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE
win32job.SetInformationJobObject(job, win32job.JobObjectExtendedLimitInformation, extended_info)
return job
except Exception as e:
logger.warning(f"Failed to create Job Object for process tree management: {e}")
return None


def _maybe_assign_process_to_job(process: Process | FallbackProcess, job: JobHandle | None) -> None:
"""
Try to assign a process to a job object. If assignment fails
for any reason, the job handle is closed.
"""
if not job:
return

if sys.platform != "win32" or not win32api or not win32con or not win32job:
return

try:
process_handle = win32api.OpenProcess(
win32con.PROCESS_SET_QUOTA | win32con.PROCESS_TERMINATE, False, process.pid
)
if not process_handle:
raise Exception("Failed to open process handle")

try:
win32job.AssignProcessToJobObject(job, process_handle)
process._job_object = job
finally:
win32api.CloseHandle(process_handle)
except Exception as e:
logger.warning(f"Failed to assign process {process.pid} to Job Object: {e}")
if win32api:
win32api.CloseHandle(job)


async def terminate_windows_process_tree(process: Process | FallbackProcess, timeout_seconds: float = 2.0) -> None:
"""
Terminate a process and all its children on Windows.
If the process has an associated job object, it will be terminated.
Otherwise, falls back to basic process termination.
Args:
process: The process to terminate
timeout_seconds: Timeout in seconds before force killing (default: 2.0)
"""
if sys.platform != "win32":
return

job = getattr(process, "_job_object", None)
if job and win32job:
try:
win32job.TerminateJobObject(job, 1)
except Exception:
# Job might already be terminated
pass
finally:
if win32api:
try:
win32api.CloseHandle(job)
except Exception:
pass

# Always try to terminate the process itself as well
try:
process.terminate()
except Exception:
pass


@deprecated(
Expand Down
Loading
Loading