Skip to content

Commit 4af210e

Browse files
authored
Merge pull request #53 from anthropics/fix/json-streaming-buffer
fix: handle JSON messages split across multiple stream reads
2 parents 012e1d1 + 8233533 commit 4af210e

File tree

2 files changed

+196
-16
lines changed

2 files changed

+196
-16
lines changed

src/claude_code_sdk/_internal/transport/subprocess_cli.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
from ...types import ClaudeCodeOptions
1818
from . import Transport
1919

20+
# Upper bound on the partial-JSON accumulation buffer (1024 * 1024).
# The limit is compared against len() of a str, so it is counted in
# characters rather than encoded bytes.
_MAX_BUFFER_SIZE = 1024 * 1024  # 1MB buffer limit
21+
2022

2123
class SubprocessCLITransport(Transport):
2224
"""Subprocess transport using Claude Code CLI."""
@@ -182,30 +184,41 @@ async def read_stderr() -> None:
182184
async with anyio.create_task_group() as tg:
183185
tg.start_soon(read_stderr)
184186

187+
json_buffer = ""
188+
185189
try:
186190
async for line in self._stdout_stream:
187191
line_str = line.strip()
188192
if not line_str:
189193
continue
190194

191-
# Split on newlines in case multiple JSON objects are buffered together
192195
json_lines = line_str.split("\n")
193196

194197
for json_line in json_lines:
195198
json_line = json_line.strip()
196199
if not json_line:
197200
continue
198201

202+
# Keep accumulating partial JSON until we can parse it
203+
json_buffer += json_line
204+
205+
# Guard against unbounded growth while waiting for a parseable message.
# The size is captured *before* the buffer is cleared; otherwise the
# diagnostic below would always report a size of 0.
# NOTE: len() of a str counts characters, so "bytes" in the message is an
# approximation for non-ASCII payloads.
buffered_size = len(json_buffer)
if buffered_size > _MAX_BUFFER_SIZE:
    # Drop the oversized data before raising.
    json_buffer = ""
    raise SDKJSONDecodeError(
        f"JSON message exceeded maximum buffer size of {_MAX_BUFFER_SIZE} bytes",
        ValueError(
            f"Buffer size {buffered_size} exceeds limit {_MAX_BUFFER_SIZE}"
        ),
    )
213+
199214
try:
200-
data = json.loads(json_line)
215+
data = json.loads(json_buffer)
216+
json_buffer = ""
201217
try:
202218
yield data
203219
except GeneratorExit:
204-
# Handle generator cleanup gracefully
205220
return
206-
except json.JSONDecodeError as e:
207-
if json_line.startswith("{") or json_line.startswith("["):
208-
raise SDKJSONDecodeError(json_line, e) from e
221+
except json.JSONDecodeError:
209222
continue
210223

211224
except anyio.ClosedResourceError:

tests/test_subprocess_buffering.py

Lines changed: 177 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,13 @@
66
from unittest.mock import AsyncMock, MagicMock
77

88
import anyio
9+
import pytest
910

10-
from claude_code_sdk._internal.transport.subprocess_cli import SubprocessCLITransport
11+
from claude_code_sdk._errors import CLIJSONDecodeError
12+
from claude_code_sdk._internal.transport.subprocess_cli import (
13+
_MAX_BUFFER_SIZE,
14+
SubprocessCLITransport,
15+
)
1116
from claude_code_sdk.types import ClaudeCodeOptions
1217

1318

