Skip to content

Commit 2354e37

Browse files
: set mesh logging defaults for notebooks (#1893)
Summary: D86994420 changed the following defaults: `HYPERACTOR_MESH_ENABLE_LOG_FORWARDING=false`, `HYPERACTOR_MESH_ENABLE_FILE_CAPTURE=false`, and `HYPERACTOR_MESH_TAIL_LOG_LINES=0`. These defaults do not play well with interactive notebooks, so this diff selectively overrides them when the execution environment is interactive (IPython, Jupyter, Bento). Incidentally, this diff also adds a new test module, `test_actor_logging.py`. Today it contains a single new smoke test; in time I mean to break the logging tests out of `test_python_actor.py`, fix those that need it, and move them here. Reviewed By: mariusae Differential Revision: D87098535
1 parent 34e9810 commit 2354e37

File tree

5 files changed

+188
-34
lines changed

5 files changed

+188
-34
lines changed

hyperactor/src/config/global.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,16 @@ pub fn get<T: AttrValue + Copy>(key: Key<T>) -> T {
413413
*key.default().expect("key must have a default")
414414
}
415415

416+
/// Return the override value for `key` if it is explicitly present in
417+
/// `overrides`, otherwise fall back to the global value for that key.
418+
pub fn override_or_global<T: AttrValue + Copy>(overrides: &Attrs, key: Key<T>) -> T {
419+
if overrides.contains_key(key) {
420+
*overrides.get(key).unwrap()
421+
} else {
422+
get(key)
423+
}
424+
}
425+
416426
/// Get a key by cloning the value.
417427
///
418428
/// Resolution order: TestOverride -> Runtime -> Env -> File ->

hyperactor_mesh/src/alloc/process.rs

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ use super::ProcStopReason;
4949
use crate::assign::Ranks;
5050
use crate::bootstrap;
5151
use crate::bootstrap::Allocator2Process;
52-
use crate::bootstrap::MESH_ENABLE_FILE_CAPTURE;
5352
use crate::bootstrap::MESH_ENABLE_LOG_FORWARDING;
5453
use crate::bootstrap::MESH_TAIL_LOG_LINES;
5554
use crate::bootstrap::Process2Allocator;
@@ -454,43 +453,30 @@ impl ProcessAlloc {
454453
}
455454
let mut cmd = self.cmd.lock().await;
456455

457-
// Read config (defaults are in 'bootstrap.rs').
456+
// In the case `MESH_ENABLE_LOG_FORWARDING` is set it's
457+
// probable the client execution context is a notebook. In
458+
// that case, for output from this process's children to
459+
// reach the client, we **must** use pipes and copy output
460+
// from child to parent (**`Stdio::inherit`** does not work!).
461+
// So, this variable is being used as a proxy for "use pipes"
462+
// here.
458463
let enable_forwarding = hyperactor::config::global::get(MESH_ENABLE_LOG_FORWARDING);
459-
let enable_file_capture = hyperactor::config::global::get(MESH_ENABLE_FILE_CAPTURE);
460464
let tail_size = hyperactor::config::global::get(MESH_TAIL_LOG_LINES);
461-
462-
// We don't support FileAppender in this v0 allocator path; warn if asked.
463-
if enable_file_capture {
464-
tracing::info!(
465-
"MESH_ENABLE_FILE_CAPTURE=true, but ProcessAllocator (v0) has no FileAppender; \
466-
files will NOT be written in this path"
467-
);
468-
}
469-
470-
let need_stdio = enable_forwarding || tail_size > 0;
471-
472-
if need_stdio {
465+
if enable_forwarding || tail_size > 0 {
473466
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
474467
} else {
475468
cmd.stdout(Stdio::inherit()).stderr(Stdio::inherit());
476469
tracing::info!(
477-
enable_forwarding,
478-
enable_file_capture,
479-
tail_size,
480470
"child stdio NOT captured (forwarding/file_capture/tail all disabled); \
481471
inheriting parent console"
482472
);
483473
}
484-
485-
// Only allocate & export a log channel when forwarding is
486-
// enabled.
487-
let log_channel: Option<ChannelAddr> = if enable_forwarding {
488-
let addr = ChannelAddr::any(ChannelTransport::Unix);
489-
cmd.env(bootstrap::BOOTSTRAP_LOG_CHANNEL, addr.to_string());
490-
Some(addr)
491-
} else {
492-
None
493-
};
474+
// Regardless of the value of `MESH_ENABLE_LOG_FORWARDING`
475+
// (c.f. `enable_forwarding`), we do not do log forwarding on
476+
// these procs. This is because, now that we are on the v1
477+
// path, the only procs we spawn via this code path are those
478+
// to support `HostMeshAgent`s.
479+
let log_channel: Option<ChannelAddr> = None;
494480

495481
let index = self.created.len();
496482
self.created.push(ShortUuid::generate());

hyperactor_mesh/src/bootstrap.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use hyperactor::clock::RealClock;
4545
use hyperactor::config::CONFIG;
4646
use hyperactor::config::ConfigAttr;
4747
use hyperactor::config::global as config;
48+
use hyperactor::config::global::override_or_global;
4849
use hyperactor::context;
4950
use hyperactor::declare_attrs;
5051
use hyperactor::host::Host;
@@ -1840,6 +1841,13 @@ impl ProcManager for BootstrapProcManager {
18401841
let (callback_addr, mut callback_rx) =
18411842
channel::serve(ChannelAddr::any(ChannelTransport::Unix))?;
18421843

1844+
// Decide whether we need to capture stdio.
1845+
let overrides = &config.client_config_override;
1846+
let enable_forwarding = override_or_global(overrides, MESH_ENABLE_LOG_FORWARDING);
1847+
let enable_file_capture = override_or_global(overrides, MESH_ENABLE_FILE_CAPTURE);
1848+
let tail_size = override_or_global(overrides, MESH_TAIL_LOG_LINES);
1849+
let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;
1850+
18431851
let mode = Bootstrap::Proc {
18441852
proc_id: proc_id.clone(),
18451853
backend_addr,
@@ -1854,12 +1862,6 @@ impl ProcManager for BootstrapProcManager {
18541862
.map_err(|e| HostError::ProcessConfigurationFailure(proc_id.clone(), e.into()))?,
18551863
);
18561864

1857-
// Decide whether we need to capture stdio.
1858-
let enable_forwarding = hyperactor::config::global::get(MESH_ENABLE_LOG_FORWARDING);
1859-
let enable_file_capture = hyperactor::config::global::get(MESH_ENABLE_FILE_CAPTURE);
1860-
let tail_size = hyperactor::config::global::get(MESH_TAIL_LOG_LINES);
1861-
let need_stdio = enable_forwarding || enable_file_capture || tail_size > 0;
1862-
18631865
if need_stdio {
18641866
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
18651867
} else {

python/monarch/_src/actor/__init__.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,26 @@
99
"""
1010
Monarch Actor API
1111
"""
12+
13+
from monarch._rust_bindings.monarch_hyperactor.config import configure
14+
15+
# Probe for an interactive IPython/Jupyter shell. If IPython cannot be
# imported at all we are certainly not running inside one.
try:
    # pyre-ignore[21]
    from IPython import get_ipython

    _in_ipython = get_ipython() is not None
except ImportError:
    _in_ipython = False

# Set notebook-friendly defaults for stdio piping when spawning procs.
# This config is read by:
# 1. Rust BootstrapProcManager::spawn(), to decide whether to pipe
#    child stdio.
# 2. Rust LoggingMeshClient::spawn(), to decide whether to spawn
#    LogForwardActors.
# The default overrides are applied only in notebook/IPython
# environments, where stdout **needs** to be captured.
if _in_ipython:
    configure(enable_log_forwarding=True)

python/tests/test_actor_logging.py

Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# Copyright (c) Meta Platforms, Inc. and affiliates.
2+
# All rights reserved.
3+
#
4+
# This source code is licensed under the BSD-style license found in the
5+
# LICENSE file in the root directory of this source tree.
6+
7+
# pyre-unsafe
8+
9+
import asyncio
10+
import logging
11+
import os
12+
import re
13+
import sys
14+
import tempfile
15+
16+
import pytest
17+
from monarch._src.actor.host_mesh import this_host
18+
from monarch.actor import Actor, endpoint
19+
20+
21+
class Logger(Actor):
    """Test actor that emits records on the root logger.

    Records below ``ERROR`` are routed to the "stdout" handler and
    records at ``ERROR`` and above to the "stderr" handler. Each
    handler writes either to the given file path (append mode) or, when
    no path is given, to the corresponding process stream.
    """

    def __init__(
        self, stdout_path: str | None = None, stderr_path: str | None = None
    ) -> None:
        self._logger: logging.Logger = logging.getLogger()

        # Any explicit path means "log to files only": drop whatever
        # handlers are already installed on the root logger.
        if stdout_path or stderr_path:
            self._logger.handlers.clear()

        # INFO and above, but strictly below ERROR.
        if stdout_path:
            out_handler = logging.FileHandler(stdout_path, mode="a")
        else:
            out_handler = logging.StreamHandler(sys.stdout)
        out_handler.setLevel(logging.INFO)
        out_handler.addFilter(lambda record: record.levelno < logging.ERROR)

        # ERROR and above.
        if stderr_path:
            err_handler = logging.FileHandler(stderr_path, mode="a")
        else:
            err_handler = logging.StreamHandler(sys.stderr)
        err_handler.setLevel(logging.ERROR)

        for handler in (out_handler, err_handler):
            self._logger.addHandler(handler)

        # Kept so that the endpoints can flush after every record.
        self._stdout_handler = out_handler
        self._stderr_handler = err_handler

    @endpoint
    async def log_warn(self, content: str) -> None:
        """Log ``content`` at WARNING level, then flush both handlers."""
        self._logger.warning(f"{content}")
        for handler in (self._stdout_handler, self._stderr_handler):
            handler.flush()

    @endpoint
    async def log_info(self, content: str) -> None:
        """Log ``content`` at INFO level, then flush both handlers."""
        self._logger.info(f"{content}")
        for handler in (self._stdout_handler, self._stderr_handler):
            handler.flush()

    @endpoint
    async def log_error(self, content: str) -> None:
        """Log ``content`` at ERROR level, then flush both handlers."""
        self._logger.error(f"{content}")
        for handler in (self._stdout_handler, self._stderr_handler):
            handler.flush()
70+
71+
72+
@pytest.mark.timeout(60)
async def test_actor_logging_smoke() -> None:
    """Smoke test: actors can log to the terminal and to files.

    Spawns a two-proc mesh, logs at WARNING/INFO/ERROR through one
    `Logger` actor mesh that writes to the process streams and another
    that writes to temporary files, then checks the file contents.

    The proc mesh is stopped and the temporary files are removed in
    `finally` blocks, so a failing assertion does not leak spawned
    procs or files.
    """
    # Create temporary files to capture output.
    with tempfile.NamedTemporaryFile(
        mode="w+", delete=False, suffix="_stdout.log"
    ) as stdout_file, tempfile.NamedTemporaryFile(
        mode="w+", delete=False, suffix="_stderr.log"
    ) as stderr_file:
        stdout_path = stdout_file.name
        stderr_path = stderr_file.name

        # Stays None if spawning itself fails, in which case there is
        # nothing to stop during cleanup.
        pm = None
        try:
            pm = this_host().spawn_procs(per_host={"gpus": 2})
            await pm.logging_option(level=logging.INFO)

            # Log to the terminal.
            am_1 = pm.spawn("logger_1", Logger)
            await am_1.log_warn.call("hello 1")
            await am_1.log_info.call("hello 2")
            await am_1.log_error.call("hello 3")

            # Log to files.
            am_2 = pm.spawn("logger_2", Logger, stdout_path, stderr_path)
            await am_2.log_warn.call("hello 1")
            await am_2.log_info.call("hello 2")
            await am_2.log_error.call("hello 3")

            # Wait for output to be written.
            await asyncio.sleep(1)

            # Read the captured output.
            with open(stdout_path, "r") as f:
                stdout_content = f.read()
            with open(stderr_path, "r") as f:
                stderr_content = f.read()

            # Assertions on the captured output.
            assert re.search(
                r"hello 1", stdout_content
            ), f"Expected 'hello 1' in stdout: {stdout_content}"
            assert re.search(
                r"hello 2", stdout_content
            ), f"Expected 'hello 2' in stdout: {stdout_content}"
            assert re.search(
                r"hello 3", stderr_content
            ), f"Expected 'hello 3' in stderr: {stderr_content}"
            assert re.search(
                r"\[actor=.*Logger.*\]", stdout_content
            ), f"Expected actor prefix in stdout: {stdout_content}"

        finally:
            try:
                # Stop the procs even when an assertion above failed.
                if pm is not None:
                    await pm.stop()
            finally:
                # Clean up temp files (best effort).
                for path in (stdout_path, stderr_path):
                    try:
                        os.unlink(path)
                    except OSError:
                        pass

0 commit comments

Comments
 (0)