Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 19 additions & 6 deletions src/claude_code_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from ...types import ClaudeCodeOptions
from . import Transport

_MAX_BUFFER_SIZE = 1024 * 1024 # 1MB buffer limit


class SubprocessCLITransport(Transport):
"""Subprocess transport using Claude Code CLI."""
Expand Down Expand Up @@ -182,30 +184,41 @@ async def read_stderr() -> None:
async with anyio.create_task_group() as tg:
tg.start_soon(read_stderr)

json_buffer = ""

try:
async for line in self._stdout_stream:
line_str = line.strip()
if not line_str:
continue

# Split on newlines in case multiple JSON objects are buffered together
json_lines = line_str.split("\n")

for json_line in json_lines:
json_line = json_line.strip()
if not json_line:
continue

# Keep accumulating partial JSON until we can parse it
json_buffer += json_line

if len(json_buffer) > _MAX_BUFFER_SIZE:
json_buffer = ""
raise SDKJSONDecodeError(
f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes",
ValueError(
f"Buffer size {len(json_buffer)} exceeds limit {_MAX_BUFFER_SIZE}"
),
)

try:
data = json.loads(json_line)
data = json.loads(json_buffer)
json_buffer = ""
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
except json.JSONDecodeError as e:
if json_line.startswith("{") or json_line.startswith("["):
raise SDKJSONDecodeError(json_line, e) from e
except json.JSONDecodeError:
continue

except anyio.ClosedResourceError:
Expand Down
187 changes: 177 additions & 10 deletions tests/test_subprocess_buffering.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@
from unittest.mock import AsyncMock, MagicMock

import anyio
import pytest

from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
from claude_code_sdk._errors import CLIJSONDecodeError
from claude_code_sdk._internal.transport.subprocess_cli import (
_MAX_BUFFER_SIZE,
SubprocessCLITransport,
)
from claude_code_sdk.types import ClaudeCodeOptions


Expand Down Expand Up @@ -40,34 +45,27 @@ def test_multiple_json_objects_on_single_line(self) -> None:
"""

async def _test() -> None:
# Two valid JSON objects separated by a newline character
json_obj1 = {"type": "message", "id": "msg1", "content": "First message"}
json_obj2 = {"type": "result", "id": "res1", "status": "completed"}

# Simulate buffered output where both objects appear on one line
buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)

# Create transport
transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)

# Mock the process and streams
mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process

# Create mock stream that returns the buffered line
transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment]
transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment]

# Collect all messages
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)

# Verify both JSON objects were successfully parsed
assert len(messages) == 2
assert messages[0]["type"] == "message"
assert messages[0]["id"] == "msg1"
Expand All @@ -82,7 +80,6 @@ def test_json_with_embedded_newlines(self) -> None:
"""Test parsing JSON objects that contain newline characters in string values."""

async def _test() -> None:
# JSON objects with newlines in string values
json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"}
json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"}

Expand Down Expand Up @@ -116,7 +113,6 @@ async def _test() -> None:
json_obj1 = {"type": "message", "id": "msg1"}
json_obj2 = {"type": "result", "id": "res1"}

# Multiple newlines between objects
buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2)

transport = SubprocessCLITransport(
Expand All @@ -139,3 +135,174 @@ async def _test() -> None:
assert messages[1]["id"] == "res1"

anyio.run(_test)

def test_split_json_across_multiple_reads(self) -> None:
"""Test parsing when a single JSON object is split across multiple stream reads."""

async def _test() -> None:
json_obj = {
"type": "assistant",
"message": {
"content": [
{"type": "text", "text": "x" * 1000},
{
"type": "tool_use",
"id": "tool_123",
"name": "Read",
"input": {"file_path": "/test.txt"},
},
]
},
}

complete_json = json.dumps(json_obj)

part1 = complete_json[:100]
part2 = complete_json[100:250]
part3 = complete_json[250:]

transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)

mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([part1, part2, part3])
transport._stderr_stream = MockTextReceiveStream([])

messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)

assert len(messages) == 1
assert messages[0]["type"] == "assistant"
assert len(messages[0]["message"]["content"]) == 2

anyio.run(_test)

def test_large_minified_json(self) -> None:
"""Test parsing a large minified JSON (simulating the reported issue)."""

async def _test() -> None:
large_data = {"data": [{"id": i, "value": "x" * 100} for i in range(1000)]}
json_obj = {
"type": "user",
"message": {
"role": "user",
"content": [
{
"tool_use_id": "toolu_016fed1NhiaMLqnEvrj5NUaj",
"type": "tool_result",
"content": json.dumps(large_data),
}
],
},
}

complete_json = json.dumps(json_obj)

chunk_size = 64 * 1024
chunks = [
complete_json[i : i + chunk_size]
for i in range(0, len(complete_json), chunk_size)
]

transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)

mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream(chunks)
transport._stderr_stream = MockTextReceiveStream([])

messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)

assert len(messages) == 1
assert messages[0]["type"] == "user"
assert (
messages[0]["message"]["content"][0]["tool_use_id"]
== "toolu_016fed1NhiaMLqnEvrj5NUaj"
)

anyio.run(_test)

def test_buffer_size_exceeded(self) -> None:
"""Test that exceeding buffer size raises an appropriate error."""

async def _test() -> None:
huge_incomplete = '{"data": "' + "x" * (_MAX_BUFFER_SIZE + 1000)

transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)

mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream([huge_incomplete])
transport._stderr_stream = MockTextReceiveStream([])

with pytest.raises(Exception) as exc_info:
messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)

assert len(exc_info.value.exceptions) == 1
assert isinstance(exc_info.value.exceptions[0], CLIJSONDecodeError)
assert "exceeded maximum buffer size" in str(exc_info.value.exceptions[0])

anyio.run(_test)

def test_mixed_complete_and_split_json(self) -> None:
"""Test handling a mix of complete and split JSON messages."""

async def _test() -> None:
msg1 = json.dumps({"type": "system", "subtype": "start"})

large_msg = {
"type": "assistant",
"message": {"content": [{"type": "text", "text": "y" * 5000}]},
}
large_json = json.dumps(large_msg)

msg3 = json.dumps({"type": "system", "subtype": "end"})

lines = [
msg1 + "\n",
large_json[:1000],
large_json[1000:3000],
large_json[3000:] + "\n" + msg3,
]

transport = SubprocessCLITransport(
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
)

mock_process = MagicMock()
mock_process.returncode = None
mock_process.wait = AsyncMock(return_value=None)
transport._process = mock_process
transport._stdout_stream = MockTextReceiveStream(lines)
transport._stderr_stream = MockTextReceiveStream([])

messages: list[Any] = []
async for msg in transport.receive_messages():
messages.append(msg)

assert len(messages) == 3
assert messages[0]["type"] == "system"
assert messages[0]["subtype"] == "start"
assert messages[1]["type"] == "assistant"
assert len(messages[1]["message"]["content"][0]["text"]) == 5000
assert messages[2]["type"] == "system"
assert messages[2]["subtype"] == "end"

anyio.run(_test)