diff --git a/src/claude_code_sdk/_internal/transport/subprocess_cli.py b/src/claude_code_sdk/_internal/transport/subprocess_cli.py index f63732a8..097d26b0 100644 --- a/src/claude_code_sdk/_internal/transport/subprocess_cli.py +++ b/src/claude_code_sdk/_internal/transport/subprocess_cli.py @@ -182,23 +182,54 @@ async def read_stderr() -> None: async with anyio.create_task_group() as tg: tg.start_soon(read_stderr) + buffer = "" + is_multiline_json = False + try: async for line in self._stdout_stream: line_str = line.strip() if not line_str: continue - try: - data = json.loads(line_str) + if is_multiline_json: + # We're collecting lines for multiline JSON + buffer += line_str + try: + data = json.loads(buffer) + # Success! Yield and reset + try: + yield data + except GeneratorExit: + return + buffer = "" + is_multiline_json = False + except json.JSONDecodeError: + # Still not valid, keep collecting + continue + else: + # Try to parse line as complete JSON first try: - yield data - except GeneratorExit: - # Handle generator cleanup gracefully - return + data = json.loads(line_str) + try: + yield data + except GeneratorExit: + return + except json.JSONDecodeError as e: + # Failed to parse - check if it looks like start of JSON + if line_str.startswith("{") or line_str.startswith("["): + # Start buffering for potential multiline JSON + buffer = line_str + is_multiline_json = True + # If it doesn't look like JSON, just skip the line + continue + + # Handle any remaining buffer after stream ends + if buffer and is_multiline_json: + try: + data = json.loads(buffer) + yield data except json.JSONDecodeError as e: - if line_str.startswith("{") or line_str.startswith("["): - raise SDKJSONDecodeError(line_str, e) from e - continue + raise SDKJSONDecodeError(buffer, e) from e except anyio.ClosedResourceError: pass diff --git a/tests/test_transport.py b/tests/test_transport.py index 65702bc7..1f60624d 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -1,11 +1,13 @@ """Tests for Claude SDK transport layer.""" +import json from unittest.mock import AsyncMock, MagicMock, patch import anyio import pytest from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport +from claude_code_sdk._errors import CLIJSONDecodeError as SDKJSONDecodeError from claude_code_sdk.types import ClaudeCodeOptions @@ -132,3 +134,124 @@ def test_receive_messages(self): # So we just verify the transport can be created and basic structure is correct assert transport._prompt == "test" assert transport._cli_path == "/usr/bin/claude" + + def test_multiline_json_parsing(self): + """Test parsing JSON that works both single-line and with buffering logic.""" + + async def _test(): + # Mock process and streams + mock_process = MagicMock() + mock_process.returncode = 0 + mock_process.wait = AsyncMock(return_value=0) + + # Test data: valid single-line JSON only + test_lines = [ + '{"type": "single", "data": "complete"}', # Valid single line JSON + '{"type": "valid_long", "data": {"nested": "value"}, "complete": true}', # Another valid single line + ] + + # Create async iterator from test lines + class MockTextReceiveStream: + def __init__(self, lines): + self.lines = iter(lines) + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return next(self.lines) + except StopIteration: + raise StopAsyncIteration + + mock_stdout_stream = MockTextReceiveStream(test_lines) + mock_stderr_stream = MockTextReceiveStream([]) + + with patch("anyio.open_process") as mock_open_process: + mock_open_process.return_value = mock_process + + transport = SubprocessCLITransport( + prompt="test", + options=ClaudeCodeOptions(), + cli_path="/usr/bin/claude", + ) + + # Manually set up the streams for testing + transport._process = mock_process + transport._stdout_stream = mock_stdout_stream # type: ignore + transport._stderr_stream = mock_stderr_stream # type: ignore + + # Collect all yielded messages + messages = [] + async for message in transport.receive_messages(): + messages.append(message) + + # Verify we got the expected valid JSON messages + assert len(messages) == 2 + + # Check first single line JSON + assert messages[0] == {"type": "single", "data": "complete"} + + # Check second single line JSON + assert messages[1] == { + "type": "valid_long", + "data": {"nested": "value"}, + "complete": True + } + + anyio.run(_test) + + def test_multiline_json_no_error_on_valid_completion(self): + """Test that valid multiline JSON doesn't raise error.""" + + async def _test(): + mock_process = MagicMock() + mock_process.returncode = 0 + mock_process.wait = AsyncMock(return_value=0) + + # Test multiline JSON that completes properly + test_lines = [ + '{"type": "multiline",', + '"data": "test",', + '"complete": true}', + ] + + class MockTextReceiveStream: + def __init__(self, lines): + self.lines = iter(lines) + + def __aiter__(self): + return self + + async def __anext__(self): + try: + return next(self.lines) + except StopIteration: + raise StopAsyncIteration + + mock_stdout_stream = MockTextReceiveStream(test_lines) + mock_stderr_stream = MockTextReceiveStream([]) + + with patch("anyio.open_process") as mock_open_process: + mock_open_process.return_value = mock_process + + transport = SubprocessCLITransport( + prompt="test", + options=ClaudeCodeOptions(), + cli_path="/usr/bin/claude", + ) + + transport._process = mock_process + transport._stdout_stream = mock_stdout_stream # type: ignore + transport._stderr_stream = mock_stderr_stream # type: ignore + + messages = [] + async for message in transport.receive_messages(): + messages.append(message) + + # Should get exactly one properly parsed message + assert len(messages) == 1 + expected = {"type": "multiline", "data": "test", "complete": True} + assert messages[0] == expected + + anyio.run(_test)