Skip to content

Commit 8f75f99

Browse files
committed
[monarch] The root client is just a PythonActor
This diff makes the root client actor just another `PythonActor`. # Why? Right now the monarch codebase is peppered with special handling to distinguish between normal python actors and the root client "actor", which has type `()` and is actually just a detached `Instance` with no actor loop; it therefore has no message handlers and can't even process supervision events. As a result, we have to wrap the current context's instance in a special `ContextInstance` enum, and everywhere we want to use it, we either have to use the `instance_dispatch!` macro, or insert code that looks like: ```rust match instance { ContextInstance::PythonActor(ins) => { do something }, ContextInstance::Client(ins) => { do something else }, } ``` This makes the code more error-prone and harder to understand, with the added complication that the client handling is often not idiomatic w.r.t hyperactor due to the lack of message handlers/actor loop. Some examples: - [Confusing supervision handling where `owner` might not be defined but `is_owned` is still true and so we need to call into a special `unhandled` function instead of continuing to propagate up the hierarchy](https://fburl.com/code/andy3ggr) - [The root client can't have child actors due to no supervision event handling, so they have to be spawned directly on the root client proc, and even then, there is no way for the supervision event to reach `monarch.actor.unhandled_fault_hook`](https://fburl.com/code/kqd2iwvc) - [The root client handles undeliverable messages via a bespoke tokio task/thread](https://fburl.com/code/jjgfy5d5) Making the root client a normal python actor solves these problems, because: - We don't need a `ContextInstance` enum anymore -- `PyInstance` *always* contains `Instance<PythonActor>`. - Supervision events follow a unified path as they bubble up through the hierarchy, and *every* unhandled event reaches `RootClientActor.__supervise__`, defined in python, without special handling. - The root client can handle undeliverable messages using `RootClientActor._handle_undeliverable_message`, defined in python, without special handling. # Navigating the code changes (guide for reviewers) There are a lot of file changes here but only some of them are important. I would recommend reviewing them in the following order: - `monarch/_src/actor/actor_mesh.py` - Defines the `RootClientActor` python class and its behavior. - `hyperactor/src/proc.rs` - Introduces `Proc::actor_instance::<A>(...)`, which returns a detached `A`-typed actor instance/handle, along with its supervision receiver, signal receiver and message receiver. - `monarch_hyperactor/src/actor.rs` - Introduces `PythonActor::bootstrap_client()`, which replaces `global_root_client()` in the root client context. This function starts the root client proc, spawns the `RootClientActor`, starts its actor loop and returns the `Instance<PythonActor>`. - The root client actor can now handle `SupervisionFailureMessage` just like every other actor in the hierarchy. - Implements `PythonActor::handle_supervision_event` to pass the event to the actor's `SupervisionFailureMessage` handler. This way, **every unhandled supervision event in the system makes its way to `RootClientActor.__supervise__` eventually**. - `monarch_hyperactor/src/v1/actor_mesh.rs` - Deletes the special handling from the actor states monitor like `is_owned` and the explicit `unhandled_fault_hook` call. If `owner` is defined, it forwards the `SupervisionFailureMessage`, or else it does nothing. - Fixes (what I think was) a bug in `send_state_change`. A supervision event should only be forwarded as `SupervisionFailureMessage` to `owner` if it represents a failure. With the logic before this diff, stopping an actor mesh from inside an actor endpoint would generate a supervision event that reaches `unhandled_fault_hook` and crashes the root process even if it was a healthy stop. - `monarch_hyperactor/src/context.rs` - Deletes `ContextInstance` and replaces it in `PyInstance` with `Instance<PythonActor>`. - The rest of the changes are pretty much just cleaning up `instance_dispatch!` calls. Differential Revision: [D87296357](https://our.internmc.facebook.com/intern/diff/D87296357/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D87296357/)! ghstack-source-id: 325421344 Pull Request resolved: #1985
1 parent 240ebec commit 8f75f99

File tree

23 files changed

+557
-638
lines changed

23 files changed

+557
-638
lines changed

