Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 45 additions & 20 deletions src/claude_code_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import anyio
from anyio.abc import Process
from anyio.streams.text import TextReceiveStream

from ..._errors import CLIConnectionError, CLINotFoundError, ProcessError
from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError
Expand All @@ -32,8 +31,8 @@ def __init__(
self._cli_path = str(cli_path) if cli_path else self._find_cli()
self._cwd = str(options.cwd) if options.cwd else None
self._process: Process | None = None
self._stdout_stream: TextReceiveStream | None = None
self._stderr_stream: TextReceiveStream | None = None
self._stdout_stream: Any | None = None # Raw stdout stream
self._stderr_stream: Any | None = None # Raw stderr stream

def _find_cli(self) -> str:
"""Find Claude Code CLI binary."""
Expand Down Expand Up @@ -131,9 +130,11 @@ async def connect(self) -> None:
)

if self._process.stdout:
self._stdout_stream = TextReceiveStream(self._process.stdout)
# Use raw stream to avoid TextReceiveStream issues with large JSON
self._stdout_stream = self._process.stdout
if self._process.stderr:
self._stderr_stream = TextReceiveStream(self._process.stderr)
# Use raw stream for stderr as well
self._stderr_stream = self._process.stderr

except FileNotFoundError as e:
raise CLINotFoundError(f"Claude Code not found at: {self._cli_path}") from e
Expand Down Expand Up @@ -174,31 +175,55 @@ async def read_stderr() -> None:
"""Read stderr in background."""
if self._stderr_stream:
try:
async for line in self._stderr_stream:
stderr_lines.append(line.strip())
# Read as bytes and decode
buffer = b""
async for chunk in self._stderr_stream:
buffer += chunk
while b'\n' in buffer:
line, buffer = buffer.split(b'\n', 1)
stderr_lines.append(line.decode('utf-8', errors='replace').strip())
except anyio.ClosedResourceError:
pass

async with anyio.create_task_group() as tg:
tg.start_soon(read_stderr)

try:
async for line in self._stdout_stream:
line_str = line.strip()
if not line_str:
continue
# Read stdout as bytes to properly handle large JSON lines
buffer = b""
async for chunk in self._stdout_stream:
buffer += chunk

# Process complete lines
while b'\n' in buffer:
line_bytes, buffer = buffer.split(b'\n', 1)
line_str = line_bytes.decode('utf-8', errors='replace').strip()

if not line_str:
continue

try:
data = json.loads(line_str)
try:
data = json.loads(line_str)
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
except json.JSONDecodeError as e:
if line_str.startswith("{") or line_str.startswith("["):
raise SDKJSONDecodeError(line_str, e) from e
continue

# Process any remaining data in buffer
if buffer:
line_str = buffer.decode('utf-8', errors='replace').strip()
if line_str:
try:
data = json.loads(line_str)
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
except json.JSONDecodeError as e:
if line_str.startswith("{") or line_str.startswith("["):
raise SDKJSONDecodeError(line_str, e) from e
continue
except json.JSONDecodeError:
# Incomplete JSON at end, ignore
pass

except anyio.ClosedResourceError:
pass
Expand Down
240 changes: 240 additions & 0 deletions src/claude_code_sdk/_internal/transport/subprocess_cli_fixed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
"""Subprocess transport implementation using Claude Code CLI with fix for large JSON messages."""

import json
import os
import shutil
from collections.abc import AsyncIterator
from pathlib import Path
from subprocess import PIPE
from typing import Any

import anyio
from anyio.abc import Process

from ..._errors import CLIConnectionError, CLINotFoundError, ProcessError
from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError
from ...types import ClaudeCodeOptions
from . import Transport


class SubprocessCLITransport(Transport):
"""Subprocess transport using Claude Code CLI."""

def __init__(
self,
prompt: str,
options: ClaudeCodeOptions,
cli_path: str | Path | None = None,
):
self._prompt = prompt
self._options = options
self._cli_path = str(cli_path) if cli_path else self._find_cli()
self._cwd = str(options.cwd) if options.cwd else None
self._process: Process | None = None
self._stdout: Any = None # Store raw stdout
self._stderr: Any = None # Store raw stderr

def _find_cli(self) -> str:
"""Find Claude Code CLI binary."""
if cli := shutil.which("claude"):
return cli

locations = [
Path.home() / ".npm-global/bin/claude",
Path("/usr/local/bin/claude"),
Path.home() / ".local/bin/claude",
Path.home() / "node_modules/.bin/claude",
Path.home() / ".yarn/bin/claude",
]

for path in locations:
if path.exists() and path.is_file():
return str(path)

node_installed = shutil.which("node") is not None

if not node_installed:
error_msg = "Claude Code requires Node.js, which is not installed.\n\n"
error_msg += "Install Node.js from: https://nodejs.org/\n"
error_msg += "\nAfter installing Node.js, install Claude Code:\n"
error_msg += " npm install -g @anthropic-ai/claude-code"
raise CLINotFoundError(error_msg)

