|
| 1 | +""" |
| 2 | +Utilities for working with stdio-based MCP transports. |
| 3 | +
|
| 4 | +In MCP 1.19 the stdio client started forwarding JSON parsing errors from the |
| 5 | +server's stdout stream as exceptions on the transport. Many MCP servers still |
| 6 | +emit setup logs on stdout (e.g. package managers), which now surface as noisy |
| 7 | +tracebacks for every log line. This module wraps the upstream stdio transport |
| 8 | +and filters out clearly non-JSON stdout lines so that normal logging output |
| 9 | +does not bubble up as transport errors. |
| 10 | +""" |
| 11 | + |
| 12 | +from __future__ import annotations |
| 13 | + |
| 14 | +from contextlib import asynccontextmanager |
| 15 | +from typing import AsyncGenerator, Iterable |
| 16 | + |
| 17 | +import anyio |
| 18 | +from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream |
| 19 | +from pydantic import ValidationError |
| 20 | + |
| 21 | +from mcp.client.stdio import StdioServerParameters, stdio_client |
| 22 | +from mcp.shared.message import SessionMessage |
| 23 | + |
| 24 | +from mcp_agent.logging.logger import get_logger |
| 25 | + |
# Module-level logger, named after this module.
logger = get_logger(__name__)

# Characters that open a JSON object or array. JSON-RPC messages should always
# be objects, but arrays are accepted as well so that nothing upstream might
# legitimately send gets filtered out by mistake.
_MESSAGE_START_CHARS = {"[", "{"}
# Bare JSON literals, kept for the same conservative reason.
_LITERAL_PREFIXES: tuple[str, ...] = ("true", "false", "null")
| 32 | + |
| 33 | + |
def _should_ignore_exception(exc: Exception) -> bool:
    """
    Decide whether a transport exception is just stray non-JSON stdout output.

    Returns True only when ``exc`` is a ``ValidationError`` whose first
    reported error has type ``json_invalid`` and a string input that is either
    blank or does not start like a JSON value (``{``, ``[``, or one of the
    literal prefixes, compared case-insensitively). Returns False in every
    other case so the exception keeps propagating to the consumer.
    """
    if not isinstance(exc, ValidationError):
        return False

    details: Iterable[dict] = exc.errors()
    for detail in details:
        # Only the first reported error is examined; the loop never advances.
        if detail.get("type") != "json_invalid":
            return False

        raw = detail.get("input")
        if not isinstance(raw, str):
            return False

        text = raw.strip()
        if not text:
            # Blank / whitespace-only line: nothing worth reporting.
            return True

        looks_like_json = text[0] in _MESSAGE_START_CHARS or text.lower().startswith(
            _LITERAL_PREFIXES
        )
        # Anything that could be a real JSON payload is never swallowed.
        return not looks_like_json

    # No error details at all.
    return False
| 64 | + |
| 65 | + |
| 66 | +def _truncate(value: str, length: int = 120) -> str: |
| 67 | + """ |
| 68 | + Truncate long log lines so debug output remains readable. |
| 69 | + """ |
| 70 | + if len(value) <= length: |
| 71 | + return value |
| 72 | + return value[: length - 3] + "..." |
| 73 | + |
| 74 | + |
@asynccontextmanager
async def filtered_stdio_client(
    server_name: str, server: StdioServerParameters
) -> AsyncGenerator[
    tuple[
        MemoryObjectReceiveStream[SessionMessage | Exception],
        MemoryObjectSendStream[SessionMessage],
    ],
    None,
]:
    """
    Wrap the upstream stdio_client so obviously non-JSON stdout lines are filtered.

    Args:
        server_name: Label used to prefix the debug message for dropped lines.
        server: Parameters passed through to the upstream ``stdio_client``.

    Yields:
        A ``(receive, send)`` pair. ``receive`` carries every item produced by
        the upstream read stream except the exceptions accepted by
        ``_should_ignore_exception``; ``send`` is the upstream write stream,
        unchanged.
    """
    async with stdio_client(server=server) as (read_stream, write_stream):
        # Buffer size 0: the forwarder hands items over one at a time and
        # waits for the consumer to take each one.
        filtered_send, filtered_recv = anyio.create_memory_object_stream[
            SessionMessage | Exception
        ](0)

        def _log_dropped(item: Exception) -> None:
            # Best-effort extraction of the offending line; logging must never
            # break forwarding, hence the broad except.
            try:
                errors = item.errors()  # type: ignore[attr-defined]
                offending = errors[0].get("input", "") if errors else ""
            except Exception:
                offending = ""

            # The message is formatted up front rather than passed as %-style
            # positional args. NOTE(review): the project logger's signature is
            # not visible here and may not interpolate positional args the way
            # stdlib logging does - confirm against mcp_agent.logging.logger.
            if offending:
                logger.debug(
                    f"{server_name}: ignoring non-JSON stdout: "
                    f"{_truncate(str(offending))}"
                )
            else:
                logger.debug(
                    f"{server_name}: ignoring non-JSON stdout (unable to capture)"
                )

        async def _forward_stdout() -> None:
            try:
                async with read_stream:
                    async for item in read_stream:
                        if isinstance(item, Exception) and _should_ignore_exception(
                            item
                        ):
                            _log_dropped(item)
                            continue

                        await filtered_send.send(item)
            except (anyio.ClosedResourceError, anyio.BrokenResourceError):
                # Consumer closed; nothing else to forward. anyio raises
                # BrokenResourceError from send() when the receiving end has
                # been closed, and ClosedResourceError when a stream we hold
                # was closed on our side, so both mean "stop quietly".
                pass
            finally:
                await filtered_send.aclose()

        async with anyio.create_task_group() as tg:
            tg.start_soon(_forward_stdout)
            try:
                yield filtered_recv, write_stream
            finally:
                tg.cancel_scope.cancel()
                # Release the receive end even if the consumer never closed
                # it; closing an already-closed stream is a no-op.
                filtered_recv.close()
0 commit comments