hyperactor/src/actor.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,10 @@ where
312312
/// with the ID of the actor being served.
313313
#[derive(Debug)]
314314
pub struct ActorError {
315-
pub(crate) actor_id: Box<ActorId>,
316-
pub(crate) kind: Box<ActorErrorKind>,
315+
/// The ActorId for the actor that generated this error.
316+
pub actor_id: Box<ActorId>,
317+
/// The kind of error that occurred.
318+
pub kind: Box<ActorErrorKind>,
317319
}
318320

319321
/// The kinds of actor serving errors.

hyperactor/src/proc.rs

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -411,7 +411,9 @@ impl Proc {
411411
.map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))
412412
}
413413

414-
fn handle_supervision_event(&self, event: ActorSupervisionEvent) {
414+
/// Handle a supervision event received by the proc. Attempt to forward it to the
415+
/// supervision coordinator port if one is set, otherwise crash the process.
416+
pub fn handle_supervision_event(&self, event: ActorSupervisionEvent) {
415417
let result = match self.state().supervision_coordinator_port.get() {
416418
Some(port) => port.send(event.clone()).map_err(anyhow::Error::from),
417419
None => Err(anyhow::anyhow!(
@@ -530,26 +532,46 @@ impl Proc {
530532
Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx))
531533
}
532534

533-
/// Create and return an actor instance and its corresponding handle. This allows actors to be
534-
/// "inverted": the caller can use the returned [`Instance`] to send and receive messages,
535-
/// launch child actors, etc. The actor itself does not handle any messages, and supervision events
536-
/// are always forwarded to the proc. Otherwise the instance acts as a normal actor, and can be
537-
/// referenced and stopped.
535+
/// Wrapper for [`Proc::actor_instance::<()>`].
538536
pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
537+
let (instance, handle, ..) = self.actor_instance(name)?;
538+
539+
Ok((instance, handle))
540+
}
541+
542+
/// Create and return an actor instance, its corresponding handle, its signal port receiver,
543+
/// its supervision port receiver, and its message receiver. This allows actors to be
544+
/// "inverted": the caller can use the returned [`Instance`] to send and receive messages,
545+
/// launch child actors, etc. The actor itself does not handle any messages unless driven by
546+
/// the caller. Otherwise the instance acts as a normal actor, and can be referenced and
547+
/// stopped.
548+
pub fn actor_instance<A: Actor>(
549+
&self,
550+
name: &str,
551+
) -> Result<
552+
(
553+
Instance<A>,
554+
ActorHandle<A>,
555+
PortReceiver<ActorSupervisionEvent>,
556+
PortReceiver<Signal>,
557+
mpsc::UnboundedReceiver<WorkCell<A>>,
558+
),
559+
anyhow::Error,
560+
> {
539561
let actor_id = self.allocate_root_id(name)?;
540-
let _ = tracing::debug_span!(
562+
let span = tracing::debug_span!(
541563
"actor_instance",
542564
actor_name = name,
543-
actor_type = std::any::type_name::<()>(),
565+
actor_type = std::any::type_name::<A>(),
544566
actor_id = actor_id.to_string(),
545567
);
546-
547-
let (instance, _, _) = Instance::new(self.clone(), actor_id.clone(), true, None);
568+
let _guard = span.enter();
569+
let (instance, actor_loop_receivers, work_rx) =
570+
Instance::new(self.clone(), actor_id.clone(), false, None);
571+
let (signal_rx, supervision_rx) = actor_loop_receivers.unwrap();
548572
let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());
549-
550573
instance.change_status(ActorStatus::Client);
551-
552-
Ok((instance, handle))
574+
Ok((instance, handle, supervision_rx, signal_rx, work_rx))
553575
}
554576

555577
/// Create a child instance. Called from `Instance`.
@@ -874,11 +896,11 @@ impl MailboxSender for WeakProc {
874896

875897
/// Represents a single work item used by the instance to dispatch to
876898
/// actor handles. Specifically, this enables handler polymorphism.
877-
struct WorkCell<A: Actor + Send>(
899+
pub struct WorkCell<A: Actor + Send>(
878900
Box<
879901
dyn for<'a> FnOnce(
880902
&'a mut A,
881-
&'a mut Instance<A>,
903+
&'a Instance<A>,
882904
)
883905
-> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
884906
+ Send
@@ -891,7 +913,7 @@ impl<A: Actor + Send> WorkCell<A> {
891913
fn new(
892914
f: impl for<'a> FnOnce(
893915
&'a mut A,
894-
&'a mut Instance<A>,
916+
&'a Instance<A>,
895917
)
896918
-> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
897919
+ Send
@@ -902,10 +924,10 @@ impl<A: Actor + Send> WorkCell<A> {
902924
}
903925

904926
/// Handle the message represented by this work cell.
905-
fn handle<'a>(
927+
pub fn handle<'a>(
906928
self,
907929
actor: &'a mut A,
908-
instance: &'a mut Instance<A>,
930+
instance: &'a Instance<A>,
909931
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
910932
(self.0)(actor, instance)
911933
}
@@ -1451,7 +1473,8 @@ impl<A: Actor> Instance<A> {
14511473
Ok(())
14521474
}
14531475

1454-
async fn handle_supervision_event(
1476+
/// Handle a supervision event using the provided actor.
1477+
pub async fn handle_supervision_event(
14551478
&self,
14561479
actor: &mut A,
14571480
supervision_event: ActorSupervisionEvent,
@@ -1483,7 +1506,7 @@ impl<A: Actor> Instance<A> {
14831506

14841507
#[hyperactor::instrument(fields(actor_id = self.self_id().to_string(), actor_name = self.self_id().name()))]
14851508
async unsafe fn handle_message<M: Message>(
1486-
&mut self,
1509+
&self,
14871510
actor: &mut A,
14881511
type_info: Option<&'static TypeInfo>,
14891512
headers: Attrs,
@@ -1519,8 +1542,8 @@ impl<A: Actor> Instance<A> {
15191542
actor.handle(&context, message).await
15201543
}
15211544

1522-
/// Spawn on child on this instance. Currently used only by cap::CanSpawn.
1523-
pub(crate) fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
1545+
/// Spawn on child on this instance.
1546+
pub fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
15241547
self.inner.proc.spawn_child(self.inner.cell.clone(), actor)
15251548
}
15261549

@@ -2041,7 +2064,7 @@ impl<A: Actor> Ports<A> {
20412064
let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
20422065
let seq_info = headers.get(SEQ_INFO).cloned();
20432066

2044-
let work = WorkCell::new(move |actor: &mut A, instance: &mut Instance<A>| {
2067+
let work = WorkCell::new(move |actor: &mut A, instance: &Instance<A>| {
20452068
Box::pin(async move {
20462069
// SAFETY: we guarantee that the passed type_info is for type M.
20472070
unsafe {

hyperactor_mesh/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ mod metrics;
2929
pub mod proc_mesh;
3030
pub mod reference;
3131
pub mod resource;
32-
mod router;
32+
pub mod router;
3333
pub mod shared_cell;
3434
pub mod shortuuid;
3535
#[cfg(target_os = "linux")]

monarch_extension/src/code_sync.rs

Lines changed: 44 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use hyperactor::context;
1818
use hyperactor_mesh::Mesh;
1919
use hyperactor_mesh::RootActorMesh;
2020
use hyperactor_mesh::shared_cell::SharedCell;
21+
use monarch_hyperactor;
2122
use monarch_hyperactor::code_sync::WorkspaceLocation;
2223
use monarch_hyperactor::code_sync::manager::CodeSyncManager;
2324
use monarch_hyperactor::code_sync::manager::CodeSyncManagerParams;
@@ -27,8 +28,6 @@ use monarch_hyperactor::code_sync::manager::WorkspaceConfig;
2728
use monarch_hyperactor::code_sync::manager::WorkspaceShape;
2829
use monarch_hyperactor::code_sync::manager::code_sync_mesh;
2930
use monarch_hyperactor::context::PyInstance;
30-
use monarch_hyperactor::instance_dispatch;
31-
use monarch_hyperactor::instance_into_dispatch;
3231
use monarch_hyperactor::proc_mesh::PyProcMesh;
3332
use monarch_hyperactor::runtime::signal_safe_block_on;
3433
use monarch_hyperactor::v1::proc_mesh::PyProcMesh as PyProcMeshV1;
@@ -279,32 +278,34 @@ impl CodeSyncMeshClient {
279278
if let Ok(v0) = proc_mesh.downcast::<PyProcMesh>() {
280279
let proc_mesh = v0.borrow().try_inner()?;
281280
signal_safe_block_on(py, async move {
282-
let actor_mesh = instance_dispatch!(client, |cx| {
283-
proc_mesh
284-
.spawn(cx, "code_sync_manager", &CodeSyncManagerParams {})
285-
.await?
286-
});
281+
let actor_mesh = proc_mesh
282+
.spawn(
283+
client.deref(),
284+
"code_sync_manager",
285+
&CodeSyncManagerParams {},
286+
)
287+
.await?;
287288
Ok(Self { actor_mesh })
288289
})?
289290
} else {
290291
let proc_mesh = proc_mesh.downcast::<PyProcMeshV1>()?.borrow().mesh_ref()?;
291292
signal_safe_block_on(py, async move {
292-
let actor_mesh = instance_dispatch!(client, |cx| {
293-
proc_mesh
294-
.spawn_service(cx, "code_sync_manager", &CodeSyncManagerParams {})
295-
.await
296-
.map_err(|err| PyException::new_err(err.to_string()))?
297-
});
298-
instance_dispatch!(client, |cx| {
299-
actor_mesh
300-
.cast(
301-
cx,
302-
SetActorMeshMessage {
303-
actor_mesh: actor_mesh.deref().clone(),
304-
},
305-
)
306-
.map_err(|err| PyException::new_err(err.to_string()))?
307-
});
293+
let actor_mesh = proc_mesh
294+
.spawn_service(
295+
client.deref(),
296+
"code_sync_manager",
297+
&CodeSyncManagerParams {},
298+
)
299+
.await
300+
.map_err(|err| PyException::new_err(err.to_string()))?;
301+
actor_mesh
302+
.cast(
303+
client.deref(),
304+
SetActorMeshMessage {
305+
actor_mesh: actor_mesh.deref().clone(),
306+
},
307+
)
308+
.map_err(|err| PyException::new_err(err.to_string()))?;
308309
Ok(Self {
309310
actor_mesh: SharedCell::from(RootActorMesh::from(actor_mesh)),
310311
})
@@ -324,19 +325,17 @@ impl CodeSyncMeshClient {
324325
) -> PyResult<Bound<'py, PyAny>> {
325326
let instance = instance.clone();
326327
let actor_mesh = self.actor_mesh.clone();
327-
instance_into_dispatch!(instance, |cx| {
328-
monarch_hyperactor::runtime::future_into_py(py, async move {
329-
CodeSyncMeshClient::sync_workspace_(
330-
&cx,
331-
actor_mesh,
332-
local,
333-
remote,
334-
method.into(),
335-
auto_reload,
336-
)
337-
.err_into()
338-
.await
339-
})
328+
monarch_hyperactor::runtime::future_into_py(py, async move {
329+
CodeSyncMeshClient::sync_workspace_(
330+
instance.deref(),
331+
actor_mesh,
332+
local,
333+
remote,
334+
method.into(),
335+
auto_reload,
336+
)
337+
.err_into()
338+
.await
340339
})
341340
}
342341

@@ -354,17 +353,15 @@ impl CodeSyncMeshClient {
354353
py,
355354
async move {
356355
for workspace in workspaces.into_iter() {
357-
instance_dispatch!(instance, async |cx| {
358-
CodeSyncMeshClient::sync_workspace_(
359-
cx,
360-
actor_mesh.clone(),
361-
workspace.local,
362-
workspace.remote,
363-
workspace.method.into(),
364-
auto_reload,
365-
)
366-
.await
367-
})?;
356+
CodeSyncMeshClient::sync_workspace_(
357+
instance.deref(),
358+
actor_mesh.clone(),
359+
workspace.local,
360+
workspace.remote,
361+
workspace.method.into(),
362+
auto_reload,
363+
)
364+
.await?;
368365
}
369366
anyhow::Ok(())
370367
}

monarch_extension/src/logging.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#![allow(unsafe_op_in_unsafe_fn)]
1010

11+
use std::ops::Deref;
1112
use std::time::Duration;
1213

1314
use hyperactor::ActorHandle;
@@ -22,7 +23,6 @@ use hyperactor_mesh::logging::LogForwardMessage;
2223
use hyperactor_mesh::selection::Selection;
2324
use hyperactor_mesh::shared_cell::SharedCell;
2425
use monarch_hyperactor::context::PyInstance;
25-
use monarch_hyperactor::instance_dispatch;
2626
use monarch_hyperactor::logging::LoggerRuntimeActor;
2727
use monarch_hyperactor::logging::LoggerRuntimeMessage;
2828
use monarch_hyperactor::proc_mesh::PyProcMesh;
@@ -94,13 +94,10 @@ impl LoggingMeshClient {
9494
.client_proc()
9595
.spawn("log_client", LogClientActor::default())?;
9696
let client_actor_ref = client_actor.bind();
97-
let forwarder_mesh = instance_dispatch!(instance, |cx| {
98-
proc_mesh
99-
.spawn(cx, "log_forwarder", &client_actor_ref)
100-
.await?
101-
});
102-
let logger_mesh =
103-
instance_dispatch!(instance, |cx| { proc_mesh.spawn(cx, "logger", &()).await? });
97+
let forwarder_mesh = proc_mesh
98+
.spawn(instance.deref(), "log_forwarder", &client_actor_ref)
99+
.await?;
100+
let logger_mesh = proc_mesh.spawn(instance.deref(), "logger", &()).await?;
104101

105102
// Register flush_internal as a on-stop callback
106103
let client_actor_for_callback = client_actor.clone();

monarch_extension/src/mesh_controller.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ use monarch_hyperactor::actor::PythonMessage;
4646
use monarch_hyperactor::actor::PythonMessageKind;
4747
use monarch_hyperactor::buffers::FrozenBuffer;
4848
use monarch_hyperactor::context::PyInstance;
49-
use monarch_hyperactor::instance_dispatch;
5049
use monarch_hyperactor::local_state_broker::LocalStateBrokerActor;
5150
use monarch_hyperactor::mailbox::PyPortId;
5251
use monarch_hyperactor::ndslice::PySlice;
@@ -140,17 +139,14 @@ impl _Controller {
140139
let id = NEXT_ID.fetch_add(1, atomic::Ordering::Relaxed);
141140
let controller_handle: Arc<Mutex<ActorHandle<MeshControllerActor>>> =
142141
signal_safe_block_on(py, async move {
143-
let controller_handle = instance_dispatch!(client, |instance| {
144-
instance.proc().spawn(
145-
&Name::new("mesh_controller").to_string(),
146-
MeshControllerActor::new(MeshControllerActorParams {
147-
proc_mesh,
148-
id,
149-
rank_map,
150-
})
151-
.await,
152-
)?
153-
});
142+
let controller_handle = client.spawn(
143+
MeshControllerActor::new(MeshControllerActorParams {
144+
proc_mesh,
145+
id,
146+
rank_map,
147+
})
148+
.await,
149+
)?;
154150
Ok::<_, anyhow::Error>(Arc::new(Mutex::new(controller_handle)))
155151
})??;
156152

@@ -231,8 +227,7 @@ impl _Controller {
231227
}
232228

233229
fn _drain_and_stop(&mut self, py: Python<'_>, instance: &PyInstance) -> PyResult<()> {
234-
let (stop_worker_port, stop_worker_receiver) =
235-
instance_dispatch!(instance, |cx_instance| { cx_instance.open_once_port() });
230+
let (stop_worker_port, stop_worker_receiver) = instance.open_once_port();
236231

237232
self.controller_handle
238233
.blocking_lock()
@@ -817,6 +812,10 @@ impl Actor for MeshControllerActor {
817812
self.brokers = Some(brokers);
818813
Ok(())
819814
}
815+
816+
fn display_name(&self) -> Option<String> {
817+
Some(format!("mesh_controller_{}", self.id))
818+
}
820819
}
821820

822821
impl Debug for MeshControllerActor {

0 commit comments

Comments
 (0)