Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 54 additions & 12 deletions src/claude_code_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Subprocess transport implementation using Claude Code CLI."""

import json
import logging
import os
import shutil
from collections.abc import AsyncIterator
Expand All @@ -17,6 +18,8 @@
from ...types import ClaudeCodeOptions
from . import Transport

logger = logging.getLogger(__name__)

_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit


Expand Down Expand Up @@ -170,6 +173,10 @@ async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
if not self._process or not self._stdout_stream:
raise CLIConnectionError("Not connected")

# Safety constants
MAX_STDERR_SIZE = 10 * 1024 * 1024 # 10MB
STDERR_TIMEOUT = 30.0 # 30 seconds

json_buffer = ""

# Process stdout messages first
Expand Down Expand Up @@ -210,25 +217,60 @@ async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:

except anyio.ClosedResourceError:
pass
except GeneratorExit:
# Client disconnected - still need to clean up
pass

# Read stderr after stdout completes (no concurrent task group)
# Process stderr with safety limits
stderr_lines = []
stderr_size = 0

if self._stderr_stream:
try:
async for line in self._stderr_stream:
stderr_lines.append(line.strip())
# Use timeout to prevent hanging
with anyio.fail_after(STDERR_TIMEOUT):
async for line in self._stderr_stream:
line_text = line.strip()
line_size = len(line_text)

# Enforce memory limit
if stderr_size + line_size > MAX_STDERR_SIZE:
stderr_lines.append(
f"[stderr truncated after {stderr_size} bytes]"
)
# Drain rest of stream without storing
async for _ in self._stderr_stream:
pass
break

stderr_lines.append(line_text)
stderr_size += line_size

except TimeoutError:
stderr_lines.append(
f"[stderr collection timed out after {STDERR_TIMEOUT}s]"
)
except anyio.ClosedResourceError:
pass

await self._process.wait()
if self._process.returncode is not None and self._process.returncode != 0:
stderr_output = "\n".join(stderr_lines)
if stderr_output and "error" in stderr_output.lower():
raise ProcessError(
"CLI process failed",
exit_code=self._process.returncode,
stderr=stderr_output,
)
# Check process completion and handle errors
try:
returncode = await self._process.wait()
except Exception:
returncode = -1

stderr_output = "\n".join(stderr_lines) if stderr_lines else ""

# Use exit code for error detection, not string matching
if returncode != 0:
raise ProcessError(
f"Command failed with exit code {returncode}",
exit_code=returncode,
stderr=stderr_output,
)
elif stderr_output:
# Log stderr for debugging but don't fail on non-zero exit
logger.debug(f"Process stderr: {stderr_output}")

def is_connected(self) -> bool:
"""Check if subprocess is running."""
Expand Down