Skip to content

Issue with JSON parsing in SubprocessCLITransport.receive_messagesΒ #6

@bam4564

Description

@bam4564

This applies to the initial release of the package (as there are no other versions right now).

I encountered an issue when giving the CLI a prompt that generated a long output such that it was split into multiple chunks that were not individual parseable. I modified the processing logic within the function to handle cases where an individual chunk is either

  1. a full JSON object
  2. the start of a JSON object with no terminator (signaling continuation in future messages)
  3. the continuation of a JSON object with no start or end char (signaling a continuation of an already started message)
  4. the termination of a JSON object that was received in chunks

I don't have the bandwidth to fully test this with all the possible inputs received as it seemed I would have to dive deeper into the claude package itself, but here is a reference for how I patched my own implementation and solved the issue for my own use case. Hopefully it's useful for identifying the issue and working on a future solution.

This relates to this issue: #5

Old Implementation

    async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Receive messages from CLI."""
        if not self._process or not self._stdout_stream:
            raise CLIConnectionError("Not connected")

        stderr_lines = []

        async def read_stderr() -> None:
            """Read stderr in background."""
            if self._stderr_stream:
                try:
                    async for line in self._stderr_stream:
                        stderr_lines.append(line.strip())
                except anyio.ClosedResourceError:
                    pass

        async with anyio.create_task_group() as tg:
            tg.start_soon(read_stderr)

            try:
                async for line in self._stdout_stream:
                    line_str = line.strip()
                    if not line_str:
                        continue

                    try:
                        data = json.loads(line_str)
                        yield data
                    except json.JSONDecodeError as e:
                        if line_str.startswith("{") or line_str.startswith("["):
                            raise SDKJSONDecodeError(line_str, e) from e
                        continue

            except anyio.ClosedResourceError:
                pass
            finally:
                tg.cancel_scope.cancel()

        await self._process.wait()
        if self._process.returncode is not None and self._process.returncode != 0:
            stderr_output = "\n".join(stderr_lines)
            if stderr_output and "error" in stderr_output.lower():
                raise ProcessError(
                    "CLI process failed",
                    exit_code=self._process.returncode,
                    stderr=stderr_output,
                )

Patched Implementation

    async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
        """Receive messages from CLI."""
        if not self._process or not self._stdout_stream:
            raise CLIConnectionError("Not connected")

        stderr_lines = []

        async def read_stderr() -> None:
            """Read stderr in background."""
            if self._stderr_stream:
                try:
                    async for line in self._stderr_stream:
                        stderr_lines.append(line.strip())
                except anyio.ClosedResourceError:
                    pass

        async with anyio.create_task_group() as tg:
            tg.start_soon(read_stderr)

            try:
                curr = ""
                async for line in self._stdout_stream:
                    line_str = line.strip()
                    if not line_str:
                        continue

                    # Helper functions to check JSON structure
                    def is_complete_json(s: str) -> bool:
                        return (s.startswith("{") and s.endswith("}")) or (s.startswith("[") and s.endswith("]"))
                    
                    def is_starting_json(s: str) -> bool:
                        return (s.startswith("{") and not s.endswith("}")) or (s.startswith("[") and not s.endswith("]"))
                    
                    def is_ending_json(s: str) -> bool:
                        return (not s.startswith("{") and s.endswith("}")) or (not s.startswith("[") and s.endswith("]"))
                    
                    def is_continuing_json(s: str) -> bool:
                        return not (s.startswith("{") or s.startswith("[") or s.endswith("}") or s.endswith("]"))

                    # Process the line based on its JSON structure
                    if is_complete_json(line_str):
                        if curr:
                            raise SDKJSONDecodeError(line_str, BaseException("Did not find terminating sequence for partial JSON object"))
                        yield json.loads(line_str)
                    
                    elif is_starting_json(line_str):
                        if curr:
                            raise SDKJSONDecodeError(line_str, BaseException("Can't start a new JSON object or array without completing the previous one"))
                        curr = line_str
                    
                    elif is_continuing_json(line_str):
                        if not curr:
                            raise SDKJSONDecodeError(line_str, BaseException("Received line without starting JSON object or array"))
                        curr += line_str
                    
                    elif is_ending_json(line_str):
                        if not curr:
                            raise SDKJSONDecodeError(line_str, BaseException("Received line without starting JSON object or array"))
                        curr += line_str
                        yield json.loads(curr)
                        curr = ""  # Reset for next object
                    
                    else:
                        raise SDKJSONDecodeError(line_str, BaseException("Invalid JSON format"))

                if curr:
                    raise SDKJSONDecodeError(curr, BaseException("Incomplete JSON object or array at end of stream"))
                
            except anyio.ClosedResourceError:
                pass
            finally:
                tg.cancel_scope.cancel()

        await self._process.wait()
        if self._process.returncode is not None and self._process.returncode != 0:
            stderr_output = "\n".join(stderr_lines)
            if stderr_output and "error" in stderr_output.lower():
                raise ProcessError(
                    "CLI process failed",
                    exit_code=self._process.returncode,
                    stderr=stderr_output,
                )

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions