-
Notifications
You must be signed in to change notification settings - Fork 533
fix: add write lock to prevent concurrent transport writes #391
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| else _DEFAULT_MAX_BUFFER_SIZE | ||
| ) | ||
| self._temp_files: list[str] = [] # Track temporary files for cleanup | ||
| self._write_lock: anyio.Lock = anyio.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1. Write Lock Initialization
The lock serializes concurrent writes to stdin. When parallel subagents invoke MCP tools, multiple handlers try to write responses at the same time. Trio's TextSendStream.send() isn't safe for concurrent use and raises BusyResourceError.
🤖 Generated with Claude Code
| async def write(self, data: str) -> None: | ||
| """Write raw data to the transport.""" | ||
| # Check if ready (like TypeScript) | ||
| if not self._ready or not self._stdin_stream: | ||
| raise CLIConnectionError("ProcessTransport is not ready for writing") | ||
|
|
||
| # Check if process is still alive (like TypeScript) | ||
| if self._process and self._process.returncode is not None: | ||
| raise CLIConnectionError( | ||
| f"Cannot write to terminated process (exit code: {self._process.returncode})" | ||
| ) | ||
| async with self._write_lock: | ||
| # All checks inside lock to prevent TOCTOU races with close()/end_input() | ||
| if not self._ready or not self._stdin_stream: | ||
| raise CLIConnectionError("ProcessTransport is not ready for writing") | ||
|
|
||
| if self._process and self._process.returncode is not None: | ||
| raise CLIConnectionError( | ||
| f"Cannot write to terminated process (exit code: {self._process.returncode})" | ||
| ) | ||
|
|
||
| # Check for exit errors (like TypeScript) | ||
| if self._exit_error: | ||
| raise CLIConnectionError( | ||
| f"Cannot write to process that exited with error: {self._exit_error}" | ||
| ) from self._exit_error | ||
| if self._exit_error: | ||
| raise CLIConnectionError( | ||
| f"Cannot write to process that exited with error: {self._exit_error}" | ||
| ) from self._exit_error | ||
|
|
||
| try: | ||
| await self._stdin_stream.send(data) | ||
| except Exception as e: | ||
| self._ready = False # Mark as not ready (like TypeScript) | ||
| self._exit_error = CLIConnectionError( | ||
| f"Failed to write to process stdin: {e}" | ||
| ) | ||
| raise self._exit_error from e | ||
| try: | ||
| await self._stdin_stream.send(data) | ||
| except Exception as e: | ||
| self._ready = False | ||
| self._exit_error = CLIConnectionError( | ||
| f"Failed to write to process stdin: {e}" | ||
| ) | ||
| raise self._exit_error from e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2. Write Method with Lock
The entire write() method now runs inside the lock. This prevents two issues:
- Concurrent sends: Multiple coroutines can't call
send()at the same time - TOCTOU race: The
_readycheck andsend()are now atomic. Previously,close()could set_ready=Falseand close the stream between checking_readyand callingsend().
Related: See comment 1 for lock initialization.
🤖 Generated with Claude Code
| assert network["httpProxyPort"] == 8080 | ||
| assert network["socksProxyPort"] == 8081 | ||
|
|
||
| def test_concurrent_writes_are_serialized(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
3. Concurrent Write Tests
Two tests verify the lock works correctly:
- test_concurrent_writes_are_serialized: Spawns 10 concurrent writes with the lock enabled. All should succeed.
- test_concurrent_writes_fail_without_lock: Replaces the lock with a no-op. This proves the race condition exists without the fix - you get "another task is already" errors.
Both tests use a real subprocess with TextSendStream to match production behavior. They run on the Trio backend where the bug surfaces.
🤖 Generated with Claude Code
When parallel subagents invoke MCP tools, the CLI sends multiple concurrent control_request messages. Without synchronization, handlers race to write responses back, causing trio.BusyResourceError. This adds an anyio.Lock to serialize writes to stdin, and moves the _ready flag inside the lock to prevent TOCTOU races with close(). :house: Remote-Dev: homespace
Replace trio.lowlevel.FdStream (Unix-only) with anyio.open_process() which works on both Unix and Windows. The tests now use a real subprocess with the same stream setup as production code. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]> :house: Remote-Dev: homespace
a5c0fbb to
42bae65
Compare
Include the three bug fixes that were merged but not documented: - #388: Faster CLI error propagation - #385: Pydantic 2.12+ compatibility - #391: Concurrent subagent write lock 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]>
TL;DR
Adds a write lock to
SubprocessCLITransportto prevent concurrent writes from parallel subagents.Overview
When multiple subagents run in parallel and invoke MCP tools, the CLI sends concurrent
control_requestmessages. Each handler tries to write a response back to the subprocess stdin at the same time. Trio'sTextSendStreamisn't thread-safe for concurrent access, so this causesBusyResourceError.This PR adds an
anyio.Lockaround all write operations (write(),end_input(), and the stdin-closing part ofclose()). The lock serializes concurrent writes so they happen one at a time. The_readyflag is now set inside the lock duringclose()to prevent a TOCTOU race wherewrite()checks_ready, thenclose()sets it and closes the stream beforewrite()actually sends data.Call Flow
flowchart TD A["write()<br/>subprocess_cli.py:505"] --> B["acquire _write_lock<br/>subprocess_cli.py:507"] B --> C["check _ready & stream<br/>subprocess_cli.py:509"] C --> D["_stdin_stream.send()<br/>subprocess_cli.py:523"] E["close()<br/>subprocess_cli.py:458"] --> F["acquire _write_lock<br/>subprocess_cli.py:478"] F --> G["set _ready = False<br/>subprocess_cli.py:479"] G --> H["close _stdin_stream<br/>subprocess_cli.py:481"] I["end_input()<br/>subprocess_cli.py:531"] --> J["acquire _write_lock<br/>subprocess_cli.py:533"] J --> K["close _stdin_stream<br/>subprocess_cli.py:535"]