@@ -724,6 +724,107 @@ async def test_logging_option_defaults() -> None:
724
724
pass
725
725
726
726
727
+ # oss_skip: pytest keeps complaining about mocking get_ipython module
728
+ @pytest .mark .oss_skip
729
+ @pytest .mark .timeout (60 )
730
+ async def test_flush_logs_ipython () -> None :
731
+ """Test that logs are flushed when get_ipython is available and post_run_cell event is triggered."""
732
+ # Save original file descriptors
733
+ original_stdout_fd = os .dup (1 ) # stdout
734
+
735
+ try :
736
+ # Create temporary files to capture output
737
+ with tempfile .NamedTemporaryFile (mode = "w+" , delete = False ) as stdout_file :
738
+ stdout_path = stdout_file .name
739
+
740
+ # Redirect file descriptors to our temp files
741
+ os .dup2 (stdout_file .fileno (), 1 )
742
+
743
+ # Also redirect Python's sys.stdout
744
+ original_sys_stdout = sys .stdout
745
+ sys .stdout = stdout_file
746
+
747
+ try :
748
+ # Mock IPython environment
749
+ class MockExecutionResult :
750
+ pass
751
+
752
+ class MockEvents :
753
+ def __init__ (self ):
754
+ self .callbacks = {}
755
+
756
+ def register (self , event_name , callback ):
757
+ if event_name not in self .callbacks :
758
+ self .callbacks [event_name ] = []
759
+ self .callbacks [event_name ].append (callback )
760
+
761
+ def trigger (self , event_name , * args , ** kwargs ):
762
+ if event_name in self .callbacks :
763
+ for callback in self .callbacks [event_name ]:
764
+ callback (* args , ** kwargs )
765
+
766
+ class MockIPython :
767
+ def __init__ (self ):
768
+ self .events = MockEvents ()
769
+
770
+ mock_ipython = MockIPython ()
771
+
772
+ # Patch get_ipython to return our mock using unittest.mock
773
+ import unittest .mock
774
+
775
+ with unittest .mock .patch (
776
+ "monarch._src.actor.proc_mesh.get_ipython" ,
777
+ lambda : mock_ipython ,
778
+ ):
779
+ pm = await proc_mesh (gpus = 2 )
780
+ am = await pm .spawn ("printer" , Printer )
781
+
782
+ # Set aggregation window to ensure logs are buffered
783
+ await pm .logging_option (
784
+ stream_to_client = True , aggregate_window_sec = 600
785
+ )
786
+ await asyncio .sleep (1 )
787
+
788
+ # Generate some logs that will be aggregated
789
+ for _ in range (5 ):
790
+ await am .print .call ("ipython test log" )
791
+
792
+ # Trigger the post_run_cell event which should flush logs
793
+ mock_ipython .events .trigger ("post_run_cell" , MockExecutionResult ())
794
+
795
+ # Flush all outputs
796
+ stdout_file .flush ()
797
+ os .fsync (stdout_file .fileno ())
798
+
799
+ finally :
800
+ # Restore Python's sys.stdout
801
+ sys .stdout = original_sys_stdout
802
+
803
+ # Restore original file descriptors
804
+ os .dup2 (original_stdout_fd , 1 )
805
+
806
+ # Read the captured output
807
+ with open (stdout_path , "r" ) as f :
808
+ stdout_content = f .read ()
809
+
810
+ # Clean up temp files
811
+ os .unlink (stdout_path )
812
+
813
+ # Verify that logs were flushed when the post_run_cell event was triggered
814
+ # We should see the aggregated logs in the output
815
+ assert re .search (
816
+ r"\[10 similar log lines\].*ipython test log" , stdout_content
817
+ ), stdout_content
818
+
819
+ finally :
820
+ # Ensure file descriptors are restored even if something goes wrong
821
+ try :
822
+ os .dup2 (original_stdout_fd , 1 )
823
+ os .close (original_stdout_fd )
824
+ except OSError :
825
+ pass
826
+
827
+
727
828
# oss_skip: importlib not pulling resource correctly in git CI, needs to be revisited
728
829
@pytest .mark .oss_skip
729
830
async def test_flush_logs_fast_exit () -> None :
0 commit comments