Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 119 additions & 9 deletions src/claude_code_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,113 @@
from . import Transport


class JSONStreamParser:
"""Handles parsing of potentially incomplete JSON streams."""

def __init__(self):
self._buffer = ""

def add_data(self, data: str) -> list[dict[str, Any]]:
"""Add new data to buffer and return any complete JSON objects."""
self._buffer += data
return self._extract_complete_objects()

def _extract_complete_objects(self) -> list[dict[str, Any]]:
"""Extract all complete JSON objects from the buffer."""
objects = []

# Handle newline-separated JSON objects
lines = self._buffer.split('\n')

# Keep the last line in buffer
self._buffer = lines[-1] if lines else ""

# Process all complete lines
for line in lines[:-1]:
line = line.strip()
if not line:
continue

if parsed_obj := self._try_parse_single_line(line):
objects.append(parsed_obj)

# Try to parse remaining buffer for complete objects
while self._buffer:
parsed_obj, remaining = self._try_parse_partial_buffer()
if parsed_obj:
objects.append(parsed_obj)
self._buffer = remaining
else:
break

return objects

def _try_parse_single_line(self, line: str) -> dict[str, Any] | None:
"""Try to parse a single line as JSON."""
try:
return json.loads(line)
except json.JSONDecodeError:
# If single line fails, add to buffer for partial parsing
self._buffer = line + "\n" + self._buffer
return None

def _try_parse_partial_buffer(self) -> tuple[dict[str, Any] | None, str]:
"""Try to extract complete JSON object from partial buffer."""
buffer = self._buffer.strip()
if not buffer:
return None, ""

# Quick attempt at full parse
try:
return json.loads(buffer), ""
except json.JSONDecodeError:
pass

# Try to find complete JSON object by tracking braces
complete_object = self._find_complete_json_object(buffer)
if complete_object:
try:
parsed = json.loads(complete_object)
remaining = buffer[len(complete_object):].strip()
return parsed, remaining
except json.JSONDecodeError:
pass

return None, buffer

def _find_complete_json_object(self, text: str) -> str | None:
"""Find the first complete JSON object in text using brace counting."""
if not text.startswith('{'):
return None

brace_count = 0
in_string = False
escape_next = False

for i, char in enumerate(text):
if escape_next:
escape_next = False
continue

if char == '\\':
escape_next = True
continue

if char == '"' and not escape_next:
in_string = not in_string
continue

if not in_string:
if char == '{':
brace_count += 1
elif char == '}':
brace_count -= 1
if brace_count == 0:
return text[:i + 1]

return None


class SubprocessCLITransport(Transport):
"""Subprocess transport using Claude Code CLI."""

Expand Down Expand Up @@ -169,6 +276,7 @@ async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
raise CLIConnectionError("Not connected")

stderr_lines = []
json_parser = JSONStreamParser()

async def read_stderr() -> None:
"""Read stderr in background."""
Expand All @@ -188,17 +296,19 @@ async def read_stderr() -> None:
if not line_str:
continue

# Parse potentially incomplete JSON stream
try:
data = json.loads(line_str)
try:
yield data
except GeneratorExit:
# Handle generator cleanup gracefully
return
except json.JSONDecodeError as e:
complete_objects = json_parser.add_data(line_str)
for json_obj in complete_objects:
yield json_obj
except Exception as e:
# If parsing fails completely, try simple fallback
if line_str.startswith("{") or line_str.startswith("["):
raise SDKJSONDecodeError(line_str, e) from e
continue
try:
data = json.loads(line_str)
yield data
except json.JSONDecodeError as json_err:
raise SDKJSONDecodeError(line_str, json_err) from json_err

except anyio.ClosedResourceError:
pass
Expand Down