
Commit d65c476

James Sun authored and facebook-github-bot committed
disallow logging option for local procs (#825)
Summary: There is no way to tail logs from local procs, so simply disallow the logging option for them. I don't quite like the new interface introduced; an RFC is needed.

Differential Revision: D80063700
1 parent 158d8d5 commit d65c476

4 files changed (+74, -58 lines)

hyperactor_mesh/src/logging.rs

Lines changed: 8 additions & 34 deletions
@@ -34,7 +34,6 @@ use hyperactor::Unbind;
 use hyperactor::channel;
 use hyperactor::channel::ChannelAddr;
 use hyperactor::channel::ChannelRx;
-use hyperactor::channel::ChannelTransport;
 use hyperactor::channel::ChannelTx;
 use hyperactor::channel::Rx;
 use hyperactor::channel::Tx;
@@ -543,6 +542,10 @@ impl<T: LogSender + Unpin + 'static, S: io::AsyncWrite + Send + Unpin + 'static>
         // Since LogSender::send takes &self, we don't need to clone it
         if let Err(e) = this.log_sender.send(output_target, data_to_send) {
             tracing::error!("error sending log: {}", e);
+            return Poll::Ready(Err(io::Error::other(format!(
+                "error sending write message: {}",
+                e
+            ))));
         }
         // Return success with the full buffer size
         Poll::Ready(Ok(buf.len()))
@@ -632,37 +635,15 @@ impl Actor for LogForwardActor {
     async fn new(logging_client_ref: Self::Params) -> Result<Self> {
         let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
             Ok(channel) => channel.parse()?,
-            Err(err) => {
-                tracing::debug!(
-                    "log forwarder actor failed to read env var {}: {}",
-                    BOOTSTRAP_LOG_CHANNEL,
-                    err
-                );
-                // TODO: an empty channel to serve
-                ChannelAddr::any(ChannelTransport::Unix)
-            }
+            Err(err) => return Err(err.into()),
         };
         tracing::info!(
             "log forwarder {} serve at {}",
             std::process::id(),
             log_channel
         );

-        let rx = match channel::serve(log_channel.clone()).await {
-            Ok((_, rx)) => rx,
-            Err(err) => {
-                // This can happen if we are not spanwed on a separate process like local.
-                // For local mesh, log streaming anyway is not needed.
-                tracing::error!(
-                    "log forwarder actor failed to bootstrap on given channel {}: {}",
-                    log_channel,
-                    err
-                );
-                channel::serve(ChannelAddr::any(ChannelTransport::Unix))
-                    .await?
-                    .1
-            }
-        };
+        let (_, rx) = channel::serve(log_channel.clone()).await?;
         Ok(Self {
             rx,
             logging_client_ref,
@@ -761,15 +742,7 @@ impl Actor for LogFlushActor {
     async fn new(_: ()) -> Result<Self, anyhow::Error> {
         let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
             Ok(channel) => channel.parse()?,
-            Err(err) => {
-                tracing::debug!(
-                    "log forwarder actor failed to read env var {}: {}",
-                    BOOTSTRAP_LOG_CHANNEL,
-                    err
-                );
-                // TODO: this should error out; it can only happen with local proc; we need to fix it.
-                ChannelAddr::any(ChannelTransport::Unix)
-            }
+            Err(err) => return Err(err.into()),
         };
         let tx = channel::dial::<LogMessage>(log_channel)?;

@@ -993,6 +966,7 @@ mod tests {

     use hyperactor::channel;
     use hyperactor::channel::ChannelAddr;
+    use hyperactor::channel::ChannelTransport;
     use hyperactor::channel::ChannelTx;
     use hyperactor::channel::Tx;
     use hyperactor::id;

python/monarch/_src/actor/allocator.py

Lines changed: 9 additions & 0 deletions
@@ -65,6 +65,12 @@ def allocate(self, spec: AllocSpec) -> "AllocHandle":
         """
         return AllocHandle(self.allocate_nonblocking(spec).spawn(), spec.extent)

+    def fork_processses(self) -> bool:
+        """
+        Return a boolean indicating whether the allocator forks processes.
+        """
+        return True
+

 @final
 class ProcessAllocator(ProcessAllocatorBase, AllocateMixin):
@@ -79,6 +85,9 @@ class LocalAllocator(LocalAllocatorBase, AllocateMixin):
     An allocator that allocates by spawning actors into the current process.
     """

+    def fork_processses(self) -> bool:
+        return False
+

 @final
 class SimAllocator(SimAllocatorBase, AllocateMixin):
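
The hunks above add a fork_processses() hook (spelling as committed) to the allocate mixin: the base implementation reports True, and LocalAllocator overrides it to False because it spawns actors into the current process rather than forking new ones. Below is a self-contained sketch of that contract, using hypothetical stand-in class names (AllocateMixinSketch, LocalAllocatorSketch) rather than the real classes:

class AllocateMixinSketch:
    def fork_processses(self) -> bool:
        # Default: the allocator forks separate OS processes.
        return True


class LocalAllocatorSketch(AllocateMixinSketch):
    def fork_processses(self) -> bool:
        # Local allocation stays in the current process, so its logs
        # cannot be tailed and log streaming is disallowed.
        return False


def supports_log_streaming(allocator: AllocateMixinSketch) -> bool:
    # Callers such as _proc_mesh_from_allocator (see the next file)
    # consult this hook before enabling logging features.
    return allocator.fork_processses()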

python/monarch/_src/actor/proc_mesh.py

Lines changed: 48 additions & 24 deletions
@@ -133,6 +133,7 @@ def __init__(
         self,
         hy_proc_mesh: "Shared[HyProcMesh]",
         shape: Shape,
+        _fork_processes: bool,
         _device_mesh: Optional["DeviceMesh"] = None,
     ) -> None:
         self._proc_mesh = hy_proc_mesh
@@ -146,6 +147,7 @@ def __init__(
         self._code_sync_client: Optional[CodeSyncMeshClient] = None
         self._logging_mesh_client: Optional[LoggingMeshClient] = None
         self._maybe_device_mesh: Optional["DeviceMesh"] = _device_mesh
+        self._fork_processes = _fork_processes
         self._stopped = False

     @property
@@ -163,41 +165,50 @@ async def task() -> Literal[True]:

         return Future(coro=task())

-    def _init_manager_actors(self, setup: Callable[[], None] | None = None) -> None:
+    def _init_manager_actors(
+        self, setup: Callable[[], None] | None = None, _fork_processes: bool = True
+    ) -> None:
         self._proc_mesh = PythonTask.from_coroutine(
-            self._init_manager_actors_coro(self._proc_mesh, setup)
+            self._init_manager_actors_coro(self._proc_mesh, setup, _fork_processes)
         ).spawn()

     async def _init_manager_actors_coro(
         self,
         proc_mesh_: "Shared[HyProcMesh]",
         setup: Callable[[], None] | None = None,
+        _fork_processes: bool = True,
     ) -> "HyProcMesh":
         proc_mesh: HyProcMesh = await proc_mesh_
         # WARNING: it is unsafe to await self._proc_mesh here
         # because self._proc_mesh is the result of this function itself!

-        self._logging_mesh_client = await LoggingMeshClient.spawn(proc_mesh=proc_mesh)
-        self._logging_mesh_client.set_mode(
-            stream_to_client=True,
-            aggregate_window_sec=3,
-            level=logging.INFO,
-        )
-        if HAS_IPYTHON and get_ipython() is not None:
-            # For ipython environment, a cell can end fast with threads running in background.
-            # Flush all the ongoing logs proactively to avoid missing logs.
-            assert self._logging_mesh_client is not None
-            logging_client: LoggingMeshClient = self._logging_mesh_client
-            ipython = get_ipython()
+        if _fork_processes:
+            # logging mesh is only makes sense with forked (remote or local) processes
+            self._logging_mesh_client = await LoggingMeshClient.spawn(
+                proc_mesh=proc_mesh
+            )
+            self._logging_mesh_client.set_mode(
+                stream_to_client=True,
+                aggregate_window_sec=3,
+                level=logging.INFO,
+            )
+            if HAS_IPYTHON and get_ipython() is not None:
+                # For ipython environment, a cell can end fast with threads running in background.
+                # Flush all the ongoing logs proactively to avoid missing logs.
+                assert self._logging_mesh_client is not None
+                logging_client: LoggingMeshClient = self._logging_mesh_client
+                ipython = get_ipython()

-            # pyre-ignore[21]
-            from IPython.core.interactiveshell import ExecutionResult
+                # pyre-ignore[21]
+                from IPython.core.interactiveshell import ExecutionResult

-            # pyre-ignore[11]
-            def flush_logs(_: ExecutionResult) -> None:
-                return Future(coro=logging_client.flush(proc_mesh).spawn().task()).get()
+                # pyre-ignore[11]
+                def flush_logs(_: ExecutionResult) -> None:
+                    return Future(
+                        coro=logging_client.flush(proc_mesh).spawn().task()
+                    ).get()

-            ipython.events.register("post_run_cell", flush_logs)
+                ipython.events.register("post_run_cell", flush_logs)

         _rdma_manager = (
             # type: ignore[16]
@@ -239,7 +250,12 @@ def _new_with_shape(self, shape: Shape) -> "ProcMesh":
             if self._maybe_device_mesh is None
             else self._device_mesh._new_with_shape(shape)
         )
-        pm = ProcMesh(self._proc_mesh, shape, _device_mesh=device_mesh)
+        pm = ProcMesh(
+            self._proc_mesh,
+            shape,
+            _device_mesh=device_mesh,
+            _fork_processes=self._fork_processes,
+        )
         pm._slice = True
         return pm

@@ -284,6 +300,7 @@ def from_alloc(
         alloc: AllocHandle,
         setup: Callable[[], None] | None = None,
         _init_manager_actors: bool = True,
+        _fork_processes: bool = True,
     ) -> "ProcMesh":
         """
         Allocate a process mesh according to the provided alloc.
@@ -311,10 +328,10 @@ async def task() -> HyProcMesh:
             list(alloc._extent.keys()),
             Slice.new_row_major(list(alloc._extent.values())),
         )
-        pm = ProcMesh(PythonTask.from_coroutine(task()).spawn(), shape)
+        pm = ProcMesh(PythonTask.from_coroutine(task()).spawn(), shape, _fork_processes)

         if _init_manager_actors:
-            pm._init_manager_actors(setup)
+            pm._init_manager_actors(setup, _fork_processes)
         return pm

     def __repr__(self) -> str:
@@ -420,6 +437,11 @@ async def logging_option(
         Returns:
             None
         """
+        if not self._fork_processes:
+            raise RuntimeError(
+                "Logging option is only available for allocators that fork processes. Allocators like LocalAllocator are not supported."
+            )
+
         if level < 0 or level > 255:
             raise ValueError("Invalid logging level: {}".format(level))
         await self.initialized
@@ -510,7 +532,9 @@ def _proc_mesh_from_allocator(
     # in the order of the dimensions.
     spec: AllocSpec = AllocSpec(AllocConstraints(), hosts=hosts, gpus=gpus)
     alloc = allocator.allocate(spec)
-    return ProcMesh.from_alloc(alloc, setup, _init_manager_actors)
+    return ProcMesh.from_alloc(
+        alloc, setup, _init_manager_actors, allocator.fork_processses()
+    )


 def proc_mesh(
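
Taken together, these hunks thread the allocator's answer through ProcMesh: _proc_mesh_from_allocator passes allocator.fork_processses() into ProcMesh.from_alloc, the constructor stores it as _fork_processes, _init_manager_actors_coro only spawns the LoggingMeshClient when it is True, and logging_option fails loudly otherwise. A simplified, hypothetical sketch of that gate (ProcMeshSketch is a stand-in, not the real class):

class ProcMeshSketch:
    def __init__(self, fork_processes: bool) -> None:
        self._fork_processes = fork_processes
        # The logging mesh client is only spawned when processes are forked.
        self._logging_mesh_client = None

    async def logging_option(self, stream_to_client: bool = False) -> None:
        if not self._fork_processes:
            raise RuntimeError(
                "Logging option is only available for allocators that fork "
                "processes. Allocators like LocalAllocator are not supported."
            )
        # ... otherwise forward the option to self._logging_mesh_client ...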

python/tests/test_python_actors.py

Lines changed: 9 additions & 0 deletions
@@ -1117,3 +1117,12 @@ def s(t):
     b = PythonTask.spawn_blocking(lambda: s(0))
     r = PythonTask.select_one([a.task(), b.task()]).block_on()
     assert r == (0, 1)
+
+
+async def test_logging_option_on_local_procs() -> None:
+    proc_mesh = local_proc_mesh(gpus=1)
+    with pytest.raises(
+        RuntimeError,
+        match="Logging option is only available for allocators that fork processes",
+    ):
+        await proc_mesh.logging_option(stream_to_client=True)
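
The new test exercises the user-visible behavior end to end. Assuming the repo's pytest configuration picks up async tests, it can be selected on its own with, for example: pytest python/tests/test_python_actors.py -k test_logging_option_on_local_procs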
