Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 40 additions & 9 deletions src/claude_code_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,23 +182,54 @@ async def read_stderr() -> None:
async with anyio.create_task_group() as tg:
tg.start_soon(read_stderr)

buffer = ""
is_multiline_json = False

try:
async for line in self._stdout_stream:
line_str = line.strip()
if not line_str:
continue

try:
data = json.loads(line_str)
if is_multiline_json:
# We're collecting lines for multiline JSON
buffer += line_str
try:
data = json.loads(buffer)
# Success! Yield and reset
try:
yield data
except GeneratorExit:
return
buffer = ""
is_multiline_json = False
except json.JSONDecodeError:
# Still not valid, keep collecting
continue
else:
# Try to parse line as complete JSON first
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
data = json.loads(line_str)
try:
yield data
except GeneratorExit:
return
except json.JSONDecodeError as e:
# Failed to parse - check if it looks like start of JSON
if line_str.startswith("{") or line_str.startswith("["):
# Start buffering for potential multiline JSON
buffer = line_str
is_multiline_json = True
# If it doesn't look like JSON, just skip the line
continue

# Handle any remaining buffer after stream ends
if buffer and is_multiline_json:
try:
data = json.loads(buffer)
yield data
except json.JSONDecodeError as e:
if line_str.startswith("{") or line_str.startswith("["):
raise SDKJSONDecodeError(line_str, e) from e
continue
raise SDKJSONDecodeError(buffer, e) from e

except anyio.ClosedResourceError:
pass
Expand Down
123 changes: 123 additions & 0 deletions tests/test_transport.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""Tests for Claude SDK transport layer."""

import json
from unittest.mock import AsyncMock, MagicMock, patch

import anyio
import pytest

from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
from claude_code_sdk._errors import CLIJSONDecodeError as SDKJSONDecodeError
from claude_code_sdk.types import ClaudeCodeOptions


Expand Down Expand Up @@ -132,3 +134,124 @@ def test_receive_messages(self):
# So we just verify the transport can be created and basic structure is correct
assert transport._prompt == "test"
assert transport._cli_path == "/usr/bin/claude"

def test_multiline_json_parsing(self):
"""Test parsing JSON that works both single-line and with buffering logic."""

async def _test():
# Mock process and streams
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.wait = AsyncMock(return_value=0)

# Test data: valid single-line JSON only
test_lines = [
'{"type": "single", "data": "complete"}', # Valid single line JSON
'{"type": "valid_long", "data": {"nested": "value"}, "complete": true}', # Another valid single line
]

# Create async iterator from test lines
class MockTextReceiveStream:
def __init__(self, lines):
self.lines = iter(lines)

def __aiter__(self):
return self

async def __anext__(self):
try:
return next(self.lines)
except StopIteration:
raise StopAsyncIteration

mock_stdout_stream = MockTextReceiveStream(test_lines)
mock_stderr_stream = MockTextReceiveStream([])

with patch("anyio.open_process") as mock_open_process:
mock_open_process.return_value = mock_process

transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(),
cli_path="/usr/bin/claude",
)

# Manually set up the streams for testing
transport._process = mock_process
transport._stdout_stream = mock_stdout_stream # type: ignore
transport._stderr_stream = mock_stderr_stream # type: ignore

# Collect all yielded messages
messages = []
async for message in transport.receive_messages():
messages.append(message)

# Verify we got the expected valid JSON messages
assert len(messages) == 2

# Check first single line JSON
assert messages[0] == {"type": "single", "data": "complete"}

# Check second single line JSON
assert messages[1] == {
"type": "valid_long",
"data": {"nested": "value"},
"complete": True
}

anyio.run(_test)

def test_multiline_json_no_error_on_valid_completion(self):
"""Test that valid multiline JSON doesn't raise error."""

async def _test():
mock_process = MagicMock()
mock_process.returncode = 0
mock_process.wait = AsyncMock(return_value=0)

# Test multiline JSON that completes properly
test_lines = [
'{"type": "multiline",',
'"data": "test",',
'"complete": true}',
]

class MockTextReceiveStream:
def __init__(self, lines):
self.lines = iter(lines)

def __aiter__(self):
return self

async def __anext__(self):
try:
return next(self.lines)
except StopIteration:
raise StopAsyncIteration

mock_stdout_stream = MockTextReceiveStream(test_lines)
mock_stderr_stream = MockTextReceiveStream([])

with patch("anyio.open_process") as mock_open_process:
mock_open_process.return_value = mock_process

transport = SubprocessCLITransport(
prompt="test",
options=ClaudeCodeOptions(),
cli_path="/usr/bin/claude",
)

transport._process = mock_process
transport._stdout_stream = mock_stdout_stream # type: ignore
transport._stderr_stream = mock_stderr_stream # type: ignore

messages = []
async for message in transport.receive_messages():
messages.append(message)

# Should get exactly one properly parsed message
assert len(messages) == 1
expected = {"type": "multiline", "data": "test", "complete": True}
assert messages[0] == expected

anyio.run(_test)