Skip to content

Commit e47a107

Browse files
James Sun and facebook-github-bot
authored and committed
disallow logging option for local procs (#825)
Summary: Pull Request resolved: #825. There is no way to tail local procs, so simply disallow it. I don't quite like the new interface introduced here; an RFC is needed. Differential Revision: D80063700
1 parent 1fdb59d commit e47a107

File tree

4 files changed

+74
-46
lines changed

4 files changed

+74
-46
lines changed

hyperactor_mesh/src/logging.rs

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use hyperactor::Unbind;
3434
use hyperactor::channel;
3535
use hyperactor::channel::ChannelAddr;
3636
use hyperactor::channel::ChannelRx;
37-
use hyperactor::channel::ChannelTransport;
3837
use hyperactor::channel::ChannelTx;
3938
use hyperactor::channel::Rx;
4039
use hyperactor::channel::Tx;
@@ -539,6 +538,10 @@ impl<T: LogSender + Unpin + 'static, S: io::AsyncWrite + Send + Unpin + 'static>
539538
// Since LogSender::send takes &self, we don't need to clone it
540539
if let Err(e) = this.log_sender.send(output_target, data_to_send) {
541540
tracing::error!("error sending log: {}", e);
541+
return Poll::Ready(Err(io::Error::other(format!(
542+
"error sending write message: {}",
543+
e
544+
))));
542545
}
543546
// Return success with the full buffer size
544547
Poll::Ready(Ok(buf.len()))
@@ -610,37 +613,15 @@ impl Actor for LogForwardActor {
610613
async fn new(logging_client_ref: Self::Params) -> Result<Self> {
611614
let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
612615
Ok(channel) => channel.parse()?,
613-
Err(err) => {
614-
tracing::debug!(
615-
"log forwarder actor failed to read env var {}: {}",
616-
BOOTSTRAP_LOG_CHANNEL,
617-
err
618-
);
619-
// TODO: an empty channel to serve
620-
ChannelAddr::any(ChannelTransport::Unix)
621-
}
616+
Err(err) => return Err(err.into()),
622617
};
623618
tracing::info!(
624619
"log forwarder {} serve at {}",
625620
std::process::id(),
626621
log_channel
627622
);
628623

629-
let rx = match channel::serve(log_channel.clone()).await {
630-
Ok((_, rx)) => rx,
631-
Err(err) => {
632-
// This can happen if we are not spawned on a separate process like local.
633-
// For local mesh, log streaming anyway is not needed.
634-
tracing::error!(
635-
"log forwarder actor failed to bootstrap on given channel {}: {}",
636-
log_channel,
637-
err
638-
);
639-
channel::serve(ChannelAddr::any(ChannelTransport::Unix))
640-
.await?
641-
.1
642-
}
643-
};
624+
let (_, rx) = channel::serve(log_channel.clone()).await?;
644625
Ok(Self {
645626
rx,
646627
logging_client_ref,
@@ -743,15 +724,7 @@ impl Actor for LogFlushActor {
743724
async fn new(_: ()) -> Result<Self, anyhow::Error> {
744725
let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
745726
Ok(channel) => channel.parse()?,
746-
Err(err) => {
747-
tracing::debug!(
748-
"log forwarder actor failed to read env var {}: {}",
749-
BOOTSTRAP_LOG_CHANNEL,
750-
err
751-
);
752-
// TODO: this should error out; it can only happen with local proc; we need to fix it.
753-
ChannelAddr::any(ChannelTransport::Unix)
754-
}
727+
Err(err) => return Err(err.into()),
755728
};
756729
let tx = channel::dial::<LogMessage>(log_channel)?;
757730

@@ -1014,6 +987,7 @@ mod tests {
1014987

1015988
use hyperactor::channel;
1016989
use hyperactor::channel::ChannelAddr;
990+
use hyperactor::channel::ChannelTransport;
1017991
use hyperactor::channel::ChannelTx;
1018992
use hyperactor::channel::Tx;
1019993
use hyperactor::id;

python/monarch/_src/actor/allocator.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
class AllocHandle(DeprecatedNotAFuture):
3434
_hy_alloc: "Shared[Alloc]"
3535
_extent: Dict[str, int]
36+
_fork_processes: bool
3637

3738
@property
3839
def initialized(self) -> Future[Literal[True]]:
@@ -48,6 +49,10 @@ async def task() -> Literal[True]:
4849

4950
return Future(coro=task())
5051

52+
@property
53+
def fork_processes(self) -> bool:
54+
return self._fork_processes
55+
5156

5257
class AllocateMixin(abc.ABC):
5358
@abc.abstractmethod
@@ -63,7 +68,14 @@ def allocate(self, spec: AllocSpec) -> "AllocHandle":
6368
Returns:
6469
- A future that will be fulfilled when the requested allocation is fulfilled.
6570
"""
66-
return AllocHandle(self.allocate_nonblocking(spec).spawn(), spec.extent)
71+
return AllocHandle(
72+
self.allocate_nonblocking(spec).spawn(),
73+
spec.extent,
74+
self._fork_processes(),
75+
)
76+
77+
@abc.abstractmethod
78+
def _fork_processes(self) -> bool: ...
6779

6880

6981
@final
@@ -72,20 +84,29 @@ class ProcessAllocator(ProcessAllocatorBase, AllocateMixin):
7284
An allocator that allocates by spawning local processes.
7385
"""
7486

87+
def _fork_processes(self) -> bool:
88+
return True
89+
7590

7691
@final
7792
class LocalAllocator(LocalAllocatorBase, AllocateMixin):
7893
"""
7994
An allocator that allocates by spawning actors into the current process.
8095
"""
8196

97+
def _fork_processes(self) -> bool:
98+
return False
99+
82100

83101
@final
84102
class SimAllocator(SimAllocatorBase, AllocateMixin):
85103
"""
86104
An allocator that allocates by spawning actors into the current process using simulated channels for transport
87105
"""
88106

107+
def _fork_processes(self) -> bool:
108+
return False
109+
89110

90111
class RemoteAllocInitializer(abc.ABC):
91112
"""Subclass-able Python interface for `hyperactor_mesh::alloc::remoteprocess:RemoteProcessAllocInitializer`.
@@ -219,3 +240,6 @@ class RemoteAllocator(RemoteAllocatorBase, AllocateMixin):
219240
An allocator that allocates by spawning actors on a remote host.
220241
The remote host must be running hyperactor's remote-process-allocator.
221242
"""
243+
244+
def _fork_processes(self) -> bool:
245+
return True

python/monarch/_src/actor/proc_mesh.py

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ def __init__(
128128
self,
129129
hy_proc_mesh: "Shared[HyProcMesh]",
130130
shape: Shape,
131+
_fork_processes: bool,
131132
_device_mesh: Optional["DeviceMesh"] = None,
132133
) -> None:
133134
self._proc_mesh = hy_proc_mesh
@@ -141,6 +142,7 @@ def __init__(
141142
self._code_sync_client: Optional[CodeSyncMeshClient] = None
142143
self._logging_mesh_client: Optional[LoggingMeshClient] = None
143144
self._maybe_device_mesh: Optional["DeviceMesh"] = _device_mesh
145+
self._fork_processes = _fork_processes
144146
self._stopped = False
145147

146148
@property
@@ -158,27 +160,34 @@ async def task() -> Literal[True]:
158160

159161
return Future(coro=task())
160162

161-
def _init_manager_actors(self, setup: Callable[[], None] | None = None) -> None:
163+
def _init_manager_actors(
164+
self, setup: Callable[[], None] | None = None, _fork_processes: bool = True
165+
) -> None:
162166
self._proc_mesh = PythonTask.from_coroutine(
163-
self._init_manager_actors_coro(self._proc_mesh, setup)
167+
self._init_manager_actors_coro(self._proc_mesh, setup, _fork_processes)
164168
).spawn()
165169

166170
async def _init_manager_actors_coro(
167171
self,
168172
proc_mesh_: "Shared[HyProcMesh]",
169173
setup: Callable[[], None] | None = None,
174+
_fork_processes: bool = True,
170175
) -> "HyProcMesh":
171176
# WARNING: it is unsafe to await self._proc_mesh here
172177
# because self._proc_mesh is the result of this function itself!
173178

174179
proc_mesh = await proc_mesh_
175180

176-
self._logging_mesh_client = await LoggingMeshClient.spawn(proc_mesh=proc_mesh)
177-
self._logging_mesh_client.set_mode(
178-
stream_to_client=True,
179-
aggregate_window_sec=3,
180-
level=logging.INFO,
181-
)
181+
if _fork_processes:
182+
# logging mesh only makes sense with forked (remote or local) processes
183+
self._logging_mesh_client = await LoggingMeshClient.spawn(
184+
proc_mesh=proc_mesh
185+
)
186+
self._logging_mesh_client.set_mode(
187+
stream_to_client=True,
188+
aggregate_window_sec=3,
189+
level=logging.INFO,
190+
)
182191

183192
_rdma_manager = (
184193
# type: ignore[16]
@@ -215,7 +224,12 @@ def _new_with_shape(self, shape: Shape) -> "ProcMesh":
215224
if self._maybe_device_mesh is None
216225
else self._device_mesh._new_with_shape(shape)
217226
)
218-
pm = ProcMesh(self._proc_mesh, shape, _device_mesh=device_mesh)
227+
pm = ProcMesh(
228+
self._proc_mesh,
229+
shape,
230+
_device_mesh=device_mesh,
231+
_fork_processes=self._fork_processes,
232+
)
219233
pm._slice = True
220234
return pm
221235

@@ -287,10 +301,12 @@ async def task() -> HyProcMesh:
287301
list(alloc._extent.keys()),
288302
Slice.new_row_major(list(alloc._extent.values())),
289303
)
290-
pm = ProcMesh(PythonTask.from_coroutine(task()).spawn(), shape)
304+
pm = ProcMesh(
305+
PythonTask.from_coroutine(task()).spawn(), shape, alloc.fork_processes
306+
)
291307

292308
if _init_manager_actors:
293-
pm._init_manager_actors(setup)
309+
pm._init_manager_actors(setup, alloc.fork_processes)
294310
# we do this here rather than in _init_manager_actors
295311
# because serializing debug_client() requires waiting on
296312
# its actor to spawn which would block the tokio event loop inside
@@ -439,6 +455,11 @@ async def logging_option(
439455
Returns:
440456
None
441457
"""
458+
if not self._fork_processes:
459+
raise RuntimeError(
460+
"Logging option is only available for allocators that fork processes. Allocators like LocalAllocator are not supported."
461+
)
462+
442463
if level < 0 or level > 255:
443464
raise ValueError("Invalid logging level: {}".format(level))
444465
await self.initialized

python/tests/test_python_actors.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,3 +1085,12 @@ def test_mesh_len():
10851085
proc_mesh = local_proc_mesh(gpus=12).get()
10861086
s = proc_mesh.spawn("sync_actor", SyncActor).get()
10871087
assert 12 == len(s)
1088+
1089+
1090+
async def test_logging_option_on_local_procs() -> None:
1091+
proc_mesh = local_proc_mesh(gpus=1)
1092+
with pytest.raises(
1093+
RuntimeError,
1094+
match="Logging option is only available for allocators that fork processes",
1095+
):
1096+
await proc_mesh.logging_option(stream_to_client=True)

0 commit comments

Comments
 (0)