Skip to content

Commit 9de8f25

Browse files
James Sunfacebook-github-bot
authored andcommitted
flush log upon ipython notebook cell exit (#816)
Summary: Pull Request resolved: #816 In ipython notebook, a cell can end fast. Yet the process can still run in the background. However, the background process will not flush logs to the existing cell anymore. The patch registers the flush function upon a cell exiting. Differential Revision: D79982702
1 parent ee336e7 commit 9de8f25

File tree

2 files changed

+127
-1
lines changed

2 files changed

+127
-1
lines changed

python/monarch/_src/actor/proc_mesh.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,16 @@
6868
from monarch._src.actor.future import DeprecatedNotAFuture, Future
6969
from monarch._src.actor.shape import MeshTrait
7070

71+
HAS_IPYTHON = False
72+
try:
73+
# Check if we are in ipython environment
74+
# pyre-ignore[21]
75+
from IPython import get_ipython
76+
77+
HAS_IPYTHON = True
78+
except ImportError:
79+
pass
80+
7181
HAS_TENSOR_ENGINE = False
7282
try:
7383
# Torch is needed for tensor engine
@@ -167,14 +177,29 @@ async def _init_manager_actors_coro(
167177
# WARNING: it is unsafe to await self._proc_mesh here
168178
# because self._proc_mesh is the result of this function itself!
169179

170-
proc_mesh = await proc_mesh_
180+
proc_mesh: HyProcMesh = await proc_mesh_
171181

172182
self._logging_mesh_client = await LoggingMeshClient.spawn(proc_mesh=proc_mesh)
173183
self._logging_mesh_client.set_mode(
174184
stream_to_client=True,
175185
aggregate_window_sec=3,
176186
level=logging.INFO,
177187
)
188+
if HAS_IPYTHON and get_ipython() is not None:
189+
# For ipython environment, a cell can end fast with threads running in background.
190+
# Flush all the ongoing logs proactively to avoid missing logs.
191+
assert self._logging_mesh_client is not None
192+
logging_client: LoggingMeshClient = self._logging_mesh_client
193+
ipython = get_ipython()
194+
195+
# pyre-ignore[21]
196+
from IPython.core.interactiveshell import ExecutionResult
197+
198+
# pyre-ignore[11]
199+
def flush_logs(_: ExecutionResult) -> None:
200+
return Future(coro=logging_client.flush(proc_mesh).spawn().task()).get()
201+
202+
ipython.events.register("post_run_cell", flush_logs)
178203

179204
_rdma_manager = (
180205
# type: ignore[16]

python/tests/test_python_actors.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -724,6 +724,107 @@ async def test_logging_option_defaults() -> None:
724724
pass
725725

726726

727+
# oss_skip: pytest keeps complaining about mocking get_ipython module
728+
@pytest.mark.oss_skip
729+
@pytest.mark.timeout(60)
730+
async def test_flush_logs_ipython() -> None:
731+
"""Test that logs are flushed when get_ipython is available and post_run_cell event is triggered."""
732+
# Save original file descriptors
733+
original_stdout_fd = os.dup(1) # stdout
734+
735+
try:
736+
# Create temporary files to capture output
737+
with tempfile.NamedTemporaryFile(mode="w+", delete=False) as stdout_file:
738+
stdout_path = stdout_file.name
739+
740+
# Redirect file descriptors to our temp files
741+
os.dup2(stdout_file.fileno(), 1)
742+
743+
# Also redirect Python's sys.stdout
744+
original_sys_stdout = sys.stdout
745+
sys.stdout = stdout_file
746+
747+
try:
748+
# Mock IPython environment
749+
class MockExecutionResult:
750+
pass
751+
752+
class MockEvents:
753+
def __init__(self):
754+
self.callbacks = {}
755+
756+
def register(self, event_name, callback):
757+
if event_name not in self.callbacks:
758+
self.callbacks[event_name] = []
759+
self.callbacks[event_name].append(callback)
760+
761+
def trigger(self, event_name, *args, **kwargs):
762+
if event_name in self.callbacks:
763+
for callback in self.callbacks[event_name]:
764+
callback(*args, **kwargs)
765+
766+
class MockIPython:
767+
def __init__(self):
768+
self.events = MockEvents()
769+
770+
mock_ipython = MockIPython()
771+
772+
# Patch get_ipython to return our mock using unittest.mock
773+
import unittest.mock
774+
775+
with unittest.mock.patch(
776+
"monarch._src.actor.proc_mesh.get_ipython",
777+
lambda: mock_ipython,
778+
):
779+
pm = await proc_mesh(gpus=2)
780+
am = await pm.spawn("printer", Printer)
781+
782+
# Set aggregation window to ensure logs are buffered
783+
await pm.logging_option(
784+
stream_to_client=True, aggregate_window_sec=600
785+
)
786+
await asyncio.sleep(1)
787+
788+
# Generate some logs that will be aggregated
789+
for _ in range(5):
790+
await am.print.call("ipython test log")
791+
792+
# Trigger the post_run_cell event which should flush logs
793+
mock_ipython.events.trigger("post_run_cell", MockExecutionResult())
794+
795+
# Flush all outputs
796+
stdout_file.flush()
797+
os.fsync(stdout_file.fileno())
798+
799+
finally:
800+
# Restore Python's sys.stdout
801+
sys.stdout = original_sys_stdout
802+
803+
# Restore original file descriptors
804+
os.dup2(original_stdout_fd, 1)
805+
806+
# Read the captured output
807+
with open(stdout_path, "r") as f:
808+
stdout_content = f.read()
809+
810+
# Clean up temp files
811+
os.unlink(stdout_path)
812+
813+
# Verify that logs were flushed when the post_run_cell event was triggered
814+
# We should see the aggregated logs in the output
815+
assert re.search(
816+
r"\[10 similar log lines\].*ipython test log", stdout_content
817+
), stdout_content
818+
819+
finally:
820+
# Ensure file descriptors are restored even if something goes wrong
821+
try:
822+
os.dup2(original_stdout_fd, 1)
823+
os.close(original_stdout_fd)
824+
except OSError:
825+
pass
826+
827+
727828
# oss_skip: importlib not pulling resource correctly in git CI, needs to be revisited
728829
@pytest.mark.oss_skip
729830
async def test_flush_logs_fast_exit() -> None:

0 commit comments

Comments
 (0)