Skip to content

Commit 682ae0f

Browse files
highkerfacebook-github-bot
authored andcommitted
disallow logging option for local procs
Summary: there is no way to tail local procs; simply disallow it; I don't quite like the new interface introduced. RFC is needed. Differential Revision: D80063700
1 parent d21ba99 commit 682ae0f

File tree

4 files changed

+70
-45
lines changed

4 files changed

+70
-45
lines changed

hyperactor_mesh/src/logging.rs

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use hyperactor::Unbind;
3434
use hyperactor::channel;
3535
use hyperactor::channel::ChannelAddr;
3636
use hyperactor::channel::ChannelRx;
37-
use hyperactor::channel::ChannelTransport;
3837
use hyperactor::channel::ChannelTx;
3938
use hyperactor::channel::Rx;
4039
use hyperactor::channel::Tx;
@@ -550,6 +549,10 @@ impl<T: LogSender + Unpin + 'static, S: io::AsyncWrite + Send + Unpin + 'static>
550549
// Since LogSender::send takes &self, we don't need to clone it
551550
if let Err(e) = this.log_sender.send(output_target, data_to_send) {
552551
tracing::error!("error sending log: {}", e);
552+
return Poll::Ready(Err(io::Error::other(format!(
553+
"error sending write message: {}",
554+
e
555+
))));
553556
}
554557
// Return success with the full buffer size
555558
Poll::Ready(Ok(buf.len()))
@@ -621,37 +624,15 @@ impl Actor for LogForwardActor {
621624
async fn new(logging_client_ref: Self::Params) -> Result<Self> {
622625
let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
623626
Ok(channel) => channel.parse()?,
624-
Err(err) => {
625-
tracing::debug!(
626-
"log forwarder actor failed to read env var {}: {}",
627-
BOOTSTRAP_LOG_CHANNEL,
628-
err
629-
);
630-
// TODO: an empty channel to serve
631-
ChannelAddr::any(ChannelTransport::Unix)
632-
}
627+
Err(err) => return Err(err.into()),
633628
};
634629
tracing::info!(
635630
"log forwarder {} serve at {}",
636631
std::process::id(),
637632
log_channel
638633
);
639634

640-
let rx = match channel::serve(log_channel.clone()).await {
641-
Ok((_, rx)) => rx,
642-
Err(err) => {
643-
// This can happen if we are not spanwed on a separate process like local.
644-
// For local mesh, log streaming anyway is not needed.
645-
tracing::error!(
646-
"log forwarder actor failed to bootstrap on given channel {}: {}",
647-
log_channel,
648-
err
649-
);
650-
channel::serve(ChannelAddr::any(ChannelTransport::Unix))
651-
.await?
652-
.1
653-
}
654-
};
635+
let (_, rx) = channel::serve(log_channel.clone()).await?;
655636
Ok(Self {
656637
rx,
657638
logging_client_ref,
@@ -754,15 +735,7 @@ impl Actor for LogFlushActor {
754735
async fn new(_: ()) -> Result<Self, anyhow::Error> {
755736
let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
756737
Ok(channel) => channel.parse()?,
757-
Err(err) => {
758-
tracing::debug!(
759-
"log forwarder actor failed to read env var {}: {}",
760-
BOOTSTRAP_LOG_CHANNEL,
761-
err
762-
);
763-
// TODO: this should error out; it can only happen with local proc; we need to fix it.
764-
ChannelAddr::any(ChannelTransport::Unix)
765-
}
738+
Err(err) => return Err(err.into()),
766739
};
767740
let tx = channel::dial::<LogMessage>(log_channel)?;
768741

@@ -1036,6 +1009,7 @@ mod tests {
10361009

10371010
use hyperactor::channel;
10381011
use hyperactor::channel::ChannelAddr;
1012+
use hyperactor::channel::ChannelTransport;
10391013
use hyperactor::channel::ChannelTx;
10401014
use hyperactor::channel::Tx;
10411015
use hyperactor::id;

python/monarch/_src/actor/allocator.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
class AllocHandle(DeprecatedNotAFuture):
3434
_hy_alloc: "Shared[Alloc]"
3535
_extent: Dict[str, int]
36+
_fork_processes: bool
3637

3738
@property
3839
def initialized(self) -> Future[Literal[True]]:
@@ -48,6 +49,10 @@ async def task() -> Literal[True]:
4849

4950
return Future(coro=task())
5051

52+
@property
53+
def fork_processes(self) -> bool:
54+
return self._fork_processes
55+
5156

5257
class AllocateMixin(abc.ABC):
5358
@abc.abstractmethod
@@ -63,7 +68,14 @@ def allocate(self, spec: AllocSpec) -> "AllocHandle":
6368
Returns:
6469
- A future that will be fulfilled when the requested allocation is fulfilled.
6570
"""
66-
return AllocHandle(self.allocate_nonblocking(spec).spawn(), spec.extent)
71+
return AllocHandle(
72+
self.allocate_nonblocking(spec).spawn(),
73+
spec.extent,
74+
self._fork_processes(),
75+
)
76+
77+
@abc.abstractmethod
78+
def _fork_processes(self) -> bool: ...
6779

6880

6981
@final
@@ -72,20 +84,29 @@ class ProcessAllocator(ProcessAllocatorBase, AllocateMixin):
7284
An allocator that allocates by spawning local processes.
7385
"""
7486

87+
def _fork_processes(self) -> bool:
88+
return True
89+
7590

7691
@final
7792
class LocalAllocator(LocalAllocatorBase, AllocateMixin):
7893
"""
7994
An allocator that allocates by spawning actors into the current process.
8095
"""
8196

97+
def _fork_processes(self) -> bool:
98+
return False
99+
82100

83101
@final
84102
class SimAllocator(SimAllocatorBase, AllocateMixin):
85103
"""
86104
An allocator that allocates by spawning actors into the current process using simulated channels for transport
87105
"""
88106

107+
def _fork_processes(self) -> bool:
108+
return False
109+
89110

90111
class RemoteAllocInitializer(abc.ABC):
91112
"""Subclass-able Python interface for `hyperactor_mesh::alloc::remoteprocess:RemoteProcessAllocInitializer`.
@@ -219,3 +240,6 @@ class RemoteAllocator(RemoteAllocatorBase, AllocateMixin):
219240
An allocator that allocates by spawning actors on a remote host.
220241
The remote host must be running hyperactor's remote-process-allocator.
221242
"""
243+
244+
def _fork_processes(self) -> bool:
245+
return True

python/monarch/_src/actor/proc_mesh.py

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,7 @@ def __init__(
179179
self,
180180
hy_proc_mesh: "Shared[HyProcMesh]",
181181
shape: Shape,
182+
_fork_processes: bool,
182183
_device_mesh: Optional["DeviceMesh"] = None,
183184
) -> None:
184185
self._proc_mesh = hy_proc_mesh
@@ -193,6 +194,7 @@ def __init__(
193194
self._code_sync_client: Optional[CodeSyncMeshClient] = None
194195
self._logging_mesh_client: Optional[LoggingMeshClient] = None
195196
self._maybe_device_mesh: Optional["DeviceMesh"] = _device_mesh
197+
self._fork_processes = _fork_processes
196198
self._stopped = False
197199
self._controller_controller: Optional["_ControllerController"] = None
198200

@@ -225,7 +227,12 @@ def _new_with_shape(self, shape: Shape) -> "ProcMesh":
225227
if self._maybe_device_mesh is None
226228
else self._device_mesh._new_with_shape(shape)
227229
)
228-
pm = ProcMesh(self._proc_mesh, shape, _device_mesh=device_mesh)
230+
pm = ProcMesh(
231+
self._proc_mesh,
232+
shape,
233+
_fork_processes=self._fork_processes,
234+
_device_mesh=device_mesh,
235+
)
229236
pm._slice = True
230237
return pm
231238

@@ -300,7 +307,12 @@ async def task() -> HyProcMesh:
300307

301308
hy_proc_mesh = PythonTask.from_coroutine(task()).spawn()
302309

303-
pm = ProcMesh(hy_proc_mesh, shape)
310+
fork_processes: bool = alloc.fork_processes
311+
pm = ProcMesh(
312+
hy_proc_mesh,
313+
shape,
314+
_fork_processes=fork_processes,
315+
)
304316

305317
async def task(
306318
pm: "ProcMesh",
@@ -309,14 +321,15 @@ async def task(
309321
) -> HyProcMesh:
310322
hy_proc_mesh = await hy_proc_mesh_task
311323

312-
pm._logging_mesh_client = await LoggingMeshClient.spawn(
313-
proc_mesh=hy_proc_mesh
314-
)
315-
pm._logging_mesh_client.set_mode(
316-
stream_to_client=True,
317-
aggregate_window_sec=3,
318-
level=logging.INFO,
319-
)
324+
if fork_processes:
325+
pm._logging_mesh_client = await LoggingMeshClient.spawn(
326+
proc_mesh=hy_proc_mesh
327+
)
328+
pm._logging_mesh_client.set_mode(
329+
stream_to_client=True,
330+
aggregate_window_sec=3,
331+
level=logging.INFO,
332+
)
320333

321334
if setup_actor is not None:
322335
await setup_actor.setup.call()
@@ -483,6 +496,11 @@ async def logging_option(
483496
Returns:
484497
None
485498
"""
499+
if not self._fork_processes:
500+
raise RuntimeError(
501+
"Logging option is only available for allocators that fork processes. Allocators like LocalAllocator are not supported."
502+
)
503+
486504
if level < 0 or level > 255:
487505
raise ValueError("Invalid logging level: {}".format(level))
488506
await self.initialized

python/tests/test_python_actors.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1101,3 +1101,12 @@ def test_mesh_len():
11011101
proc_mesh = local_proc_mesh(gpus=12).get()
11021102
s = proc_mesh.spawn("sync_actor", SyncActor).get()
11031103
assert 12 == len(s)
1104+
1105+
1106+
async def test_logging_option_on_local_procs() -> None:
1107+
proc_mesh = local_proc_mesh(gpus=1)
1108+
with pytest.raises(
1109+
RuntimeError,
1110+
match="Logging option is only available for allocators that fork processes",
1111+
):
1112+
await proc_mesh.logging_option(stream_to_client=True)

0 commit comments

Comments
 (0)