@@ -654,104 +654,113 @@ def test_concurrent_writes_are_serialized(self):
654654 When parallel subagents invoke MCP tools, they trigger concurrent write()
655655 calls. Without the _write_lock, trio raises BusyResourceError.
656656
657- Uses the exact same stream chain as production:
658- FdStream -> SendStreamWrapper -> TextSendStream
657+ Uses a real subprocess with the same stream setup as production:
658+ process.stdin -> TextSendStream
659659 """
660660
661661 async def _test ():
662- from anyio ._backends ._trio import SendStreamWrapper
662+ import sys
663+ from subprocess import PIPE
664+
663665 from anyio .streams .text import TextSendStream
664- from trio .lowlevel import FdStream
665666
666- transport = SubprocessCLITransport (
667- prompt = "test" ,
668- options = ClaudeAgentOptions (cli_path = "/usr/bin/claude" ),
667+ # Create a real subprocess that consumes stdin (cross-platform)
668+ process = await anyio .open_process (
669+ [sys .executable , "-c" , "import sys; sys.stdin.read()" ],
670+ stdin = PIPE ,
671+ stdout = PIPE ,
672+ stderr = PIPE ,
669673 )
670674
671- # Create a pipe - FdStream is the same type used for process stdin
672- read_fd , write_fd = os .pipe ()
675+ try :
676+ transport = SubprocessCLITransport (
677+ prompt = "test" ,
678+ options = ClaudeAgentOptions (cli_path = "/usr/bin/claude" ),
679+ )
673680
674- # Exact same wrapping as production: FdStream -> SendStreamWrapper -> TextSendStream
675- fd_stream = FdStream (write_fd )
676- transport ._ready = True
677- transport ._process = MagicMock (returncode = None )
678- transport ._stdin_stream = TextSendStream (SendStreamWrapper (fd_stream ))
681+ # Same setup as production: TextSendStream wrapping process.stdin
682+ transport ._ready = True
683+ transport ._process = MagicMock (returncode = None )
684+ transport ._stdin_stream = TextSendStream (process .stdin )
679685
680- # Spawn concurrent writes - the lock should serialize them
681- num_writes = 10
682- errors : list [Exception ] = []
686+ # Spawn concurrent writes - the lock should serialize them
687+ num_writes = 10
688+ errors : list [Exception ] = []
683689
684- async def do_write (i : int ):
685- try :
686- await transport .write (f'{{"msg": { i } }}\n ' )
687- except Exception as e :
688- errors .append (e )
690+ async def do_write (i : int ):
691+ try :
692+ await transport .write (f'{{"msg": { i } }}\n ' )
693+ except Exception as e :
694+ errors .append (e )
689695
690- try :
691696 async with anyio .create_task_group () as tg :
692697 for i in range (num_writes ):
693698 tg .start_soon (do_write , i )
694699
695700 # All writes should succeed - the lock serializes them
696701 assert len (errors ) == 0 , f"Got errors: { errors } "
697702 finally :
698- os . close ( read_fd )
699- await fd_stream . aclose ()
703+ process . terminate ( )
704+ await process . wait ()
700705
701706 anyio .run (_test , backend = "trio" )
702707
703708 def test_concurrent_writes_fail_without_lock (self ):
704709 """Verify that without the lock, concurrent writes cause BusyResourceError.
705710
706- Uses the exact same stream chain as production to prove the lock is necessary .
711+ Uses a real subprocess with the same stream setup as production.
707712 """
708713
709714 async def _test ():
715+ import sys
710716 from contextlib import asynccontextmanager
717+ from subprocess import PIPE
711718
712- from anyio ._backends ._trio import SendStreamWrapper
713719 from anyio .streams .text import TextSendStream
714- from trio .lowlevel import FdStream
715720
716- transport = SubprocessCLITransport (
717- prompt = "test" ,
718- options = ClaudeAgentOptions (cli_path = "/usr/bin/claude" ),
721+ # Create a real subprocess that consumes stdin (cross-platform)
722+ process = await anyio .open_process (
723+ [sys .executable , "-c" , "import sys; sys.stdin.read()" ],
724+ stdin = PIPE ,
725+ stdout = PIPE ,
726+ stderr = PIPE ,
719727 )
720728
721- # Create a pipe - FdStream is the same type used for process stdin
722- read_fd , write_fd = os .pipe ()
729+ try :
730+ transport = SubprocessCLITransport (
731+ prompt = "test" ,
732+ options = ClaudeAgentOptions (cli_path = "/usr/bin/claude" ),
733+ )
723734
724- # Exact same wrapping as production
725- fd_stream = FdStream (write_fd )
726- transport ._ready = True
727- transport ._process = MagicMock (returncode = None )
728- transport ._stdin_stream = TextSendStream (SendStreamWrapper (fd_stream ))
735+ # Same setup as production
736+ transport ._ready = True
737+ transport ._process = MagicMock (returncode = None )
738+ transport ._stdin_stream = TextSendStream (process .stdin )
729739
730- # Replace lock with no-op to trigger the race condition
731- class NoOpLock :
732- @asynccontextmanager
733- async def __call__ (self ):
734- yield
740+ # Replace lock with no-op to trigger the race condition
741+ class NoOpLock :
742+ @asynccontextmanager
743+ async def __call__ (self ):
744+ yield
735745
736- async def __aenter__ (self ):
737- return self
746+ async def __aenter__ (self ):
747+ return self
738748
739- async def __aexit__ (self , * args ):
740- pass
749+ async def __aexit__ (self , * args ):
750+ pass
741751
742- transport ._write_lock = NoOpLock ()
752+ transport ._write_lock = NoOpLock ()
743753
744- # Spawn concurrent writes - should fail without lock
745- num_writes = 10
746- errors : list [Exception ] = []
754+ # Spawn concurrent writes - should fail without lock
755+ num_writes = 10
756+ errors : list [Exception ] = []
747757
748- async def do_write (i : int ):
749- try :
750- await transport .write (f'{{"msg": { i } }}\n ' )
751- except Exception as e :
752- errors .append (e )
758+ async def do_write (i : int ):
759+ try :
760+ await transport .write (f'{{"msg": { i } }}\n ' )
761+ except Exception as e :
762+ errors .append (e )
753763
754- try :
755764 async with anyio .create_task_group () as tg :
756765 for i in range (num_writes ):
757766 tg .start_soon (do_write , i )
@@ -767,7 +776,7 @@ async def do_write(i: int):
767776 f"Expected 'another task' error, got: { error_strs } "
768777 )
769778 finally :
770- os . close ( read_fd )
771- await fd_stream . aclose ()
779+ process . terminate ( )
780+ await process . wait ()
772781
773782 anyio .run (_test , backend = "trio" )
0 commit comments