Skip to content

Commit 22dc699

Browse files
pablorfb-meta and meta-codesync[bot]
authored and committed
Enable worker logs on IPython (#1407)
Summary:
Pull Request resolved: #1407

Logs do not show up in notebooks running Monarch without Conda.

Why are FD watchers needed on IPython (Bento/Colab)?
- In a notebook, the top-level process is the IPython kernel, which captures the Python streams (sys.stdout/sys.stderr).
- Monarch routes logs to the parent process's io::stdout/stderr (OS file descriptors 1 and 2), which the IPython kernel does not monitor by default.
- On buck, the top-level process is the Monarch binary itself, meaning that FDs 1/2 are connected to the terminal, so logs show up automatically.
- monarch_conda automatically displays all logs.

More context: https://fburl.com/gdoc/36pyayi5

Reviewed By: pzhan9

Differential Revision: D83712603

fbshipit-source-id: 3bf3ac1a1b65def7e1ff461f137385957e64a023
1 parent 7221d6f commit 22dc699

File tree

2 files changed

+260
-1
lines changed

2 files changed

+260
-1
lines changed

python/monarch/_src/actor/logging.py

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
import logging
1010
import threading
11-
from typing import Optional, Union
11+
from typing import Optional, TextIO, Tuple, Union
1212

1313
from monarch._rust_bindings.monarch_extension.logging import LoggingMeshClient
1414

@@ -40,6 +40,8 @@
4040
_global_flush_registered = False
4141
_global_flush_lock = threading.Lock()
4242

43+
# Maximum number of bytes requested per os.read() call when draining the
# pipes that stdout/stderr (FDs 1/2) are redirected into on IPython.
FD_READ_CHUNK_SIZE = 4096
44+
4345

4446
def flush_all_proc_mesh_logs(v1: bool = False) -> None:
4547
"""Flush logs from all active ProcMesh instances."""
@@ -101,6 +103,52 @@ def register_flusher_if_in_ipython(self) -> None:
101103
)
102104
_global_flush_registered = True
103105

106+
def enable_fd_capture_if_in_ipython(self) -> Optional[Tuple[int, int]]:
    """
    Forward OS-level stdout/stderr writes to the notebook's Python streams.

    On notebooks, the UI shows logs from Python streams (sys.stdout/sys.stderr), but
    Monarch actors write directly to the OS file descriptors 1/2 (stdout/stderr). Those
    low-level writes bypass Python's streams and therefore don't appear in the
    notebook output.

    What this does:
    - Creates two OS pipes and uses dup2 to redirect the current process's
      stdout/stderr FDs (1/2) into those pipes.
    - Spawns daemon threads that read bytes from the pipes and forward
      them into the notebook's visible Python streams (sys.stdout/sys.stderr).

    FDs 1/2 are process-wide, so the redirection is installed at most once per
    process. This method is invoked on every ``logging_option()`` call; without
    the guard each call would stack another pipe/thread pair on top of the
    previous one and leak the old read ends and backup FDs.

    Returns:
        ``None`` when not running under IPython. Otherwise a ``(stdout_backup,
        stderr_backup)`` tuple of duplicates of the original FDs 1 and 2, so
        they can be restored. Repeated calls return the same tuple.
    """
    if not IN_IPYTHON:
        return None

    import codecs
    import os
    import sys

    # The install-once marker lives in the module namespace (not on ``self``)
    # because every LoggingManager instance shares the same FDs 1/2.
    module_state = globals()
    installed: Optional[Tuple[int, int]] = module_state.get("_fd_capture_backups")
    if installed is not None:
        return installed

    def is_backed_by(stream: TextIO, fd: int) -> bool:
        """Return True if ``stream`` ultimately writes to OS descriptor ``fd``."""
        try:
            return stream.fileno() == fd
        except (AttributeError, OSError, ValueError):
            # Notebook streams commonly have no real file descriptor.
            return False

    def write_all(fd: int, data: bytes) -> None:
        """Write ``data`` to ``fd``, retrying on partial writes."""
        while data:
            data = data[os.write(fd, data) :]

    def pump(read_fd: int, target_fd: int, backup_fd: int, stream: TextIO) -> None:
        """Drain ``read_fd`` into ``stream`` until EOF or a forwarding failure."""
        # Incremental decoding keeps a multi-byte UTF-8 character that straddles
        # two chunks intact instead of turning both halves into U+FFFD.
        decoder = codecs.getincrementaldecoder("utf-8")("replace")
        # If the Python stream is itself backed by the redirected FD (e.g. a
        # terminal IPython session), writing to it would feed the pipe we are
        # reading from and loop forever; send the bytes to the original FD.
        loops_back = is_backed_by(stream, target_fd)
        binary_sink = getattr(stream, "buffer", None)
        try:
            while True:
                chunk = os.read(read_fd, FD_READ_CHUNK_SIZE)
                if not chunk:
                    break
                if loops_back:
                    write_all(backup_fd, chunk)
                    continue
                if binary_sink is not None:
                    binary_sink.write(chunk)
                else:
                    stream.write(decoder.decode(chunk))
                stream.flush()
        except Exception:
            # Forwarding failed (for example the notebook stream was closed).
            # Nothing would drain the pipe any more, so writers to FD 1/2 would
            # eventually block; point the FD back at its original target.
            # Reporting the error here is not possible without writing to the
            # very streams that just failed.
            try:
                os.dup2(backup_fd, target_fd)
            except OSError:
                # Best effort: the backup may already have been closed.
                pass
        finally:
            os.close(read_fd)

    stdout_read, stdout_write = os.pipe()
    stderr_read, stderr_write = os.pipe()
    backups = (os.dup(1), os.dup(2))
    os.dup2(stdout_write, 1)
    os.dup2(stderr_write, 2)
    # FDs 1/2 now reference the pipe write ends; the extra handles must be
    # closed so the readers can observe EOF once FDs 1/2 are redirected again.
    os.close(stdout_write)
    os.close(stderr_write)
    module_state["_fd_capture_backups"] = backups

    for read_fd, target_fd, backup_fd, stream in (
        (stdout_read, 1, backups[0], sys.stdout),
        (stderr_read, 2, backups[1], sys.stderr),
    ):
        threading.Thread(
            target=pump,
            args=(read_fd, target_fd, backup_fd, stream),
            daemon=True,
        ).start()

    return backups
