diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index f63732a8..242ce08f 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -10,7 +10,6 @@ import anyio from anyio.abc import Process -from anyio.streams.text import TextReceiveStream from ..._errors import CLIConnectionError, CLINotFoundError, ProcessError from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError @@ -32,8 +31,8 @@ def __init__( self._cli_path = str(cli_path) if cli_path else self._find_cli() self._cwd = str(options.cwd) if options.cwd else None self._process: Process | None = None - self._stdout_stream: TextReceiveStream | None = None - self._stderr_stream: TextReceiveStream | None = None + self._stdout_stream: Any | None = None # Raw stdout stream + self._stderr_stream: Any | None = None # Raw stderr stream def _find_cli(self) -> str: """Find Claude Code CLI binary.""" @@ -131,9 +130,11 @@ async def connect(self) -> None: ) if self._process.stdout: - self._stdout_stream = TextReceiveStream(self._process.stdout) + # Use raw stream to avoid TextReceiveStream issues with large JSON + self._stdout_stream = self._process.stdout if self._process.stderr: - self._stderr_stream = TextReceiveStream(self._process.stderr) + # Use raw stream for stderr as well + self._stderr_stream = self._process.stderr except FileNotFoundError as e: raise CLINotFoundError(f"Claude Code not found at: {self._cli_path}") from e @@ -174,8 +175,13 @@ async def read_stderr() -> None: """Read stderr in background.""" if self._stderr_stream: try: - async for line in self._stderr_stream: - stderr_lines.append(line.strip()) + # Read as bytes and decode + buffer = b"" + async for chunk in self._stderr_stream: + buffer += chunk + while b'\n' in buffer: + line, buffer = buffer.split(b'\n', 1) + stderr_lines.append(line.decode('utf-8', errors='replace').strip()) except anyio.ClosedResourceError: pass @@ -183,22 +189,41 @@ async def read_stderr() -> None: tg.start_soon(read_stderr) try: - async for line in self._stdout_stream: - line_str = line.strip() - if not line_str: - continue + # Read stdout as bytes to properly handle large JSON lines + buffer = b"" + async for chunk in self._stdout_stream: + buffer += chunk + + # Process complete lines + while b'\n' in buffer: + line_bytes, buffer = buffer.split(b'\n', 1) + line_str = line_bytes.decode('utf-8', errors='replace').strip() + + if not line_str: + continue - try: - data = json.loads(line_str) try: + data = json.loads(line_str) + try: + yield data + except GeneratorExit: + # Handle generator cleanup gracefully + return + except json.JSONDecodeError as e: + if line_str.startswith("{") or line_str.startswith("["): + raise SDKJSONDecodeError(line_str, e) from e + continue + + # Process any remaining data in buffer + if buffer: + line_str = buffer.decode('utf-8', errors='replace').strip() + if line_str: + try: + data = json.loads(line_str) yield data - except GeneratorExit: - # Handle generator cleanup gracefully - return - except json.JSONDecodeError as e: - if line_str.startswith("{") or line_str.startswith("["): - raise SDKJSONDecodeError(line_str, e) from e - continue + except json.JSONDecodeError: + # Incomplete JSON at end, ignore + pass except anyio.ClosedResourceError: pass diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli_fixed.py b/src/claude_code_sdk/_internal/transport/subprocess_cli_fixed.py new file mode 100644 index 00000000..b270a5ba --- /dev/null +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli_fixed.py @@ -0,0 +1,240 @@ +"""Subprocess transport implementation using Claude Code CLI with fix for large JSON messages.""" + +import json +import os +import shutil +from collections.abc import AsyncIterator +from pathlib import Path +from subprocess import PIPE +from typing import Any + +import anyio +from anyio.abc import Process + +from ..._errors import CLIConnectionError, CLINotFoundError, ProcessError +from ..._errors import CLIJSONDecodeError as SDKJSONDecodeError +from ...types import ClaudeCodeOptions +from . import Transport + + +class SubprocessCLITransport(Transport): + """Subprocess transport using Claude Code CLI.""" + + def __init__( + self, + prompt: str, + options: ClaudeCodeOptions, + cli_path: str | Path | None = None, + ): + self._prompt = prompt + self._options = options + self._cli_path = str(cli_path) if cli_path else self._find_cli() + self._cwd = str(options.cwd) if options.cwd else None + self._process: Process | None = None + self._stdout: Any = None # Store raw stdout + self._stderr: Any = None # Store raw stderr + + def _find_cli(self) -> str: + """Find Claude Code CLI binary.""" + if cli := shutil.which("claude"): + return cli + + locations = [ + Path.home() / ".npm-global/bin/claude", + Path("/usr/local/bin/claude"), + Path.home() / ".local/bin/claude", + Path.home() / "node_modules/.bin/claude", + Path.home() / ".yarn/bin/claude", + ] + + for path in locations: + if path.exists() and path.is_file(): + return str(path) + + node_installed = shutil.which("node") is not None + + if not node_installed: + error_msg = "Claude Code requires Node.js, which is not installed.\n\n" + error_msg += "Install Node.js from: https://nodejs.org/\n" + error_msg += "\nAfter installing Node.js, install Claude Code:\n" + error_msg += " npm install -g @anthropic-ai/claude-code" + raise CLINotFoundError(error_msg) + + raise CLINotFoundError( + "Claude Code not found. Install with:\n" + " npm install -g @anthropic-ai/claude-code\n" + "\nIf already installed locally, try:\n" + ' export PATH="$HOME/node_modules/.bin:$PATH"\n' + "\nOr specify the path when creating transport:\n" + " SubprocessCLITransport(..., cli_path='/path/to/claude')" + ) + + def _build_command(self) -> list[str]: + """Build CLI command with arguments.""" + cmd = [self._cli_path, "--output-format", "stream-json", "--verbose"] + + if self._options.system_prompt: + cmd.extend(["--system-prompt", self._options.system_prompt]) + + if self._options.append_system_prompt: + cmd.extend(["--append-system-prompt", self._options.append_system_prompt]) + + if self._options.allowed_tools: + cmd.extend(["--allowedTools", ",".join(self._options.allowed_tools)]) + + if self._options.max_turns: + cmd.extend(["--max-turns", str(self._options.max_turns)]) + + if self._options.disallowed_tools: + cmd.extend(["--disallowedTools", ",".join(self._options.disallowed_tools)]) + + if self._options.model: + cmd.extend(["--model", self._options.model]) + + if self._options.permission_prompt_tool_name: + cmd.extend( + ["--permission-prompt-tool", self._options.permission_prompt_tool_name] + ) + + if self._options.permission_mode: + cmd.extend(["--permission-mode", self._options.permission_mode]) + + if self._options.continue_conversation: + cmd.append("--continue") + + if self._options.resume: + cmd.extend(["--resume", self._options.resume]) + + if self._options.mcp_servers: + cmd.extend( + ["--mcp-config", json.dumps({"mcpServers": self._options.mcp_servers})] + ) + + cmd.extend(["--print", self._prompt]) + return cmd + + async def connect(self) -> None: + """Start subprocess.""" + if self._process: + return + + cmd = self._build_command() + try: + self._process = await anyio.open_process( + cmd, + stdin=None, + stdout=PIPE, + stderr=PIPE, + cwd=self._cwd, + env={**os.environ, "CLAUDE_CODE_ENTRYPOINT": "sdk-py"}, + ) + + # Store raw streams instead of wrapping in TextReceiveStream + self._stdout = self._process.stdout + self._stderr = self._process.stderr + + except FileNotFoundError as e: + raise CLINotFoundError(f"Claude Code not found at: {self._cli_path}") from e + except Exception as e: + raise CLIConnectionError(f"Failed to start Claude Code: {e}") from e + + async def disconnect(self) -> None: + """Terminate subprocess.""" + if not self._process: + return + + if self._process.returncode is None: + try: + self._process.terminate() + with anyio.fail_after(5.0): + await self._process.wait() + except TimeoutError: + self._process.kill() + await self._process.wait() + except ProcessLookupError: + pass + + self._process = None + self._stdout = None + self._stderr = None + + async def send_request(self, messages: list[Any], options: dict[str, Any]) -> None: + """Not used for CLI transport - args passed via command line.""" + + async def receive_messages(self) -> AsyncIterator[dict[str, Any]]: + """Receive messages from CLI with proper handling of large JSON lines.""" + if not self._process or not self._stdout: + raise CLIConnectionError("Not connected") + + stderr_lines = [] + + async def read_stderr() -> None: + """Read stderr in background.""" + if self._stderr: + try: + # Read stderr as bytes and decode + buffer = b"" + async for chunk in self._stderr: + buffer += chunk + while b'\n' in buffer: + line, buffer = buffer.split(b'\n', 1) + stderr_lines.append(line.decode('utf-8', errors='replace').strip()) + except anyio.ClosedResourceError: + pass + + async with anyio.create_task_group() as tg: + tg.start_soon(read_stderr) + + try: + # Read stdout as bytes to avoid TextReceiveStream issues + buffer = b"" + async for chunk in self._stdout: + buffer += chunk + + # Process complete lines + while b'\n' in buffer: + line_bytes, buffer = buffer.split(b'\n', 1) + line_str = line_bytes.decode('utf-8', errors='replace').strip() + + if not line_str: + continue + + try: + data = json.loads(line_str) + try: + yield data + except GeneratorExit: + # Handle generator cleanup gracefully + return + except json.JSONDecodeError as e: + if line_str.startswith("{") or line_str.startswith("["): + raise SDKJSONDecodeError(line_str, e) from e + continue + + # Process any remaining data in buffer + if buffer: + line_str = buffer.decode('utf-8', errors='replace').strip() + if line_str: + try: + data = json.loads(line_str) + yield data + except json.JSONDecodeError: + # Incomplete JSON at end, ignore + pass + + except anyio.ClosedResourceError: + pass + + await self._process.wait() + if self._process.returncode is not None and self._process.returncode != 0: + stderr_output = "\n".join(stderr_lines) + if stderr_output and "error" in stderr_output.lower(): + raise ProcessError( + "CLI process failed", + exit_code=self._process.returncode, + stderr=stderr_output, + ) + + def is_connected(self) -> bool: + """Check if subprocess is running.""" + return self._process is not None and self._process.returncode is None \ No newline at end of file