raise CLINotFoundError(
"Claude Code not found. Install with:\n"
" npm install -g @anthropic-ai/claude-code\n"
"\nIf already installed locally, try:\n"
' export PATH="$HOME/node_modules/.bin:$PATH"\n'
"\nOr specify the path when creating transport:\n"
" SubprocessCLITransport(..., cli_path='/path/to/claude')"
)

def _build_command(self) -> list[str]:
"""Build CLI command with arguments."""
cmd = [self._cli_path, "--output-format", "stream-json", "--verbose"]

if self._options.system_prompt:
cmd.extend(["--system-prompt", self._options.system_prompt])

if self._options.append_system_prompt:
cmd.extend(["--append-system-prompt", self._options.append_system_prompt])

if self._options.allowed_tools:
cmd.extend(["--allowedTools", ",".join(self._options.allowed_tools)])

if self._options.max_turns:
cmd.extend(["--max-turns", str(self._options.max_turns)])

if self._options.disallowed_tools:
cmd.extend(["--disallowedTools", ",".join(self._options.disallowed_tools)])

if self._options.model:
cmd.extend(["--model", self._options.model])

if self._options.permission_prompt_tool_name:
cmd.extend(
["--permission-prompt-tool", self._options.permission_prompt_tool_name]
)

if self._options.permission_mode:
cmd.extend(["--permission-mode", self._options.permission_mode])

if self._options.continue_conversation:
cmd.append("--continue")

if self._options.resume:
cmd.extend(["--resume", self._options.resume])

if self._options.mcp_servers:
cmd.extend(
["--mcp-config", json.dumps({"mcpServers": self._options.mcp_servers})]
)

cmd.extend(["--print", self._prompt])
return cmd

async def connect(self) -> None:
"""Start subprocess."""
if self._process:
return

cmd = self._build_command()
try:
self._process = await anyio.open_process(
cmd,
stdin=None,
stdout=PIPE,
stderr=PIPE,
cwd=self._cwd,
env={**os.environ, "CLAUDE_CODE_ENTRYPOINT": "sdk-py"},
)

# Store raw streams instead of wrapping in TextReceiveStream
self._stdout = self._process.stdout
self._stderr = self._process.stderr

except FileNotFoundError as e:
raise CLINotFoundError(f"Claude Code not found at: {self._cli_path}") from e
except Exception as e:
raise CLIConnectionError(f"Failed to start Claude Code: {e}") from e

async def disconnect(self) -> None:
"""Terminate subprocess."""
if not self._process:
return

if self._process.returncode is None:
try:
self._process.terminate()
with anyio.fail_after(5.0):
await self._process.wait()
except TimeoutError:
self._process.kill()
await self._process.wait()
except ProcessLookupError:
pass

self._process = None
self._stdout = None
self._stderr = None

async def send_request(self, messages: list[Any], options: dict[str, Any]) -> None:
"""Not used for CLI transport - args passed via command line."""

async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
"""Receive messages from CLI with proper handling of large JSON lines."""
if not self._process or not self._stdout:
raise CLIConnectionError("Not connected")

stderr_lines = []

async def read_stderr() -> None:
"""Read stderr in background."""
if self._stderr:
try:
# Read stderr as bytes and decode
buffer = b""
async for chunk in self._stderr:
buffer += chunk
while b'\n' in buffer:
line, buffer = buffer.split(b'\n', 1)
stderr_lines.append(line.decode('utf-8', errors='replace').strip())
except anyio.ClosedResourceError:
pass

async with anyio.create_task_group() as tg:
tg.start_soon(read_stderr)

try:
# Read stdout as bytes to avoid TextReceiveStream issues
buffer = b""
async for chunk in self._stdout:
buffer += chunk

# Process complete lines
while b'\n' in buffer:
line_bytes, buffer = buffer.split(b'\n', 1)
line_str = line_bytes.decode('utf-8', errors='replace').strip()

if not line_str:
continue

try:
data = json.loads(line_str)
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
except json.JSONDecodeError as e:
if line_str.startswith("{") or line_str.startswith("["):
raise SDKJSONDecodeError(line_str, e) from e
continue

# Process any remaining data in buffer
if buffer:
line_str = buffer.decode('utf-8', errors='replace').strip()
if line_str:
try:
data = json.loads(line_str)
yield data
except json.JSONDecodeError:
# Incomplete JSON at end, ignore
pass

except anyio.ClosedResourceError:
pass

await self._process.wait()
if self._process.returncode is not None and self._process.returncode != 0:
stderr_output = "\n".join(stderr_lines)
if stderr_output and "error" in stderr_output.lower():
raise ProcessError(
"CLI process failed",
exit_code=self._process.returncode,
stderr=stderr_output,
)

def is_connected(self) -> bool:
"""Check if subprocess is running."""
return self._process is not None and self._process.returncode is None