Skip to content

Commit 7cbc75c

Browse files
James Sun authored and facebook-github-bot committed
disallow logging option for local procs (meta-pytorch#825)
Summary: There is no way to tail logs from local procs, so simply disallow the logging option for them. I don't quite like the new interface introduced here; an RFC is needed. Differential Revision: D80063700
1 parent 6a64f2a commit 7cbc75c

File tree

4 files changed

+74
-46
lines changed

4 files changed

+74
-46
lines changed

hyperactor_mesh/src/logging.rs

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ use hyperactor::Unbind;
3434
use hyperactor::channel;
3535
use hyperactor::channel::ChannelAddr;
3636
use hyperactor::channel::ChannelRx;
37-
use hyperactor::channel::ChannelTransport;
3837
use hyperactor::channel::ChannelTx;
3938
use hyperactor::channel::Rx;
4039
use hyperactor::channel::Tx;
@@ -539,6 +538,10 @@ impl<T: LogSender + Unpin + 'static, S: io::AsyncWrite + Send + Unpin + 'static>
539538
// Since LogSender::send takes &self, we don't need to clone it
540539
if let Err(e) = this.log_sender.send(output_target, data_to_send) {
541540
tracing::error!("error sending log: {}", e);
541+
return Poll::Ready(Err(io::Error::other(format!(
542+
"error sending write message: {}",
543+
e
544+
))));
542545
}
543546
// Return success with the full buffer size
544547
Poll::Ready(Ok(buf.len()))
@@ -610,37 +613,15 @@ impl Actor for LogForwardActor {
610613
async fn new(logging_client_ref: Self::Params) -> Result<Self> {
611614
let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
612615
Ok(channel) => channel.parse()?,
613-
Err(err) => {
614-
tracing::debug!(
615-
"log forwarder actor failed to read env var {}: {}",
616-
BOOTSTRAP_LOG_CHANNEL,
617-
err
618-
);
619-
// TODO: an empty channel to serve
620-
ChannelAddr::any(ChannelTransport::Unix)
621-
}
616+
Err(err) => return Err(err.into()),
622617
};
623618
tracing::info!(
624619
"log forwarder {} serve at {}",
625620
std::process::id(),
626621
log_channel
627622
);
628623

629-
let rx = match channel::serve(log_channel.clone()).await {
630-
Ok((_, rx)) => rx,
631-
Err(err) => {
632-
// This can happen if we are not spanwed on a separate process like local.
633-
// For local mesh, log streaming anyway is not needed.
634-
tracing::error!(
635-
"log forwarder actor failed to bootstrap on given channel {}: {}",
636-
log_channel,
637-
err
638-
);
639-
channel::serve(ChannelAddr::any(ChannelTransport::Unix))
640-
.await?
641-
.1
642-
}
643-
};
624+
let (_, rx) = channel::serve(log_channel.clone()).await?;
644625
Ok(Self {
645626
rx,
646627
logging_client_ref,
@@ -743,15 +724,7 @@ impl Actor for LogFlushActor {
743724
async fn new(_: ()) -> Result<Self, anyhow::Error> {
744725
let log_channel: ChannelAddr = match std::env::var(BOOTSTRAP_LOG_CHANNEL) {
745726
Ok(channel) => channel.parse()?,
746-
Err(err) => {
747-
tracing::debug!(
748-
"log forwarder actor failed to read env var {}: {}",
749-
BOOTSTRAP_LOG_CHANNEL,
750-
err
751-
);
752-
// TODO: this should error out; it can only happen with local proc; we need to fix it.
753-
ChannelAddr::any(ChannelTransport::Unix)
754-
}
727+
Err(err) => return Err(err.into()),
755728
};
756729
let tx = channel::dial::<LogMessage>(log_channel)?;
757730

@@ -1014,6 +987,7 @@ mod tests {
1014987

1015988
use hyperactor::channel;
1016989
use hyperactor::channel::ChannelAddr;
990+
use hyperactor::channel::ChannelTransport;
1017991
use hyperactor::channel::ChannelTx;
1018992
use hyperactor::channel::Tx;
1019993
use hyperactor::id;

python/monarch/_src/actor/allocator.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
class AllocHandle(DeprecatedNotAFuture):
3434
_hy_alloc: "Shared[Alloc]"
3535
_extent: Dict[str, int]
36+
_fork_processes: bool
3637

3738
@property
3839
def initialized(self) -> Future[Literal[True]]:
@@ -48,6 +49,10 @@ async def task() -> Literal[True]:
4849

4950
return Future(coro=task())
5051

52+
@property
53+
def fork_processes(self) -> bool:
54+
return self._fork_processes
55+
5156

5257
class AllocateMixin(abc.ABC):
5358
@abc.abstractmethod
@@ -63,7 +68,14 @@ def allocate(self, spec: AllocSpec) -> "AllocHandle":
6368
Returns:
6469
- A future that will be fulfilled when the requested allocation is fulfilled.
6570
"""
66-
return AllocHandle(self.allocate_nonblocking(spec).spawn(), spec.extent)
71+
return AllocHandle(
72+
self.allocate_nonblocking(spec).spawn(),
73+
spec.extent,
74+
self._fork_processes(),
75+
)
76+
77+
@abc.abstractmethod
78+
def _fork_processes(self) -> bool: ...
6779

6880

6981
@final
@@ -72,20 +84,29 @@ class ProcessAllocator(ProcessAllocatorBase, AllocateMixin):
7284
An allocator that allocates by spawning local processes.
7385
"""
7486

87+
def _fork_processes(self) -> bool:
88+
return True
89+
7590

7691
@final
7792
class LocalAllocator(LocalAllocatorBase, AllocateMixin):
7893
"""
7994
An allocator that allocates by spawning actors into the current process.
8095
"""
8196

97+
def _fork_processes(self) -> bool:
98+
return False
99+
82100

83101
@final
84102
class SimAllocator(SimAllocatorBase, AllocateMixin):
85103
"""
86104
An allocator that allocates by spawning actors into the current process using simulated channels for transport
87105
"""
88106

107+
def _fork_processes(self) -> bool:
108+
return False
109+
89110

90111
class RemoteAllocInitializer(abc.ABC):
91112
"""Subclass-able Python interface for `hyperactor_mesh::alloc::remoteprocess:RemoteProcessAllocInitializer`.
@@ -219,3 +240,6 @@ class RemoteAllocator(RemoteAllocatorBase, AllocateMixin):
219240
An allocator that allocates by spawning actors on a remote host.
220241
The remote host must be running hyperactor's remote-process-allocator.
221242
"""
243+
244+
def _fork_processes(self) -> bool:
245+
return True

python/monarch/_src/actor/proc_mesh.py

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ def __init__(
124124
self,
125125
hy_proc_mesh: "Shared[HyProcMesh]",
126126
shape: Shape,
127+
_fork_processes: bool,
127128
_device_mesh: Optional["DeviceMesh"] = None,
128129
) -> None:
129130
self._proc_mesh = hy_proc_mesh
@@ -137,6 +138,7 @@ def __init__(
137138
self._code_sync_client: Optional[CodeSyncMeshClient] = None
138139
self._logging_mesh_client: Optional[LoggingMeshClient] = None
139140
self._maybe_device_mesh: Optional["DeviceMesh"] = _device_mesh
141+
self._fork_processes = _fork_processes
140142
self._stopped = False
141143

142144
@property
@@ -154,27 +156,34 @@ async def task() -> Literal[True]:
154156

155157
return Future(coro=task())
156158

157-
def _init_manager_actors(self, setup: Callable[[], None] | None = None) -> None:
159+
def _init_manager_actors(
160+
self, setup: Callable[[], None] | None = None, _fork_processes: bool = True
161+
) -> None:
158162
self._proc_mesh = PythonTask.from_coroutine(
159-
self._init_manager_actors_coro(self._proc_mesh, setup)
163+
self._init_manager_actors_coro(self._proc_mesh, setup, _fork_processes)
160164
).spawn()
161165

162166
async def _init_manager_actors_coro(
163167
self,
164168
proc_mesh_: "Shared[HyProcMesh]",
165169
setup: Callable[[], None] | None = None,
170+
_fork_processes: bool = True,
166171
) -> "HyProcMesh":
167172
# WARNING: it is unsafe to await self._proc_mesh here
168173
# because self._proc_mesh is the result of this function itself!
169174

170175
proc_mesh = await proc_mesh_
171176

172-
self._logging_mesh_client = await LoggingMeshClient.spawn(proc_mesh=proc_mesh)
173-
self._logging_mesh_client.set_mode(
174-
stream_to_client=True,
175-
aggregate_window_sec=3,
176-
level=logging.INFO,
177-
)
177+
if _fork_processes:
178+
# the logging mesh only makes sense with forked (remote or local) processes
179+
self._logging_mesh_client = await LoggingMeshClient.spawn(
180+
proc_mesh=proc_mesh
181+
)
182+
self._logging_mesh_client.set_mode(
183+
stream_to_client=True,
184+
aggregate_window_sec=3,
185+
level=logging.INFO,
186+
)
178187

179188
_rdma_manager = (
180189
# type: ignore[16]
@@ -211,7 +220,12 @@ def _new_with_shape(self, shape: Shape) -> "ProcMesh":
211220
if self._maybe_device_mesh is None
212221
else self._device_mesh._new_with_shape(shape)
213222
)
214-
pm = ProcMesh(self._proc_mesh, shape, _device_mesh=device_mesh)
223+
pm = ProcMesh(
224+
self._proc_mesh,
225+
shape,
226+
_device_mesh=device_mesh,
227+
_fork_processes=self._fork_processes,
228+
)
215229
pm._slice = True
216230
return pm
217231

@@ -283,10 +297,12 @@ async def task() -> HyProcMesh:
283297
list(alloc._extent.keys()),
284298
Slice.new_row_major(list(alloc._extent.values())),
285299
)
286-
pm = ProcMesh(PythonTask.from_coroutine(task()).spawn(), shape)
300+
pm = ProcMesh(
301+
PythonTask.from_coroutine(task()).spawn(), shape, alloc.fork_processes
302+
)
287303

288304
if _init_manager_actors:
289-
pm._init_manager_actors(setup)
305+
pm._init_manager_actors(setup, alloc.fork_processes)
290306
# we do this here rather than in _init_manager_actors
291307
# because serializing debug_client() requires waiting on
292308
# its actor to spawn which would block the tokio event loop inside
@@ -407,6 +423,11 @@ async def logging_option(
407423
Returns:
408424
None
409425
"""
426+
if not self._fork_processes:
427+
raise RuntimeError(
428+
"Logging option is only available for allocators that fork processes. Allocators like LocalAllocator are not supported."
429+
)
430+
410431
if level < 0 or level > 255:
411432
raise ValueError("Invalid logging level: {}".format(level))
412433
await self.initialized

python/tests/test_python_actors.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,3 +1040,12 @@ def test_mesh_len():
10401040
proc_mesh = local_proc_mesh(gpus=12).get()
10411041
s = proc_mesh.spawn("sync_actor", SyncActor).get()
10421042
assert 12 == len(s)
1043+
1044+
1045+
async def test_logging_option_on_local_procs() -> None:
1046+
proc_mesh = local_proc_mesh(gpus=1)
1047+
with pytest.raises(
1048+
RuntimeError,
1049+
match="Logging option is only available for allocators that fork processes",
1050+
):
1051+
await proc_mesh.logging_option(stream_to_client=True)

0 commit comments

Comments
 (0)