@@ -724,6 +724,104 @@ async def test_logging_option_defaults() -> None:
724
724
pass
725
725
726
726
727
+ @pytest .mark .timeout (60 )
728
+ async def test_flush_logs_ipython () -> None :
729
+ """Test that logs are flushed when get_ipython is available and post_run_cell event is triggered."""
730
+ # Save original file descriptors
731
+ original_stdout_fd = os .dup (1 ) # stdout
732
+
733
+ try :
734
+ # Create temporary files to capture output
735
+ with tempfile .NamedTemporaryFile (mode = "w+" , delete = False ) as stdout_file :
736
+ stdout_path = stdout_file .name
737
+
738
+ # Redirect file descriptors to our temp files
739
+ os .dup2 (stdout_file .fileno (), 1 )
740
+
741
+ # Also redirect Python's sys.stdout
742
+ original_sys_stdout = sys .stdout
743
+ sys .stdout = stdout_file
744
+
745
+ try :
746
+ # Mock IPython environment
747
+ class MockExecutionResult :
748
+ pass
749
+
750
+ class MockEvents :
751
+ def __init__ (self ):
752
+ self .callbacks = {}
753
+
754
+ def register (self , event_name , callback ):
755
+ if event_name not in self .callbacks :
756
+ self .callbacks [event_name ] = []
757
+ self .callbacks [event_name ].append (callback )
758
+
759
+ def trigger (self , event_name , * args , ** kwargs ):
760
+ if event_name in self .callbacks :
761
+ for callback in self .callbacks [event_name ]:
762
+ callback (* args , ** kwargs )
763
+
764
+ class MockIPython :
765
+ def __init__ (self ):
766
+ self .events = MockEvents ()
767
+
768
+ mock_ipython = MockIPython ()
769
+
770
+ # Patch get_ipython to return our mock using unittest.mock
771
+ import unittest .mock
772
+
773
+ with unittest .mock .patch (
774
+ "monarch._src.actor.proc_mesh.get_ipython" , lambda : mock_ipython
775
+ ):
776
+ pm = await proc_mesh (gpus = 2 )
777
+ am = await pm .spawn ("printer" , Printer )
778
+
779
+ # Set aggregation window to ensure logs are buffered
780
+ await pm .logging_option (
781
+ stream_to_client = True , aggregate_window_sec = 600
782
+ )
783
+ await asyncio .sleep (1 )
784
+
785
+ # Generate some logs that will be aggregated
786
+ for _ in range (5 ):
787
+ await am .print .call ("ipython test log" )
788
+
789
+ # Trigger the post_run_cell event which should flush logs
790
+ mock_ipython .events .trigger ("post_run_cell" , MockExecutionResult ())
791
+
792
+ # Flush all outputs
793
+ stdout_file .flush ()
794
+ os .fsync (stdout_file .fileno ())
795
+
796
+ finally :
797
+ # Restore Python's sys.stdout
798
+ sys .stdout = original_sys_stdout
799
+
800
+ # Restore original file descriptors
801
+ os .dup2 (original_stdout_fd , 1 )
802
+
803
+ # Read the captured output
804
+ with open (stdout_path , "r" ) as f :
805
+ stdout_content = f .read ()
806
+
807
+ # Clean up temp files
808
+ os .unlink (stdout_path )
809
+
810
+ # Verify that logs were flushed when the post_run_cell event was triggered
811
+ # We should see the aggregated logs in the output
812
+ assert re .search (
813
+ r"\[10 similar log lines\].*ipython test log" , stdout_content
814
+ ), stdout_content
815
+
816
+ finally :
817
+ # Ensure file descriptors are restored even if something goes wrong
818
+ try :
819
+ os .dup2 (original_stdout_fd , 1 )
820
+ os .close (original_stdout_fd )
821
+ except OSError :
822
+ pass
823
+
824
+
727
825
# oss_skip: importlib not pulling resource correctly in git CI, needs to be revisited
728
826
@pytest .mark .oss_skip
729
827
async def test_flush_logs_fast_exit () -> None :
0 commit comments