@@ -77,8 +77,8 @@ def capture_callback(output: t.AnyStr, timestamp: datetime.datetime) -> None:
7777 # Check that final carriage return was sent
7878 assert captured_chunks [- 1 ] == "\r " , "Should end with carriage return"
7979
80- def test_fragmentation_with_128_byte_chunks (self , tmp_path : pathlib .Path ) -> None :
81- """Test that current implementation fragments output at 128-byte boundaries ."""
80+ def test_no_fragmentation_without_newlines (self , tmp_path : pathlib .Path ) -> None :
81+ """Test that output without newlines is sent as one chunk after process ends ."""
8282 captured_chunks : list [str ] = []
8383
8484 def capture_callback (output : t .AnyStr , timestamp : datetime .datetime ) -> None :
@@ -87,9 +87,9 @@ def capture_callback(output: t.AnyStr, timestamp: datetime.datetime) -> None:
8787 if output_str != "\r " : # Ignore the final \r
8888 captured_chunks .append (output_str )
8989
90- # Create a script that outputs a long line
90+ # Create a script that outputs a long line without newline
9191 script = tmp_path / "long_line_test.py"
92- long_message = "X" * 300 # 300 characters, should be split into 3 chunks
92+ long_message = "X" * 300 # 300 characters without newline
9393 script .write_text (
9494 f"""
9595import sys
@@ -105,12 +105,9 @@ def capture_callback(output: t.AnyStr, timestamp: datetime.datetime) -> None:
105105 callback = capture_callback ,
106106 )
107107
108- # Verify fragmentation occurs at 128-byte boundaries
109- assert len (captured_chunks ) >= 2 , "Long line should be fragmented"
110-
111- # Check chunk sizes (except possibly the last one)
112- for chunk in captured_chunks [:- 1 ]:
113- assert len (chunk ) == 128 , f"Chunk should be 128 bytes, got { len (chunk )} "
108+ # With line-buffered reading, output without newlines
109+ # should come as a single chunk after the process ends
110+ assert len (captured_chunks ) == 1 , "Output without newline should be one chunk"
114111
115112 # Verify total output is preserved
116113 total_output = "" .join (captured_chunks )
@@ -394,22 +391,22 @@ def capture_callback(output: str, timestamp: datetime.datetime) -> None:
394391 assert "Starting process..." in full_output
395392 assert "Process complete!" in full_output
396393
397- def test_demonstrate_fragmentation (self , tmp_path : pathlib .Path ) -> None :
398- """Demonstrate the fragmentation issue with actual output ."""
394+ def test_line_based_output_chunks (self , tmp_path : pathlib .Path ) -> None :
395+ """Demonstrate that output is now properly line-buffered ."""
399396 captured_chunks : list [str ] = []
400397
401398 def capture_callback (output : str , timestamp : datetime .datetime ) -> None :
402399 """Capture output chunks."""
403400 captured_chunks .append (output )
404401
405402 # Create a script with realistic git clone output
406- script = tmp_path / "demo_fragmentation .py"
403+ script = tmp_path / "line_based_output .py"
407404 script .write_text (
408405 textwrap .dedent (
409406 """
410407 import sys
411408
412- # Simulate git clone output that will be fragmented
409+ # Simulate git clone output with multiple lines
413410 output = (
414411 "Cloning into '/home/user/project'...\\ n"
415412 "remote: Enumerating objects: 11363, done.\\ n"
@@ -430,14 +427,15 @@ def capture_callback(output: str, timestamp: datetime.datetime) -> None:
430427 callback = capture_callback ,
431428 )
432429
433- # Verify fragmentation occurred
434- non_cr_chunks = [c for c in captured_chunks if c != "\\ r" ]
435- # With current 128-byte chunking, this output should be fragmented
436- assert len (non_cr_chunks ) >= 2 , (
437- "Output should be fragmented into multiple chunks"
430+ # With line-buffered reading, we should get one chunk per line
431+ non_cr_chunks = [c for c in captured_chunks if c != "\r " ]
432+ assert len (non_cr_chunks ) == 5 , (
433+ f"Expected 5 chunks (one per line), got { len (non_cr_chunks )} "
438434 )
439435
440- assert len (captured_chunks ) > 0
436+ # Verify each chunk is a complete line
437+ for chunk in non_cr_chunks :
438+ assert chunk .endswith ("\n " ), f"Each chunk should be a complete line: { repr (chunk )} "
441439
442440 def test_no_fragmentation_of_progress_lines (self , tmp_path : pathlib .Path ) -> None :
443441 """Test that progress lines should not be fragmented."""
@@ -941,6 +939,175 @@ def stderr_callback(output: str, timestamp: datetime.datetime) -> None:
941939 assert "STDERR:" not in stdout_result
942940
943941
942+ class TestStreamingFix :
943+ """Test the fix for proper line-buffered streaming."""
944+
945+ def test_line_by_line_streaming (self , tmp_path : pathlib .Path ) -> None :
946+ """Test that output is streamed line by line, not in fixed chunks."""
947+ captured_chunks : list [str ] = []
948+
949+ def capture_callback (output : str , timestamp : datetime .datetime ) -> None :
950+ """Capture output chunks."""
951+ captured_chunks .append (output )
952+
953+ # Create a script that outputs lines of varying lengths
954+ script = tmp_path / "line_streaming_test.py"
955+ script .write_text (
956+ textwrap .dedent (
957+ """
958+ import sys
959+ import time
960+
961+ # Output lines of different lengths to verify line-buffered behavior
962+ lines = [
963+ "Short line\\ n",
964+ "This is a medium length line that is longer than the short one\\ n",
965+ "This is a very long line that should definitely be longer than 128 characters to ensure we're not just getting lucky with the buffering behavior\\ n",
966+ "Another short\\ n",
967+ "Progress: 50% [=================> ]\\ r",
968+ "Progress: 100% [======================================]\\ n",
969+ ]
970+
971+ for line in lines:
972+ sys.stderr.write(line)
973+ sys.stderr.flush()
974+ time.sleep(0.01) # Small delay to ensure separate chunks
975+ """ ,
976+ ),
977+ )
978+
979+ # Run with the fixed implementation
980+ run (
981+ [sys .executable , str (script )],
982+ log_in_real_time = True ,
983+ callback = capture_callback ,
984+ )
985+
986+ # Remove the final \r if present
987+ non_cr_chunks = [c for c in captured_chunks if c != "\r " ]
988+
989+ # With proper line buffering, each line should be its own chunk
990+ # This test will fail with current implementation but pass with the fix
991+ assert len (non_cr_chunks ) >= 6 , (
992+ f"Expected at least 6 chunks (one per line), got { len (non_cr_chunks )} "
993+ )
994+
995+ # Verify that complete lines are preserved
996+ full_output = "" .join (non_cr_chunks )
997+ assert "Short line\n " in full_output
998+ assert "Progress: 100%" in full_output
999+
1000+ # Check that lines weren't fragmented
1001+ # With line buffering, no line should be split across chunks
1002+ for chunk in non_cr_chunks :
1003+ # Each chunk should either end with \n or \r (except possibly the last)
1004+ if chunk and chunk != non_cr_chunks [- 1 ]:
1005+ assert chunk .endswith (("\n " , "\r " )), (
1006+ f"Chunk should end with newline or carriage return: { repr (chunk )} "
1007+ )
1008+
1009+ def test_real_time_line_streaming_with_timing (self , tmp_path : pathlib .Path ) -> None :
1010+ """Test that lines are streamed immediately when flushed, not buffered."""
1011+ capture_events : list [tuple [str , float ]] = []
1012+ start_time = time .time ()
1013+
1014+ def timing_callback (output : str , timestamp : datetime .datetime ) -> None :
1015+ """Capture output with relative timing."""
1016+ capture_events .append ((output , time .time () - start_time ))
1017+
1018+ script = tmp_path / "realtime_line_test.py"
1019+ script .write_text (
1020+ textwrap .dedent (
1021+ """
1022+ import sys
1023+ import time
1024+
1025+ # Output lines with delays to verify real-time streaming
1026+ sys.stderr.write("Line 1: Starting\\ n")
1027+ sys.stderr.flush()
1028+ time.sleep(0.1)
1029+
1030+ sys.stderr.write("Line 2: After 100ms\\ n")
1031+ sys.stderr.flush()
1032+ time.sleep(0.1)
1033+
1034+ sys.stderr.write("Line 3: After 200ms\\ n")
1035+ sys.stderr.flush()
1036+ """ ,
1037+ ),
1038+ )
1039+
1040+ run (
1041+ [sys .executable , str (script )],
1042+ log_in_real_time = True ,
1043+ callback = timing_callback ,
1044+ )
1045+
1046+ # Remove final \r
1047+ events = [(o , t ) for o , t in capture_events if o != "\r " ]
1048+
1049+ # With proper streaming, we should get 3 separate events
1050+ assert len (events ) >= 3 , f"Expected at least 3 events, got { len (events )} "
1051+
1052+ # Check timing - with line buffering, lines should come separately
1053+ if len (events ) >= 3 :
1054+ # Verify we got separate events for each line
1055+ assert "Line 1: Starting\n " == events [0 ][0 ]
1056+ assert "Line 2: After 100ms\n " == events [1 ][0 ]
1057+ assert "Line 3: After 200ms\n " == events [2 ][0 ]
1058+
1059+ # The timing might vary due to process startup and buffering,
1060+ # but we should see that lines come as separate events
1061+ # rather than all at once
1062+
1063+ def test_no_fragmentation_with_line_buffering (self , tmp_path : pathlib .Path ) -> None :
1064+ """Test that line buffering prevents fragmentation of lines."""
1065+ captured_chunks : list [str ] = []
1066+
1067+ def capture_callback (output : str , timestamp : datetime .datetime ) -> None :
1068+ """Capture output chunks."""
1069+ if output != "\r " :
1070+ captured_chunks .append (output )
1071+
1072+ # Create a script with lines longer than 128 bytes
1073+ script = tmp_path / "no_fragment_line_test.py"
1074+ long_line = "A" * 200 # 200 chars, definitely > 128 bytes
1075+ script .write_text (
1076+ f"""
1077+ import sys
1078+
1079+ # Output a long line that would be fragmented with fixed-size buffering
1080+ sys.stderr.write("Start: { long_line } :End\\ n")
1081+ sys.stderr.flush()
1082+
1083+ # Output another long line
1084+ sys.stderr.write("Line2: { 'B' * 150 } :Done\\ n")
1085+ sys.stderr.flush()
1086+ """ ,
1087+ )
1088+
1089+ run (
1090+ [sys .executable , str (script )],
1091+ log_in_real_time = True ,
1092+ callback = capture_callback ,
1093+ )
1094+
1095+ # With line buffering, long lines should not be fragmented
1096+ # Each line should be in a single chunk
1097+ assert len (captured_chunks ) == 2 , (
1098+ f"Expected 2 chunks (one per line), got { len (captured_chunks )} "
1099+ )
1100+
1101+ # Verify complete lines
1102+ assert captured_chunks [0 ].startswith ("Start: " )
1103+ assert captured_chunks [0 ].endswith (" :End\n " )
1104+ assert "A" * 200 in captured_chunks [0 ]
1105+
1106+ assert captured_chunks [1 ].startswith ("Line2: " )
1107+ assert captured_chunks [1 ].endswith (" :Done\n " )
1108+ assert "B" * 150 in captured_chunks [1 ]
1109+
1110+
9441111class TestRealWorldScenarios :
9451112 """Test real-world subprocess output scenarios."""
9461113
0 commit comments