151+
104152
async def logging_option(
105153
self,
106154
stream_to_client: bool = True,
@@ -118,6 +166,7 @@ async def logging_option(
118166
level=level,
119167
)
120168
self.register_flusher_if_in_ipython()
169+
self.enable_fd_capture_if_in_ipython()
121170

122171
def flush(self) -> None:
123172
assert self._logging_mesh_client is not None
Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
# All rights reserved.
3+
#
4+
# This source code is licensed under the BSD-style license found in the
5+
# LICENSE file in the root directory of this source tree.
6+
7+
# pyre-strict
8+
9+
import logging
10+
from typing import Any
11+
from unittest import IsolatedAsyncioTestCase, TestCase
12+
from unittest.mock import Mock, patch
13+
14+
from monarch._rust_bindings.monarch_hyperactor.v1.proc_mesh import (
15+
ProcMesh as HyProcMeshV1,
16+
)
17+
from monarch._src.actor.logging import flush_all_proc_mesh_logs, LoggingManager
18+
19+
20+
class LoggingManagerTest(TestCase):
    """Synchronous unit tests for ``LoggingManager``."""

    def setUp(self) -> None:
        # Every test starts from a freshly constructed manager.
        self.logging_manager = LoggingManager()

    def test_init_initializes_logging_mesh_client_to_none(self) -> None:
        """A new manager starts without a logging mesh client."""
        fresh_manager = LoggingManager()

        self.assertIsNone(fresh_manager._logging_mesh_client)

    @patch("monarch._src.actor.logging.IN_IPYTHON", True)
    @patch("monarch._src.actor.logging.get_ipython")
    @patch("monarch._src.actor.logging._global_flush_registered", False)
    def test_register_flusher_if_in_ipython_registers_event(
        self, mock_get_ipython: Mock
    ) -> None:
        """Under IPython, a ``post_run_cell`` callback gets registered once."""
        fake_shell = Mock()
        mock_get_ipython.return_value = fake_shell

        self.logging_manager.register_flusher_if_in_ipython()

        register = fake_shell.events.register
        register.assert_called_once()
        positional = register.call_args[0]
        # First positional argument is the event name, second the callback.
        self.assertEqual(positional[0], "post_run_cell")
        self.assertTrue(callable(positional[1]))

    @patch("monarch._src.actor.logging.IN_IPYTHON", False)
    def test_enable_fd_capture_if_not_in_ipython_returns_none(self) -> None:
        """Outside IPython, FD capture is a no-op that returns ``None``."""
        self.assertIsNone(self.logging_manager.enable_fd_capture_if_in_ipython())

    @patch("monarch._src.actor.logging.Future")
    @patch("monarch._src.actor.logging.context")
    def test_flush_calls_mesh_client_flush(
        self, mock_context: Mock, mock_future: Mock
    ) -> None:
        """``flush`` forwards to the mesh client and waits on the Future."""
        rust_instance = Mock()
        mock_context.return_value.actor_instance._as_rust.return_value = rust_instance

        flush_task = Mock()
        mesh_client = Mock()
        mesh_client.flush.return_value.spawn.return_value.task.return_value = flush_task
        self.logging_manager._logging_mesh_client = mesh_client

        future_obj = Mock()
        mock_future.return_value = future_obj

        self.logging_manager.flush()

        # The client is flushed with the current actor instance ...
        mesh_client.flush.assert_called_once_with(rust_instance)
        # ... and the resulting task is wrapped in a Future awaited with a timeout.
        mock_future.assert_called_once_with(coro=flush_task)
        future_obj.get.assert_called_once_with(timeout=3)

    @patch("monarch._src.actor.logging.Future")
    def test_flush_handles_exception_gracefully(self, mock_future: Mock) -> None:
        """``flush`` must not propagate an exception raised by ``Future.get``."""
        self.logging_manager._logging_mesh_client = Mock()

        failing_future = Mock()
        failing_future.get.side_effect = Exception("Test exception")
        mock_future.return_value = failing_future

        # Passing means this call returns without raising.
        self.logging_manager.flush()
100+
101+
class FlushAllProcMeshLogsTest(TestCase):
    """Unit tests for the module-level ``flush_all_proc_mesh_logs`` helper."""

    @patch("monarch._src.actor.proc_mesh.get_active_proc_meshes")
    def test_flush_all_proc_mesh_logs_calls_flush_on_all_meshes(
        self, mock_get_active: Mock
    ) -> None:
        """Every active proc mesh has its logging manager flushed exactly once."""
        first_mesh = Mock()
        second_mesh = Mock()
        active_meshes = [first_mesh, second_mesh]
        mock_get_active.return_value = active_meshes

        flush_all_proc_mesh_logs()

        for mesh in active_meshes:
            mesh._logging_manager.flush.assert_called_once()

    @patch("monarch._src.actor.proc_mesh.get_active_proc_meshes")
    def test_flush_all_proc_mesh_logs_handles_empty_list(
        self, mock_get_active: Mock
    ) -> None:
        """With no active proc meshes the helper completes without raising."""
        mock_get_active.return_value = []

        # Passing means this call returns without raising.
        flush_all_proc_mesh_logs()
129+
130+
131+
class LoggingManagerAsyncTest(IsolatedAsyncioTestCase):
    """Async unit tests for ``LoggingManager.init`` and ``logging_option``."""

    def setUp(self) -> None:
        # Every test starts from a freshly constructed manager.
        self.logging_manager = LoggingManager()

    @patch("monarch._src.actor.logging.LoggingMeshClientV1")
    @patch("monarch._src.actor.logging.context")
    async def test_init_with_hyprocmesh_creates_logging_mesh_client(
        self, mock_context: Mock, mock_logging_client: Mock
    ) -> None:
        """``init`` spawns a client for a v1 proc mesh and configures its mode."""
        rust_instance = Mock()
        mock_context.return_value.actor_instance._as_rust.return_value = rust_instance
        v1_proc_mesh = Mock(spec=HyProcMeshV1)

        spawned_client: Mock = Mock()

        # ``spawn`` has to be awaitable, so substitute a coroutine function
        # that resolves to the prepared client.
        async def fake_spawn(*args: Any, **kwargs: Any) -> Mock:
            return spawned_client

        mock_logging_client.spawn = fake_spawn

        await self.logging_manager.init(v1_proc_mesh, stream_to_client=True)

        spawned_client.set_mode.assert_called_once_with(
            rust_instance,
            stream_to_client=True,
            aggregate_window_sec=3,
            level=logging.INFO,
        )
        self.assertEqual(self.logging_manager._logging_mesh_client, spawned_client)

    async def test_init_returns_early_if_already_initialized(self) -> None:
        """``init`` does not spawn again once a client is already set."""
        self.logging_manager._logging_mesh_client = Mock()

        with patch(
            "monarch._src.actor.logging.LoggingMeshClient"
        ) as mock_logging_client:
            await self.logging_manager.init(Mock(), stream_to_client=True)

            mock_logging_client.spawn.assert_not_called()

    @patch("monarch._src.actor.logging.context")
    async def test_logging_option_sets_mode_with_valid_parameters(
        self, mock_context: Mock
    ) -> None:
        """``logging_option`` forwards its options and runs both IPython hooks."""
        rust_instance = Mock()
        mock_context.return_value.actor_instance._as_rust.return_value = rust_instance
        mesh_client = Mock()
        self.logging_manager._logging_mesh_client = mesh_client

        # The same keyword arguments are passed in and expected on set_mode.
        options = {
            "stream_to_client": False,
            "aggregate_window_sec": 5,
            "level": logging.WARNING,
        }

        with patch.object(
            self.logging_manager, "register_flusher_if_in_ipython"
        ) as mock_register:
            with patch.object(
                self.logging_manager, "enable_fd_capture_if_in_ipython"
            ) as mock_enable:
                await self.logging_manager.logging_option(**options)

                mesh_client.set_mode.assert_called_once_with(rust_instance, **options)
                # Both IPython helper hooks run exactly once per call.
                mock_register.assert_called_once()
                mock_enable.assert_called_once()

0 commit comments

Comments
 (0)