Skip to content

Commit b637af7

Browse files
committed
fix: properly terminate subprocess
1 parent a5d78e0 commit b637af7

File tree

3 files changed

+107
-2
lines changed

3 files changed

+107
-2
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[project]
22
name = "uipath-mcp"
3-
version = "0.0.57"
3+
version = "0.0.58"
44
description = "UiPath MCP SDK"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.10"

src/uipath_mcp/_cli/_runtime/_runtime.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
import mcp.types as types
1010
from mcp import ClientSession, StdioServerParameters
11-
from mcp.client.stdio import stdio_client
1211
from opentelemetry import trace
1312
from pysignalr.client import SignalRClient
1413
from uipath import UiPath
@@ -23,6 +22,7 @@
2322
from ._context import UiPathMcpRuntimeContext
2423
from ._exception import UiPathMcpRuntimeError
2524
from ._session import SessionServer
25+
from ._stdio_client import stdio_client
2626

2727
logger = logging.getLogger(__name__)
2828
tracer = trace.get_tracer(__name__)
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import sys
2+
from contextlib import asynccontextmanager
3+
from typing import TextIO
4+
5+
import anyio
6+
import anyio.lowlevel
7+
import mcp.types as types
8+
from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream
9+
from anyio.streams.text import TextReceiveStream
10+
from mcp.client.stdio import (
11+
StdioServerParameters,
12+
_create_platform_compatible_process,
13+
_get_executable_command,
14+
get_default_environment,
15+
)
16+
17+
18+
@asynccontextmanager
19+
async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stderr):
20+
"""
21+
Client transport for stdio: this will connect to a server by spawning a
22+
process and communicating with it over stdin/stdout.
23+
"""
24+
read_stream: MemoryObjectReceiveStream[types.JSONRPCMessage | Exception]
25+
read_stream_writer: MemoryObjectSendStream[types.JSONRPCMessage | Exception]
26+
27+
write_stream: MemoryObjectSendStream[types.JSONRPCMessage]
28+
write_stream_reader: MemoryObjectReceiveStream[types.JSONRPCMessage]
29+
30+
read_stream_writer, read_stream = anyio.create_memory_object_stream(0)
31+
write_stream, write_stream_reader = anyio.create_memory_object_stream(0)
32+
33+
command = _get_executable_command(server.command)
34+
35+
# Open process with stderr piped for capture
36+
process = await _create_platform_compatible_process(
37+
command=command,
38+
args=server.args,
39+
env=(
40+
{**get_default_environment(), **server.env}
41+
if server.env is not None
42+
else get_default_environment()
43+
),
44+
errlog=errlog,
45+
cwd=server.cwd,
46+
)
47+
48+
async def stdout_reader():
49+
assert process.stdout, "Opened process is missing stdout"
50+
51+
try:
52+
async with read_stream_writer:
53+
buffer = ""
54+
async for chunk in TextReceiveStream(
55+
process.stdout,
56+
encoding=server.encoding,
57+
errors=server.encoding_error_handler,
58+
):
59+
lines = (buffer + chunk).split("\n")
60+
buffer = lines.pop()
61+
62+
for line in lines:
63+
try:
64+
message = types.JSONRPCMessage.model_validate_json(line)
65+
except Exception as exc:
66+
await read_stream_writer.send(exc)
67+
continue
68+
69+
await read_stream_writer.send(message)
70+
except anyio.ClosedResourceError:
71+
await anyio.lowlevel.checkpoint()
72+
73+
async def stdin_writer():
74+
assert process.stdin, "Opened process is missing stdin"
75+
76+
try:
77+
async with write_stream_reader:
78+
async for message in write_stream_reader:
79+
json = message.model_dump_json(by_alias=True, exclude_none=True)
80+
await process.stdin.send(
81+
(json + "\n").encode(
82+
encoding=server.encoding,
83+
errors=server.encoding_error_handler,
84+
)
85+
)
86+
except anyio.ClosedResourceError:
87+
await anyio.lowlevel.checkpoint()
88+
89+
async with (
90+
anyio.create_task_group() as tg,
91+
process,
92+
):
93+
tg.start_soon(stdout_reader)
94+
tg.start_soon(stdin_writer)
95+
try:
96+
yield read_stream, write_stream
97+
finally:
98+
# Clean up process to prevent any dangling orphaned processes
99+
try:
100+
process.terminate()
101+
with anyio.fail_after(2.0):
102+
await process.wait()
103+
except TimeoutError:
104+
# Force kill if it doesn't terminate
105+
process.kill()

0 commit comments

Comments
 (0)