Skip to content

Commit 0c49435

Browse files
James Sunfacebook-github-bot
authored andcommitted
disallow logging option for local procs (#825)
Summary: Pull Request resolved: #825 there is no way to tail local procs; simply disallow it; I don't quite like the new interface introduced. RFC is needed. Differential Revision: D80063700
1 parent eb07c1c commit 0c49435

File tree

4 files changed

+74
-46
lines changed

4 files changed

+74
-46
lines changed

hyperactor_mesh/src/logging.rs

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use hyperactor::Unbind;
3434
use hyperactor::channel;
3535
use hyperactor::channel::ChannelAddr;
3636
use hyperactor::channel::ChannelRx;
37-
use hyperactor::channel::ChannelTransport;
3837
use hyperactor::channel::ChannelTx;
3938
use hyperactor::channel::Rx;
4039
use hyperactor::channel::Tx;
@@ -539,6 +538,10 @@ impl<T: LogSender + Unpin + 'static, S: io::AsyncWrite + Send + Unpin + 'static>
539538
// Since LogSender::send takes &self, we don't need to clone it
540539
if let Err(e) = this.log_sender.send(output_target, data_to_send) {
541540
tracing::error!("error sending log: {}", e);
541+
return Poll::Ready(Err(io::Error::other(format!(
542+
"error sending write message: {}",
543+
e
544+
))));
542545
}
543546
// Return success with the full buffer size
544547
Poll::Ready(Ok(buf.len()))
@@ -610,37 +613,15 @@ impl Actor for LogForwardActor {
610613
async fn new(logging_client_ref: Self::Params) -> Result<Self> {
611614
let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
612615
Ok(channel) => channel.parse()?,
613-
Err(err) => {
614-
tracing::debug!(
615-
"log forwarder actor failed to read env var {}: {}",
616-
BOOTSTRAP_LOG_CHANNEL,
617-
err
618-
);
619-
// TODO: an empty channel to serve
620-
ChannelAddr::any(ChannelTransport::Unix)
621-
}
616+
Err(err) => return Err(err.into()),
622617
};
623618
tracing::info!(
624619
"log forwarder {} serve at {}",
625620
std::process::id(),
626621
log_channel
627622
);
628623

629-
let rx = match channel::serve(log_channel.clone()).await {
630-
Ok((_, rx)) => rx,
631-
Err(err) => {
632-
// This can happen if we are not spanwed on a separate process like local.
633-
// For local mesh, log streaming anyway is not needed.
634-
tracing::error!(
635-
"log forwarder actor failed to bootstrap on given channel {}: {}",
636-
log_channel,
637-
err
638-
);
639-
channel::serve(ChannelAddr::any(ChannelTransport::Unix))
640-
.await?
641-
.1
642-
}
643-
};
624+
let (_, rx) = channel::serve(log_channel.clone()).await?;
644625
Ok(Self {
645626
rx,
646627
logging_client_ref,
@@ -743,15 +724,7 @@ impl Actor for LogFlushActor {
743724
async fn new(_: ()) -> Result<Self, anyhow::Error> {
744725
let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
745726
Ok(channel) => channel.parse()?,
746-
Err(err) => {
747-
tracing::debug!(
748-
"log forwarder actor failed to read env var {}: {}",
749-
BOOTSTRAP_LOG_CHANNEL,
750-
err
751-
);
752-
// TODO: this should error out; it can only happen with local proc; we need to fix it.
753-
ChannelAddr::any(ChannelTransport::Unix)
754-
}
727+
Err(err) => return Err(err.into()),
755728
};
756729
let tx = channel::dial::<LogMessage>(log_channel)?;
757730

@@ -1025,6 +998,7 @@ mod tests {
1025998

1026999
use hyperactor::channel;
10271000
use hyperactor::channel::ChannelAddr;
1001+
use hyperactor::channel::ChannelTransport;
10281002
use hyperactor::channel::ChannelTx;
10291003
use hyperactor::channel::Tx;
10301004
use hyperactor::id;

python/monarch/_src/actor/allocator.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
class AllocHandle(DeprecatedNotAFuture):
3434
_hy_alloc: "Shared[Alloc]"
3535
_extent: Dict[str, int]
36+
_fork_processes: bool
3637

3738
@property
3839
def initialized(self) -> Future[Literal[True]]:
@@ -48,6 +49,10 @@ async def task() -> Literal[True]:
4849

4950
return Future(coro=task())
5051

52+
@property
53+
def fork_processes(self) -> bool:
54+
return self._fork_processes
55+
5156

5257
class AllocateMixin(abc.ABC):
5358
@abc.abstractmethod
@@ -63,7 +68,14 @@ def allocate(self, spec: AllocSpec) -> "AllocHandle":
6368
Returns:
6469
- A future that will be fulfilled when the requested allocation is fulfilled.
6570
"""
66-
return AllocHandle(self.allocate_nonblocking(spec).spawn(), spec.extent)
71+
return AllocHandle(
72+
self.allocate_nonblocking(spec).spawn(),
73+
spec.extent,
74+
self._fork_processes(),
75+
)
76+
77+
@abc.abstractmethod
78+
def _fork_processes(self) -> bool: ...
6779

6880

6981
@final
@@ -72,20 +84,29 @@ class ProcessAllocator(ProcessAllocatorBase, AllocateMixin):
7284
An allocator that allocates by spawning local processes.
7385
"""
7486

87+
def _fork_processes(self) -> bool:
88+
return True
89+
7590

7691
@final
7792
class LocalAllocator(LocalAllocatorBase, AllocateMixin):
7893
"""
7994
An allocator that allocates by spawning actors into the current process.
8095
"""
8196

97+
def _fork_processes(self) -> bool:
98+
return False
99+
82100

83101
@final
84102
class SimAllocator(SimAllocatorBase, AllocateMixin):
85103
"""
86104
An allocator that allocates by spawning actors into the current process using simulated channels for transport
87105
"""
88106

107+
def _fork_processes(self) -> bool:
108+
return False
109+
89110

90111
class RemoteAllocInitializer(abc.ABC):
91112
"""Subclass-able Python interface for `hyperactor_mesh::alloc::remoteprocess:RemoteProcessAllocInitializer`.
@@ -219,3 +240,6 @@ class RemoteAllocator(RemoteAllocatorBase, AllocateMixin):
219240
An allocator that allocates by spawning actors on a remote host.
220241
The remote host must be running hyperactor's remote-process-allocator.
221242
"""
243+
244+
def _fork_processes(self) -> bool:
245+
return True

python/monarch/_src/actor/proc_mesh.py

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ def __init__(
142142
self,
143143
hy_proc_mesh: "Shared[HyProcMesh]",
144144
shape: Shape,
145+
_fork_processes: bool,
145146
_device_mesh: Optional["DeviceMesh"] = None,
146147
) -> None:
147148
self._proc_mesh = hy_proc_mesh
@@ -155,6 +156,7 @@ def __init__(
155156
self._code_sync_client: Optional[CodeSyncMeshClient] = None
156157
self._logging_mesh_client: Optional[LoggingMeshClient] = None
157158
self._maybe_device_mesh: Optional["DeviceMesh"] = _device_mesh
159+
self._fork_processes = _fork_processes
158160
self._stopped = False
159161

160162
@property
@@ -172,27 +174,34 @@ async def task() -> Literal[True]:
172174

173175
return Future(coro=task())
174176

175-
def _init_manager_actors(self, setup: Callable[[], None] | None = None) -> None:
177+
def _init_manager_actors(
178+
self, setup: Callable[[], None] | None = None, _fork_processes: bool = True
179+
) -> None:
176180
self._proc_mesh = PythonTask.from_coroutine(
177-
self._init_manager_actors_coro(self._proc_mesh, setup)
181+
self._init_manager_actors_coro(self._proc_mesh, setup, _fork_processes)
178182
).spawn()
179183

180184
async def _init_manager_actors_coro(
181185
self,
182186
proc_mesh_: "Shared[HyProcMesh]",
183187
setup: Callable[[], None] | None = None,
188+
_fork_processes: bool = True,
184189
) -> "HyProcMesh":
185190
# WARNING: it is unsafe to await self._proc_mesh here
186191
# because self._proc_mesh is the result of this function itself!
187192

188193
proc_mesh = await proc_mesh_
189194

190-
self._logging_mesh_client = await LoggingMeshClient.spawn(proc_mesh=proc_mesh)
191-
self._logging_mesh_client.set_mode(
192-
stream_to_client=True,
193-
aggregate_window_sec=3,
194-
level=logging.INFO,
195-
)
195+
if _fork_processes:
196+
# logging mesh is only makes sense with forked (remote or local) processes
197+
self._logging_mesh_client = await LoggingMeshClient.spawn(
198+
proc_mesh=proc_mesh
199+
)
200+
self._logging_mesh_client.set_mode(
201+
stream_to_client=True,
202+
aggregate_window_sec=3,
203+
level=logging.INFO,
204+
)
196205

197206
_rdma_manager = (
198207
# type: ignore[16]
@@ -229,7 +238,12 @@ def _new_with_shape(self, shape: Shape) -> "ProcMesh":
229238
if self._maybe_device_mesh is None
230239
else self._device_mesh._new_with_shape(shape)
231240
)
232-
pm = ProcMesh(self._proc_mesh, shape, _device_mesh=device_mesh)
241+
pm = ProcMesh(
242+
self._proc_mesh,
243+
shape,
244+
_device_mesh=device_mesh,
245+
_fork_processes=self._fork_processes,
246+
)
233247
pm._slice = True
234248
return pm
235249

@@ -301,10 +315,12 @@ async def task() -> HyProcMesh:
301315
list(alloc._extent.keys()),
302316
Slice.new_row_major(list(alloc._extent.values())),
303317
)
304-
pm = ProcMesh(PythonTask.from_coroutine(task()).spawn(), shape)
318+
pm = ProcMesh(
319+
PythonTask.from_coroutine(task()).spawn(), shape, alloc.fork_processes
320+
)
305321

306322
if _init_manager_actors:
307-
pm._init_manager_actors(setup)
323+
pm._init_manager_actors(setup, alloc.fork_processes)
308324
# we do this here rather than in _init_manager_actors
309325
# because serializing debug_client() requires waiting on
310326
# its actor to spawn which would block the tokio event loop inside
@@ -450,6 +466,11 @@ async def logging_option(
450466
Returns:
451467
None
452468
"""
469+
if not self._fork_processes:
470+
raise RuntimeError(
471+
"Logging option is only available for allocators that fork processes. Allocators like LocalAllocator are not supported."
472+
)
473+
453474
if level < 0 or level > 255:
454475
raise ValueError("Invalid logging level: {}".format(level))
455476
await self.initialized

python/tests/test_python_actors.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,3 +1092,12 @@ def test_mesh_len():
10921092
proc_mesh = local_proc_mesh(gpus=12).get()
10931093
s = proc_mesh.spawn("sync_actor", SyncActor).get()
10941094
assert 12 == len(s)
1095+
1096+
1097+
async def test_logging_option_on_local_procs() -> None:
1098+
proc_mesh = local_proc_mesh(gpus=1)
1099+
with pytest.raises(
1100+
RuntimeError,
1101+
match="Logging option is only available for allocators that fork processes",
1102+
):
1103+
await proc_mesh.logging_option(stream_to_client=True)

0 commit comments

Comments
 (0)