Skip to content

Commit 2c8c7fd

Browse files
authored
Address anyio.BrokenResourceError issue from #139 (#149)
## Summary This PR addresses the `anyio.BrokenResourceError` issue from #139 and aligns the Python SDK's error handling with the TypeScript SDK implementation. ## Changes ### Fix stream closure issue in Query - Removed the `async with` context manager from `Query.receive_messages()` that was closing the stream after first use - The stream now remains open for the entire session, allowing multiple queries in streaming mode - This fixes the `BrokenResourceError` that occurred during multi-turn conversations ### Align subprocess transport with TypeScript SDK Following the TypeScript `ProcessTransport` implementation pattern: - **Added `_exit_error` tracking**: Captures and preserves process-level errors for better error propagation - **Enhanced `write()` method checks**: - Validates transport readiness before writing - Checks if process is still alive (exit code) - Checks for stored exit errors before attempting writes - Marks transport as not ready on write failures - **Improved error handling in `connect()`**: Stores errors as `_exit_error` for later reference - **Simplified `is_ready()` method**: Now just returns the `_ready` flag, matching TypeScript's simpler approach ### Other improvements - Added asyncio pytest plugin configuration (`-p asyncio` in pyproject.toml) - Added clarifying comment about TextReceiveStream line handling ## Testing The multi-turn conversation example now works correctly: ```bash python examples/streaming_mode.py multi_turn_conversation ``` ## Related Issues Fixes #139
1 parent 681f46c commit 2c8c7fd

File tree

3 files changed

+50
-27
lines changed

3 files changed

+50
-27
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ testpaths = ["tests"]
6161
pythonpath = ["src"]
6262
addopts = [
6363
"--import-mode=importlib",
64+
"-p", "asyncio",
6465
]
6566

6667
[tool.pytest-asyncio]

src/claude_code_sdk/_internal/query.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -456,15 +456,14 @@ async def stream_input(self, stream: AsyncIterable[dict[str, Any]]) -> None:
456456

457457
async def receive_messages(self) -> AsyncIterator[dict[str, Any]]:
458458
"""Receive SDK messages (not control messages)."""
459-
async with self._message_receive:
460-
async for message in self._message_receive:
461-
# Check for special messages
462-
if message.get("type") == "end":
463-
break
464-
elif message.get("type") == "error":
465-
raise Exception(message.get("error", "Unknown error"))
466-
467-
yield message
459+
async for message in self._message_receive:
460+
# Check for special messages
461+
if message.get("type") == "end":
462+
break
463+
elif message.get("type") == "error":
464+
raise Exception(message.get("error", "Unknown error"))
465+
466+
yield message
468467

469468
async def close(self) -> None:
470469
"""Close the query and transport."""

src/claude_code_sdk/_internal/transport/subprocess_cli.py

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ def __init__(
4646
self._stdin_stream: TextSendStream | None = None
4747
self._stderr_file: Any = None # tempfile.NamedTemporaryFile
4848
self._ready = False
49+
self._exit_error: Exception | None = None # Track process exit errors
4950

5051
def _find_cli(self) -> str:
5152
"""Find Claude Code CLI binary."""
@@ -213,12 +214,18 @@ async def connect(self) -> None:
213214
except FileNotFoundError as e:
214215
# Check if the error comes from the working directory or the CLI
215216
if self._cwd and not Path(self._cwd).exists():
216-
raise CLIConnectionError(
217+
error = CLIConnectionError(
217218
f"Working directory does not exist: {self._cwd}"
218-
) from e
219-
raise CLINotFoundError(f"Claude Code not found at: {self._cli_path}") from e
219+
)
220+
self._exit_error = error
221+
raise error from e
222+
error = CLINotFoundError(f"Claude Code not found at: {self._cli_path}")
223+
self._exit_error = error
224+
raise error from e
220225
except Exception as e:
221-
raise CLIConnectionError(f"Failed to start Claude Code: {e}") from e
226+
error = CLIConnectionError(f"Failed to start Claude Code: {e}")
227+
self._exit_error = error
228+
raise error from e
222229

223230
async def close(self) -> None:
224231
"""Close the transport and clean up resources."""
@@ -259,23 +266,41 @@ async def close(self) -> None:
259266
self._stdout_stream = None
260267
self._stderr_stream = None
261268
self._stdin_stream = None
269+
self._exit_error = None
262270

263271
async def write(self, data: str) -> None:
264272
"""Write raw data to the transport."""
265-
if not self._stdin_stream:
266-
raise CLIConnectionError("Cannot write: stdin not available")
273+
# Check if ready (like TypeScript)
274+
if not self._ready or not self._stdin_stream:
275+
raise CLIConnectionError("ProcessTransport is not ready for writing")
276+
277+
# Check if process is still alive (like TypeScript)
278+
if self._process and self._process.returncode is not None:
279+
raise CLIConnectionError(
280+
f"Cannot write to terminated process (exit code: {self._process.returncode})"
281+
)
282+
283+
# Check for exit errors (like TypeScript)
284+
if self._exit_error:
285+
raise CLIConnectionError(
286+
f"Cannot write to process that exited with error: {self._exit_error}"
287+
) from self._exit_error
267288

268-
await self._stdin_stream.send(data)
289+
try:
290+
await self._stdin_stream.send(data)
291+
except Exception as e:
292+
self._ready = False # Mark as not ready (like TypeScript)
293+
self._exit_error = CLIConnectionError(
294+
f"Failed to write to process stdin: {e}"
295+
)
296+
raise self._exit_error from e
269297

270298
async def end_input(self) -> None:
271299
"""End the input stream (close stdin)."""
272300
if self._stdin_stream:
273301
with suppress(Exception):
274302
await self._stdin_stream.aclose()
275303
self._stdin_stream = None
276-
if self._process and self._process.stdin:
277-
with suppress(Exception):
278-
await self._process.stdin.aclose()
279304

280305
def read_messages(self) -> AsyncIterator[dict[str, Any]]:
281306
"""Read and parse messages from the transport."""
@@ -295,6 +320,9 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]:
295320
if not line_str:
296321
continue
297322

323+
# Accumulate partial JSON until we can parse it
324+
# Note: TextReceiveStream can truncate long lines, so we need to buffer
325+
# and speculatively parse until we get a complete JSON object
298326
json_lines = line_str.split("\n")
299327

300328
for json_line in json_lines:
@@ -361,21 +389,16 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]:
361389

362390
# Use exit code for error detection, not string matching
363391
if returncode is not None and returncode != 0:
364-
raise ProcessError(
392+
self._exit_error = ProcessError(
365393
f"Command failed with exit code {returncode}",
366394
exit_code=returncode,
367395
stderr=stderr_output,
368396
)
397+
raise self._exit_error
369398
elif stderr_output:
370399
# Log stderr for debugging but don't fail on non-zero exit
371400
logger.debug(f"Process stderr: {stderr_output}")
372401

373402
def is_ready(self) -> bool:
374403
"""Check if transport is ready for communication."""
375-
return (
376-
self._ready
377-
and self._process is not None
378-
and self._process.returncode is None
379-
)
380-
381-
# Remove interrupt and control request methods - these now belong in Query class
404+
return self._ready

0 commit comments

Comments
 (0)