Skip to content

Commit 9bda4e8

Browse files
authored
Merge pull request #5 from Bradley-Butcher/fix/subprocess-buffering-issue
fix multi-line buffering issue
2 parents 3bc37b5 + 7610791 commit 9bda4e8

File tree

2 files changed

+159
-10
lines changed

2 files changed

+159
-10
lines changed

src/claude_code_sdk/_internal/transport/subprocess_cli.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -188,17 +188,25 @@ async def read_stderr() -> None:
188188
if not line_str:
189189
continue
190190

191-
try:
192-
data = json.loads(line_str)
191+
# Split on newlines in case multiple JSON objects are buffered together
192+
json_lines = line_str.split("\n")
193+
194+
for json_line in json_lines:
195+
json_line = json_line.strip()
196+
if not json_line:
197+
continue
198+
193199
try:
194-
yield data
195-
except GeneratorExit:
196-
# Handle generator cleanup gracefully
197-
return
198-
except json.JSONDecodeError as e:
199-
if line_str.startswith("{") or line_str.startswith("["):
200-
raise SDKJSONDecodeError(line_str, e) from e
201-
continue
200+
data = json.loads(json_line)
201+
try:
202+
yield data
203+
except GeneratorExit:
204+
# Handle generator cleanup gracefully
205+
return
206+
except json.JSONDecodeError as e:
207+
if json_line.startswith("{") or json_line.startswith("["):
208+
raise SDKJSONDecodeError(json_line, e) from e
209+
continue
202210

203211
except anyio.ClosedResourceError:
204212
pass

tests/test_subprocess_buffering.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
"""Tests for subprocess transport buffering edge cases."""
2+
3+
import json
4+
from collections.abc import AsyncIterator
5+
from typing import Any
6+
from unittest.mock import AsyncMock, MagicMock
7+
8+
import anyio
9+
10+
from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
11+
from claude_code_sdk.types import ClaudeCodeOptions
12+
13+
14+
class MockTextReceiveStream:
15+
"""Mock TextReceiveStream for testing."""
16+
17+
def __init__(self, lines: list[str]) -> None:
18+
self.lines = lines
19+
self.index = 0
20+
21+
def __aiter__(self) -> AsyncIterator[str]:
22+
return self
23+
24+
async def __anext__(self) -> str:
25+
if self.index >= len(self.lines):
26+
raise StopAsyncIteration
27+
line = self.lines[self.index]
28+
self.index += 1
29+
return line
30+
31+
32+
class TestSubprocessBuffering:
33+
"""Test subprocess transport handling of buffered output."""
34+
35+
def test_multiple_json_objects_on_single_line(self) -> None:
36+
"""Test parsing when multiple JSON objects are concatenated on a single line.
37+
38+
In some environments, stdout buffering can cause multiple distinct JSON
39+
objects to be delivered as a single line with embedded newlines.
40+
"""
41+
42+
async def _test() -> None:
43+
# Two valid JSON objects separated by a newline character
44+
json_obj1 = {"type": "message", "id": "msg1", "content": "First message"}
45+
json_obj2 = {"type": "result", "id": "res1", "status": "completed"}
46+
47+
# Simulate buffered output where both objects appear on one line
48+
buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)
49+
50+
# Create transport
51+
transport = SubprocessCLITransport(
52+
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
53+
)
54+
55+
# Mock the process and streams
56+
mock_process = MagicMock()
57+
mock_process.returncode = None
58+
mock_process.wait = AsyncMock(return_value=None)
59+
transport._process = mock_process
60+
61+
# Create mock stream that returns the buffered line
62+
transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment]
63+
transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment]
64+
65+
# Collect all messages
66+
messages: list[Any] = []
67+
async for msg in transport.receive_messages():
68+
messages.append(msg)
69+
70+
# Verify both JSON objects were successfully parsed
71+
assert len(messages) == 2
72+
assert messages[0]["type"] == "message"
73+
assert messages[0]["id"] == "msg1"
74+
assert messages[0]["content"] == "First message"
75+
assert messages[1]["type"] == "result"
76+
assert messages[1]["id"] == "res1"
77+
assert messages[1]["status"] == "completed"
78+
79+
anyio.run(_test)
80+
81+
def test_json_with_embedded_newlines(self) -> None:
82+
"""Test parsing JSON objects that contain newline characters in string values."""
83+
84+
async def _test() -> None:
85+
# JSON objects with newlines in string values
86+
json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"}
87+
json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"}
88+
89+
buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)
90+
91+
transport = SubprocessCLITransport(
92+
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
93+
)
94+
95+
mock_process = MagicMock()
96+
mock_process.returncode = None
97+
mock_process.wait = AsyncMock(return_value=None)
98+
transport._process = mock_process
99+
transport._stdout_stream = MockTextReceiveStream([buffered_line])
100+
transport._stderr_stream = MockTextReceiveStream([])
101+
102+
messages: list[Any] = []
103+
async for msg in transport.receive_messages():
104+
messages.append(msg)
105+
106+
assert len(messages) == 2
107+
assert messages[0]["content"] == "Line 1\nLine 2\nLine 3"
108+
assert messages[1]["data"] == "Some\nMultiline\nContent"
109+
110+
anyio.run(_test)
111+
112+
def test_multiple_newlines_between_objects(self) -> None:
113+
"""Test parsing with multiple newlines between JSON objects."""
114+
115+
async def _test() -> None:
116+
json_obj1 = {"type": "message", "id": "msg1"}
117+
json_obj2 = {"type": "result", "id": "res1"}
118+
119+
# Multiple newlines between objects
120+
buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2)
121+
122+
transport = SubprocessCLITransport(
123+
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
124+
)
125+
126+
mock_process = MagicMock()
127+
mock_process.returncode = None
128+
mock_process.wait = AsyncMock(return_value=None)
129+
transport._process = mock_process
130+
transport._stdout_stream = MockTextReceiveStream([buffered_line])
131+
transport._stderr_stream = MockTextReceiveStream([])
132+
133+
messages: list[Any] = []
134+
async for msg in transport.receive_messages():
135+
messages.append(msg)
136+
137+
assert len(messages) == 2
138+
assert messages[0]["id"] == "msg1"
139+
assert messages[1]["id"] == "res1"
140+
141+
anyio.run(_test)

0 commit comments

Comments
 (0)