@@ -507,104 +507,113 @@ def test_concurrent_writes_are_serialized(self):
507507 When parallel subagents invoke MCP tools, they trigger concurrent write()
508508 calls. Without the _write_lock, trio raises BusyResourceError.
509509
510- Uses the exact same stream chain as production:
511- FdStream -> SendStreamWrapper -> TextSendStream
510+ Uses a real subprocess with the same stream setup as production:
511+ process.stdin -> TextSendStream
512512 """
513513
514514 async def _test ():
515- from anyio ._backends ._trio import SendStreamWrapper
515+ import sys
516+ from subprocess import PIPE
517+
516518 from anyio .streams .text import TextSendStream
517- from trio .lowlevel import FdStream
518519
519- transport = SubprocessCLITransport (
520- prompt = "test" ,
521- options = ClaudeAgentOptions (cli_path = "/usr/bin/claude" ),
520+ # Create a real subprocess that consumes stdin (cross-platform)
521+ process = await anyio .open_process (
522+ [sys .executable , "-c" , "import sys; sys.stdin.read()" ],
523+ stdin = PIPE ,
524+ stdout = PIPE ,
525+ stderr = PIPE ,
522526 )
523527
524- # Create a pipe - FdStream is the same type used for process stdin
525- read_fd , write_fd = os .pipe ()
528+ try :
529+ transport = SubprocessCLITransport (
530+ prompt = "test" ,
531+ options = ClaudeAgentOptions (cli_path = "/usr/bin/claude" ),
532+ )
526533
527- # Exact same wrapping as production: FdStream -> SendStreamWrapper -> TextSendStream
528- fd_stream = FdStream (write_fd )
529- transport ._ready = True
530- transport ._process = MagicMock (returncode = None )
531- transport ._stdin_stream = TextSendStream (SendStreamWrapper (fd_stream ))
534+ # Same setup as production: TextSendStream wrapping process.stdin
535+ transport ._ready = True
536+ transport ._process = MagicMock (returncode = None )
537+ transport ._stdin_stream = TextSendStream (process .stdin )
532538
533- # Spawn concurrent writes - the lock should serialize them
534- num_writes = 10
535- errors : list [Exception ] = []
539+ # Spawn concurrent writes - the lock should serialize them
540+ num_writes = 10
541+ errors : list [Exception ] = []
536542
537- async def do_write (i : int ):
538- try :
539- await transport .write (f'{{"msg": { i } }}\n ' )
540- except Exception as e :
541- errors .append (e )
543+ async def do_write (i : int ):
544+ try :
545+ await transport .write (f'{{"msg": { i } }}\n ' )
546+ except Exception as e :
547+ errors .append (e )
542548
543- try :
544549 async with anyio .create_task_group () as tg :
545550 for i in range (num_writes ):
546551 tg .start_soon (do_write , i )
547552
548553 # All writes should succeed - the lock serializes them
549554 assert len (errors ) == 0 , f"Got errors: { errors } "
550555 finally :
551- os . close ( read_fd )
552- await fd_stream . aclose ()
556+ process . terminate ( )
557+ await process . wait ()
553558
554559 anyio .run (_test , backend = "trio" )
555560
556561 def test_concurrent_writes_fail_without_lock (self ):
557562 """Verify that without the lock, concurrent writes cause BusyResourceError.
558563
559- Uses the exact same stream chain as production to prove the lock is necessary .
564+ Uses a real subprocess with the same stream setup as production.
560565 """
561566
562567 async def _test ():
568+ import sys
563569 from contextlib import asynccontextmanager
570+ from subprocess import PIPE
564571
565- from anyio ._backends ._trio import SendStreamWrapper
566572 from anyio .streams .text import TextSendStream
567- from trio .lowlevel import FdStream
568573
569- transport = SubprocessCLITransport (
570- prompt = "test" ,
571- options = ClaudeAgentOptions (cli_path = "/usr/bin/claude" ),
574+ # Create a real subprocess that consumes stdin (cross-platform)
575+ process = await anyio .open_process (
576+ [sys .executable , "-c" , "import sys; sys.stdin.read()" ],
577+ stdin = PIPE ,
578+ stdout = PIPE ,
579+ stderr = PIPE ,
572580 )
573581
574- # Create a pipe - FdStream is the same type used for process stdin
575- read_fd , write_fd = os .pipe ()
582+ try :
583+ transport = SubprocessCLITransport (
584+ prompt = "test" ,
585+ options = ClaudeAgentOptions (cli_path = "/usr/bin/claude" ),
586+ )
576587
577- # Exact same wrapping as production
578- fd_stream = FdStream (write_fd )
579- transport ._ready = True
580- transport ._process = MagicMock (returncode = None )
581- transport ._stdin_stream = TextSendStream (SendStreamWrapper (fd_stream ))
588+ # Same setup as production
589+ transport ._ready = True
590+ transport ._process = MagicMock (returncode = None )
591+ transport ._stdin_stream = TextSendStream (process .stdin )
582592
583- # Replace lock with no-op to trigger the race condition
584- class NoOpLock :
585- @asynccontextmanager
586- async def __call__ (self ):
587- yield
593+ # Replace lock with no-op to trigger the race condition
594+ class NoOpLock :
595+ @asynccontextmanager
596+ async def __call__ (self ):
597+ yield
588598
589- async def __aenter__ (self ):
590- return self
599+ async def __aenter__ (self ):
600+ return self
591601
592- async def __aexit__ (self , * args ):
593- pass
602+ async def __aexit__ (self , * args ):
603+ pass
594604
595- transport ._write_lock = NoOpLock ()
605+ transport ._write_lock = NoOpLock ()
596606
597- # Spawn concurrent writes - should fail without lock
598- num_writes = 10
599- errors : list [Exception ] = []
607+ # Spawn concurrent writes - should fail without lock
608+ num_writes = 10
609+ errors : list [Exception ] = []
600610
601- async def do_write (i : int ):
602- try :
603- await transport .write (f'{{"msg": { i } }}\n ' )
604- except Exception as e :
605- errors .append (e )
611+ async def do_write (i : int ):
612+ try :
613+ await transport .write (f'{{"msg": { i } }}\n ' )
614+ except Exception as e :
615+ errors .append (e )
606616
607- try :
608617 async with anyio .create_task_group () as tg :
609618 for i in range (num_writes ):
610619 tg .start_soon (do_write , i )
@@ -620,7 +629,7 @@ async def do_write(i: int):
620629 f"Expected 'another task' error, got: { error_strs } "
621630 )
622631 finally :
623- os . close ( read_fd )
624- await fd_stream . aclose ()
632+ process . terminate ( )
633+ await process . wait ()
625634
626635 anyio .run (_test , backend = "trio" )
0 commit comments