@@ -40,34 +45,27 @@ def test_multiple_json_objects_on_single_line(self) -> None:
4045
"""
4146

4247
async def _test() -> None:
43-
# Two valid JSON objects separated by a newline character
4448
json_obj1 = {"type": "message", "id": "msg1", "content": "First message"}
4549
json_obj2 = {"type": "result", "id": "res1", "status": "completed"}
4650

47-
# Simulate buffered output where both objects appear on one line
4851
buffered_line = json.dumps(json_obj1) + "\n" + json.dumps(json_obj2)
4952

50-
# Create transport
5153
transport = SubprocessCLITransport(
5254
prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
5355
)
5456

55-
# Mock the process and streams
5657
mock_process = MagicMock()
5758
mock_process.returncode = None
5859
mock_process.wait = AsyncMock(return_value=None)
5960
transport._process = mock_process
6061

61-
# Create mock stream that returns the buffered line
6262
transport._stdout_stream = MockTextReceiveStream([buffered_line]) # type: ignore[assignment]
6363
transport._stderr_stream = MockTextReceiveStream([]) # type: ignore[assignment]
6464

65-
# Collect all messages
6665
messages: list[Any] = []
6766
async for msg in transport.receive_messages():
6867
messages.append(msg)
6968

70-
# Verify both JSON objects were successfully parsed
7169
assert len(messages) == 2
7270
assert messages[0]["type"] == "message"
7371
assert messages[0]["id"] == "msg1"
@@ -82,7 +80,6 @@ def test_json_with_embedded_newlines(self) -> None:
8280
"""Test parsing JSON objects that contain newline characters in string values."""
8381

8482
async def _test() -> None:
85-
# JSON objects with newlines in string values
8683
json_obj1 = {"type": "message", "content": "Line 1\nLine 2\nLine 3"}
8784
json_obj2 = {"type": "result", "data": "Some\nMultiline\nContent"}
8885

@@ -116,7 +113,6 @@ async def _test() -> None:
116113
json_obj1 = {"type": "message", "id": "msg1"}
117114
json_obj2 = {"type": "result", "id": "res1"}
118115

119-
# Multiple newlines between objects
120116
buffered_line = json.dumps(json_obj1) + "\n\n\n" + json.dumps(json_obj2)
121117

122118
transport = SubprocessCLITransport(
@@ -139,3 +135,174 @@ async def _test() -> None:
139135
assert messages[1]["id"] == "res1"
140136

141137
anyio.run(_test)
138+
139+
def test_split_json_across_multiple_reads(self) -> None:
    """Check that one JSON object handed over in three fragments yields one message.

    The serialized payload is cut at offsets 100 and 250 and the three pieces
    are given to the mocked stdout stream as separate items.
    """

    async def _test() -> None:
        text_block = {"type": "text", "text": "x" * 1000}
        tool_use_block = {
            "type": "tool_use",
            "id": "tool_123",
            "name": "Read",
            "input": {"file_path": "/test.txt"},
        }
        payload = {
            "type": "assistant",
            "message": {"content": [text_block, tool_use_block]},
        }
        serialized = json.dumps(payload)

        # Fragments cover [0, 100), [100, 250) and [250, end).
        cut_points = [0, 100, 250, len(serialized)]
        fragments = [
            serialized[start:stop]
            for start, stop in zip(cut_points, cut_points[1:])
        ]

        process_stub = MagicMock()
        process_stub.returncode = None
        process_stub.wait = AsyncMock(return_value=None)

        transport = SubprocessCLITransport(
            prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
        )
        transport._process = process_stub
        transport._stdout_stream = MockTextReceiveStream(fragments)
        transport._stderr_stream = MockTextReceiveStream([])

        received: list[Any] = [msg async for msg in transport.receive_messages()]

        assert len(received) == 1
        only_message = received[0]
        assert only_message["type"] == "assistant"
        assert len(only_message["message"]["content"]) == 2

    anyio.run(_test)
184+
185+
def test_large_minified_json(self) -> None:
    """Check that a large single-line JSON message is parsed when fed in 64 KiB chunks.

    The payload embeds a JSON-encoded list of 1000 records as a tool result,
    mirroring the shape of the reported failure.
    """

    async def _test() -> None:
        records = [{"id": i, "value": "x" * 100} for i in range(1000)]
        tool_result = {
            "tool_use_id": "toolu_016fed1NhiaMLqnEvrj5NUaj",
            "type": "tool_result",
            "content": json.dumps({"data": records}),
        }
        payload = {
            "type": "user",
            "message": {"role": "user", "content": [tool_result]},
        }
        serialized = json.dumps(payload)

        # Slice the serialized text into consecutive 64 KiB pieces.
        chunk_size = 64 * 1024
        chunks = []
        offset = 0
        while offset < len(serialized):
            chunks.append(serialized[offset : offset + chunk_size])
            offset += chunk_size

        process_stub = MagicMock()
        process_stub.returncode = None
        process_stub.wait = AsyncMock(return_value=None)

        transport = SubprocessCLITransport(
            prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
        )
        transport._process = process_stub
        transport._stdout_stream = MockTextReceiveStream(chunks)
        transport._stderr_stream = MockTextReceiveStream([])

        received: list[Any] = [msg async for msg in transport.receive_messages()]

        assert len(received) == 1
        only_message = received[0]
        assert only_message["type"] == "user"
        first_content = only_message["message"]["content"][0]
        assert first_content["tool_use_id"] == "toolu_016fed1NhiaMLqnEvrj5NUaj"

    anyio.run(_test)
235+
236+
def test_buffer_size_exceeded(self) -> None:
    """Check that an unterminated message larger than the buffer limit raises.

    The raised exception is expected to expose an ``exceptions`` sequence
    holding exactly one ``CLIJSONDecodeError``.
    """

    async def _test() -> None:
        # An opening brace and string that never close, 1000 characters
        # longer than the limit once the prefix is excluded.
        padding = "x" * (_MAX_BUFFER_SIZE + 1000)
        huge_incomplete = '{"data": "' + padding

        process_stub = MagicMock()
        process_stub.returncode = None
        process_stub.wait = AsyncMock(return_value=None)

        transport = SubprocessCLITransport(
            prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
        )
        transport._process = process_stub
        transport._stdout_stream = MockTextReceiveStream([huge_incomplete])
        transport._stderr_stream = MockTextReceiveStream([])

        with pytest.raises(Exception) as exc_info:
            # Drain the generator; the error is expected before any message.
            async for _ in transport.receive_messages():
                pass

        wrapped = exc_info.value.exceptions
        assert len(wrapped) == 1
        inner = wrapped[0]
        assert isinstance(inner, CLIJSONDecodeError)
        assert "exceeded maximum buffer size" in str(inner)

    anyio.run(_test)
263+
264+
def test_mixed_complete_and_split_json(self) -> None:
    """Check that whole and fragmented messages interleave correctly.

    The stream carries one complete message, then a large message cut at
    offsets 1000 and 3000, with a third complete message appended to the
    final fragment after a newline.
    """

    async def _test() -> None:
        start_json = json.dumps({"type": "system", "subtype": "start"})
        end_json = json.dumps({"type": "system", "subtype": "end"})
        big_json = json.dumps(
            {
                "type": "assistant",
                "message": {"content": [{"type": "text", "text": "y" * 5000}]},
            }
        )

        head = big_json[:1000]
        middle = big_json[1000:3000]
        tail = big_json[3000:]
        stream_items = [f"{start_json}\n", head, middle, f"{tail}\n{end_json}"]

        process_stub = MagicMock()
        process_stub.returncode = None
        process_stub.wait = AsyncMock(return_value=None)

        transport = SubprocessCLITransport(
            prompt="test", options=ClaudeCodeOptions(), cli_path="/usr/bin/claude"
        )
        transport._process = process_stub
        transport._stdout_stream = MockTextReceiveStream(stream_items)
        transport._stderr_stream = MockTextReceiveStream([])

        received: list[Any] = [msg async for msg in transport.receive_messages()]

        assert len(received) == 3
        first, second, third = received
        assert first["type"] == "system"
        assert first["subtype"] == "start"
        assert second["type"] == "assistant"
        assert len(second["message"]["content"][0]["text"]) == 5000
        assert third["type"] == "system"
        assert third["subtype"] == "end"

    anyio.run(_test)

0 commit comments

Comments
 (0)