-
Notifications
You must be signed in to change notification settings - Fork 521
fix: add write lock to prevent concurrent transport writes #370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: add write lock to prevent concurrent transport writes #370
Conversation
| else _DEFAULT_MAX_BUFFER_SIZE | ||
| ) | ||
| self._temp_files: list[str] = [] # Track temporary files for cleanup | ||
| self._write_lock: anyio.Lock = anyio.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1. Write Lock Declaration [Core logic]
Initializes the anyio.Lock that serializes all transport writes. When parallel subagents handle concurrent control_request messages, each calls transport.write() to send responses. trio's underlying FdStream doesn't allow concurrent sends—this lock prevents the BusyResourceError crash.
Related: See comment 2 for where the lock is acquired.
🤖 Generated with Claude Code
| async with self._write_lock: | ||
| # All checks inside lock to prevent TOCTOU races with close()/end_input() | ||
| if not self._ready or not self._stdin_stream: | ||
| raise CLIConnectionError("ProcessTransport is not ready for writing") | ||
|
|
||
| if self._process and self._process.returncode is not None: | ||
| raise CLIConnectionError( | ||
| f"Cannot write to terminated process (exit code: {self._process.returncode})" | ||
| ) | ||
|
|
||
| # Check for exit errors (like TypeScript) | ||
| if self._exit_error: | ||
| raise CLIConnectionError( | ||
| f"Cannot write to process that exited with error: {self._exit_error}" | ||
| ) from self._exit_error | ||
| if self._exit_error: | ||
| raise CLIConnectionError( | ||
| f"Cannot write to process that exited with error: {self._exit_error}" | ||
| ) from self._exit_error | ||
|
|
||
| try: | ||
| await self._stdin_stream.send(data) | ||
| except Exception as e: | ||
| self._ready = False # Mark as not ready (like TypeScript) | ||
| self._exit_error = CLIConnectionError( | ||
| f"Failed to write to process stdin: {e}" | ||
| ) | ||
| raise self._exit_error from e | ||
| try: | ||
| await self._stdin_stream.send(data) | ||
| except Exception as e: | ||
| self._ready = False | ||
| self._exit_error = CLIConnectionError( | ||
| f"Failed to write to process stdin: {e}" | ||
| ) | ||
| raise self._exit_error from e |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2. write() Serialization [Core logic]
The core fix: all write logic runs inside async with self._write_lock. This includes:
- State checks (
_ready,_stdin_stream,returncode,_exit_error) - The actual
stdin_stream.send()call
Moving checks inside the lock prevents TOCTOU races—without this, a write could pass all checks, then close() could clear _stdin_stream, and the write would fail on a None stream.
🤖 Generated with Claude Code
| # Close stdin stream (acquire lock to prevent race with concurrent writes) | ||
| async with self._write_lock: | ||
| self._ready = False # Set inside lock to prevent TOCTOU with write() | ||
| if self._stdin_stream: | ||
| with suppress(Exception): | ||
| await self._stdin_stream.aclose() | ||
| self._stdin_stream = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
3. close() TOCTOU Prevention [Core logic]
Acquires _write_lock before closing stdin and setting _ready = False. This coordinates with write():
- If
write()holds the lock,close()waits until the write finishes - Once
close()acquires the lock, it sets_ready = Falseinside the critical section - Any subsequent
write()will see_ready = Falsewhen it acquires the lock
Without this coordination, close() could clear the stream while write() was mid-send.
🤖 Generated with Claude Code
| def test_concurrent_writes_are_serialized(self): | ||
| """Test that concurrent write() calls are serialized by the lock. | ||
| When parallel subagents invoke MCP tools, they trigger concurrent write() | ||
| calls. Without the _write_lock, trio raises BusyResourceError. | ||
| Uses a real subprocess with the same stream setup as production: | ||
| process.stdin -> TextSendStream | ||
| """ | ||
|
|
||
| async def _test(): | ||
| import sys | ||
| from subprocess import PIPE | ||
|
|
||
| from anyio.streams.text import TextSendStream | ||
|
|
||
| # Create a real subprocess that consumes stdin (cross-platform) | ||
| process = await anyio.open_process( | ||
| [sys.executable, "-c", "import sys; sys.stdin.read()"], | ||
| stdin=PIPE, | ||
| stdout=PIPE, | ||
| stderr=PIPE, | ||
| ) | ||
|
|
||
| try: | ||
| transport = SubprocessCLITransport( | ||
| prompt="test", | ||
| options=ClaudeAgentOptions(cli_path="/usr/bin/claude"), | ||
| ) | ||
|
|
||
| # Same setup as production: TextSendStream wrapping process.stdin | ||
| transport._ready = True | ||
| transport._process = MagicMock(returncode=None) | ||
| transport._stdin_stream = TextSendStream(process.stdin) | ||
|
|
||
| # Spawn concurrent writes - the lock should serialize them | ||
| num_writes = 10 | ||
| errors: list[Exception] = [] | ||
|
|
||
| async def do_write(i: int): | ||
| try: | ||
| await transport.write(f'{{"msg": {i}}}\n') | ||
| except Exception as e: | ||
| errors.append(e) | ||
|
|
||
| async with anyio.create_task_group() as tg: | ||
| for i in range(num_writes): | ||
| tg.start_soon(do_write, i) | ||
|
|
||
| # All writes should succeed - the lock serializes them | ||
| assert len(errors) == 0, f"Got errors: {errors}" | ||
| finally: | ||
| process.terminate() | ||
| await process.wait() | ||
|
|
||
| anyio.run(_test, backend="trio") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
4. Concurrency Tests [Test]
Two complementary tests verify the fix:
Positive test (this method): Spawns 10 concurrent write() calls against a real subprocess with TextSendStream—the same stream type used in production. All writes should succeed because the lock serializes them.
Negative test (test_concurrent_writes_fail_without_lock): Replaces the lock with a no-op, proving the race condition exists. Without the lock, trio raises BusyResourceError: another task is using this stream for send.
Both tests use a real process rather than mocks to ensure the concurrency behavior matches production.
🤖 Generated with Claude Code
When parallel subagents invoke MCP tools, the CLI sends multiple concurrent control_request messages. Without synchronization, handlers race to write responses back, causing trio.BusyResourceError. This adds an anyio.Lock to serialize writes to stdin, and moves the _ready flag inside the lock to prevent TOCTOU races with close(). :house: Remote-Dev: homespace
Replace trio.lowlevel.FdStream (Unix-only) with anyio.open_process() which works on both Unix and Windows. The tests now use a real subprocess with the same stream setup as production code. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <[email protected]> :house: Remote-Dev: homespace
a41991f to
a3edee4
Compare
TL;DR
Adds
anyio.LocktoSubprocessCLITransport.write()to fixBusyResourceErrorwhen parallel subagents invoke MCP tools concurrently.Overview
When multiple subagents run in parallel, they each receive
control_requestmessages from the CLI (e.g., for MCP tool calls). These requests are handled concurrently via_tg.start_soon()inquery.py:190, and each handler eventually callstransport.write()to send the response.The problem: trio's
FdStream(the underlying stdin writer) does not allow concurrent sends. Without synchronization, this causestrio.BusyResourceError: another task is using this stream for send.This fix adds a
_write_lockto serialize all writes to the transport. The lock also protectsclose()andend_input()to prevent TOCTOU races where a write could start just as the stream is being closed.Call Flow
flowchart TD A["CLI sends control_request<br/>query.py:185"] --> B["start_soon(_handle_control_request)<br/>query.py:190"] B --> C["Handler 1<br/>query.py:213"] B --> D["Handler 2<br/>query.py:213"] B --> E["Handler N...<br/>query.py:213"] C --> F["transport.write()<br/>subprocess_cli.py:449"] D --> F E --> F F --> G["async with _write_lock<br/>subprocess_cli.py:451"] G --> H["stdin_stream.send()<br/>subprocess_cli.py:467"]Without the lock at step G, concurrent calls to H would race and crash.