@@ -479,7 +479,7 @@ class DagsterTestContext:
479479
480480 project_path : str
481481
482- def _run_command (self , cmd : list [str ]) -> None :
482+ def _run_command (self , cmd : list [str ]) -> None :
483483 """Execute a command and stream its output in real-time.
484484
485485 Args:
@@ -488,32 +488,82 @@ def _run_command (self, cmd: list[str]) -> None:
488488 Raises:
489489 subprocess.CalledProcessError: If the command returns non-zero exit code
490490 """
491+ import queue
492+ import threading
493+ import typing as t
494+
495+ def stream_output (
496+ pipe : t .IO [str ], output_queue : queue .Queue [str | None ]
497+ ) -> None :
498+ """Stream output from a pipe to a queue."""
499+ try :
500+ while True :
501+ char = pipe .read (1 )
502+ if not char :
503+ break
504+ output_queue .put (char )
505+ finally :
506+ output_queue .put (None ) # Signal EOF
507+
491508 print (f"Running command: { ' ' .join (cmd )} " )
492509 process = subprocess .Popen (
493510 cmd ,
494511 stdout = subprocess .PIPE ,
495512 stderr = subprocess .PIPE ,
496513 text = True ,
497- bufsize = 1 ,
498514 universal_newlines = True ,
499515 )
500516
501- # Stream output in real-time
502- while True :
503- stdout_line = process .stdout .readline () if process .stdout else ""
504- stderr_line = process .stderr .readline () if process .stderr else ""
517+ if not process .stdout or not process .stderr :
518+ raise RuntimeError ("Failed to open subprocess pipes" )
505519
506- if stdout_line :
507- print (stdout_line .rstrip ())
508- if stderr_line :
509- print (stderr_line .rstrip ())
520+ # Create queues for stdout and stderr
521+ stdout_queue : queue .Queue [str | None ] = queue .Queue ()
522+ stderr_queue : queue .Queue [str | None ] = queue .Queue ()
510523
511- process_finished = not stdout_line and not stderr_line and process .poll () is not None
512- if process_finished :
513- break
524+ # Start threads to read from pipes
525+ stdout_thread = threading .Thread (
526+ target = stream_output , args = (process .stdout , stdout_queue )
527+ )
528+ stderr_thread = threading .Thread (
529+ target = stream_output , args = (process .stderr , stderr_queue )
530+ )
514531
515- process_failed = process .returncode != 0
516- if process_failed :
532+ stdout_thread .daemon = True
533+ stderr_thread .daemon = True
534+ stdout_thread .start ()
535+ stderr_thread .start ()
536+
537+ # Read from queues and print output
538+ stdout_done = False
539+ stderr_done = False
540+
541+ while not (stdout_done and stderr_done ):
542+ # Handle stdout
543+ try :
544+ char = stdout_queue .get_nowait ()
545+ if char is None :
546+ stdout_done = True
547+ else :
548+ print (char , end = "" , flush = True )
549+ except queue .Empty :
550+ pass
551+
552+ # Handle stderr
553+ try :
554+ char = stderr_queue .get_nowait ()
555+ if char is None :
556+ stderr_done = True
557+ else :
558+ print (char , end = "" , flush = True )
559+ except queue .Empty :
560+ pass
561+
562+ stdout_thread .join ()
563+ stderr_thread .join ()
564+ process .wait ()
565+
566+ if process .returncode != 0 :
517567 raise subprocess .CalledProcessError (process .returncode , cmd )
518568
519569 def asset_materialisation (
0 commit comments