Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,10 @@ where
/// with the ID of the actor being served.
#[derive(Debug)]
pub struct ActorError {
pub(crate) actor_id: Box<ActorId>,
pub(crate) kind: Box<ActorErrorKind>,
/// The ActorId for the actor that generated this error.
pub actor_id: Box<ActorId>,
/// The kind of error that occurred.
pub kind: Box<ActorErrorKind>,
}

/// The kinds of actor serving errors.
Expand Down
69 changes: 46 additions & 23 deletions hyperactor/src/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,9 @@ impl Proc {
.map_err(|existing| anyhow::anyhow!("coordinator port is already set to {existing}"))
}

fn handle_supervision_event(&self, event: ActorSupervisionEvent) {
/// Handle a supervision event received by the proc. Attempt to forward it to the
/// supervision coordinator port if one is set, otherwise crash the process.
pub fn handle_supervision_event(&self, event: ActorSupervisionEvent) {
let result = match self.state().supervision_coordinator_port.get() {
Some(port) => port.send(event.clone()).map_err(anyhow::Error::from),
None => Err(anyhow::anyhow!(
Expand Down Expand Up @@ -530,26 +532,46 @@ impl Proc {
Ok(instance.start(actor, actor_loop_receivers.take().unwrap(), work_rx))
}

/// Create and return an actor instance and its corresponding handle. This allows actors to be
/// "inverted": the caller can use the returned [`Instance`] to send and receive messages,
/// launch child actors, etc. The actor itself does not handle any messages, and supervision events
/// are always forwarded to the proc. Otherwise the instance acts as a normal actor, and can be
/// referenced and stopped.
/// Wrapper for [`Proc::actor_instance::<()>`].
pub fn instance(&self, name: &str) -> Result<(Instance<()>, ActorHandle<()>), anyhow::Error> {
let (instance, handle, ..) = self.actor_instance(name)?;

Ok((instance, handle))
}

/// Create and return an actor instance, its corresponding handle, its signal port receiver,
/// its supervision port receiver, and its message receiver. This allows actors to be
/// "inverted": the caller can use the returned [`Instance`] to send and receive messages,
/// launch child actors, etc. The actor itself does not handle any messages unless driven by
/// the caller. Otherwise the instance acts as a normal actor, and can be referenced and
/// stopped.
pub fn actor_instance<A: Actor>(
&self,
name: &str,
) -> Result<
(
Instance<A>,
ActorHandle<A>,
PortReceiver<ActorSupervisionEvent>,
PortReceiver<Signal>,
mpsc::UnboundedReceiver<WorkCell<A>>,
),
anyhow::Error,
> {
let actor_id = self.allocate_root_id(name)?;
let _ = tracing::debug_span!(
let span = tracing::debug_span!(
"actor_instance",
actor_name = name,
actor_type = std::any::type_name::<()>(),
actor_type = std::any::type_name::<A>(),
actor_id = actor_id.to_string(),
);

let (instance, _, _) = Instance::new(self.clone(), actor_id.clone(), true, None);
let _guard = span.enter();
let (instance, actor_loop_receivers, work_rx) =
Instance::new(self.clone(), actor_id.clone(), false, None);
let (signal_rx, supervision_rx) = actor_loop_receivers.unwrap();
let handle = ActorHandle::new(instance.inner.cell.clone(), instance.inner.ports.clone());

instance.change_status(ActorStatus::Client);

Ok((instance, handle))
Ok((instance, handle, supervision_rx, signal_rx, work_rx))
}

/// Create a child instance. Called from `Instance`.
Expand Down Expand Up @@ -874,11 +896,11 @@ impl MailboxSender for WeakProc {

/// Represents a single work item used by the instance to dispatch to
/// actor handles. Specifically, this enables handler polymorphism.
struct WorkCell<A: Actor + Send>(
pub struct WorkCell<A: Actor + Send>(
Box<
dyn for<'a> FnOnce(
&'a mut A,
&'a mut Instance<A>,
&'a Instance<A>,
)
-> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
+ Send
Expand All @@ -891,7 +913,7 @@ impl<A: Actor + Send> WorkCell<A> {
fn new(
f: impl for<'a> FnOnce(
&'a mut A,
&'a mut Instance<A>,
&'a Instance<A>,
)
-> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + 'a + Send>>
+ Send
Expand All @@ -902,10 +924,10 @@ impl<A: Actor + Send> WorkCell<A> {
}

/// Handle the message represented by this work cell.
fn handle<'a>(
pub fn handle<'a>(
self,
actor: &'a mut A,
instance: &'a mut Instance<A>,
instance: &'a Instance<A>,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send + 'a>> {
(self.0)(actor, instance)
}
Expand Down Expand Up @@ -1451,7 +1473,8 @@ impl<A: Actor> Instance<A> {
Ok(())
}

async fn handle_supervision_event(
/// Handle a supervision event using the provided actor.
pub async fn handle_supervision_event(
&self,
actor: &mut A,
supervision_event: ActorSupervisionEvent,
Expand Down Expand Up @@ -1483,7 +1506,7 @@ impl<A: Actor> Instance<A> {

#[hyperactor::instrument(fields(actor_id = self.self_id().to_string(), actor_name = self.self_id().name()))]
async unsafe fn handle_message<M: Message>(
&mut self,
&self,
actor: &mut A,
type_info: Option<&'static TypeInfo>,
headers: Attrs,
Expand Down Expand Up @@ -1519,8 +1542,8 @@ impl<A: Actor> Instance<A> {
actor.handle(&context, message).await
}

/// Spawn on child on this instance. Currently used only by cap::CanSpawn.
pub(crate) fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
/// Spawn on child on this instance.
pub fn spawn<C: Actor>(&self, actor: C) -> anyhow::Result<ActorHandle<C>> {
self.inner.proc.spawn_child(self.inner.cell.clone(), actor)
}

Expand Down Expand Up @@ -2041,7 +2064,7 @@ impl<A: Actor> Ports<A> {
let port = self.mailbox.open_enqueue_port(move |headers, msg: M| {
let seq_info = headers.get(SEQ_INFO).cloned();

let work = WorkCell::new(move |actor: &mut A, instance: &mut Instance<A>| {
let work = WorkCell::new(move |actor: &mut A, instance: &Instance<A>| {
Box::pin(async move {
// SAFETY: we guarantee that the passed type_info is for type M.
unsafe {
Expand Down
2 changes: 1 addition & 1 deletion hyperactor_mesh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod metrics;
pub mod proc_mesh;
pub mod reference;
pub mod resource;
mod router;
pub mod router;
pub mod shared_cell;
pub mod shortuuid;
#[cfg(target_os = "linux")]
Expand Down
91 changes: 44 additions & 47 deletions monarch_extension/src/code_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use hyperactor::context;
use hyperactor_mesh::Mesh;
use hyperactor_mesh::RootActorMesh;
use hyperactor_mesh::shared_cell::SharedCell;
use monarch_hyperactor;
use monarch_hyperactor::code_sync::WorkspaceLocation;
use monarch_hyperactor::code_sync::manager::CodeSyncManager;
use monarch_hyperactor::code_sync::manager::CodeSyncManagerParams;
Expand All @@ -27,8 +28,6 @@ use monarch_hyperactor::code_sync::manager::WorkspaceConfig;
use monarch_hyperactor::code_sync::manager::WorkspaceShape;
use monarch_hyperactor::code_sync::manager::code_sync_mesh;
use monarch_hyperactor::context::PyInstance;
use monarch_hyperactor::instance_dispatch;
use monarch_hyperactor::instance_into_dispatch;
use monarch_hyperactor::proc_mesh::PyProcMesh;
use monarch_hyperactor::runtime::signal_safe_block_on;
use monarch_hyperactor::v1::proc_mesh::PyProcMesh as PyProcMeshV1;
Expand Down Expand Up @@ -279,32 +278,34 @@ impl CodeSyncMeshClient {
if let Ok(v0) = proc_mesh.downcast::<PyProcMesh>() {
let proc_mesh = v0.borrow().try_inner()?;
signal_safe_block_on(py, async move {
let actor_mesh = instance_dispatch!(client, |cx| {
proc_mesh
.spawn(cx, "code_sync_manager", &CodeSyncManagerParams {})
.await?
});
let actor_mesh = proc_mesh
.spawn(
client.deref(),
"code_sync_manager",
&CodeSyncManagerParams {},
)
.await?;
Ok(Self { actor_mesh })
})?
} else {
let proc_mesh = proc_mesh.downcast::<PyProcMeshV1>()?.borrow().mesh_ref()?;
signal_safe_block_on(py, async move {
let actor_mesh = instance_dispatch!(client, |cx| {
proc_mesh
.spawn_service(cx, "code_sync_manager", &CodeSyncManagerParams {})
.await
.map_err(|err| PyException::new_err(err.to_string()))?
});
instance_dispatch!(client, |cx| {
actor_mesh
.cast(
cx,
SetActorMeshMessage {
actor_mesh: actor_mesh.deref().clone(),
},
)
.map_err(|err| PyException::new_err(err.to_string()))?
});
let actor_mesh = proc_mesh
.spawn_service(
client.deref(),
"code_sync_manager",
&CodeSyncManagerParams {},
)
.await
.map_err(|err| PyException::new_err(err.to_string()))?;
actor_mesh
.cast(
client.deref(),
SetActorMeshMessage {
actor_mesh: actor_mesh.deref().clone(),
},
)
.map_err(|err| PyException::new_err(err.to_string()))?;
Ok(Self {
actor_mesh: SharedCell::from(RootActorMesh::from(actor_mesh)),
})
Expand All @@ -324,19 +325,17 @@ impl CodeSyncMeshClient {
) -> PyResult<Bound<'py, PyAny>> {
let instance = instance.clone();
let actor_mesh = self.actor_mesh.clone();
instance_into_dispatch!(instance, |cx| {
monarch_hyperactor::runtime::future_into_py(py, async move {
CodeSyncMeshClient::sync_workspace_(
&cx,
actor_mesh,
local,
remote,
method.into(),
auto_reload,
)
.err_into()
.await
})
monarch_hyperactor::runtime::future_into_py(py, async move {
CodeSyncMeshClient::sync_workspace_(
instance.deref(),
actor_mesh,
local,
remote,
method.into(),
auto_reload,
)
.err_into()
.await
})
}

Expand All @@ -354,17 +353,15 @@ impl CodeSyncMeshClient {
py,
async move {
for workspace in workspaces.into_iter() {
instance_dispatch!(instance, async |cx| {
CodeSyncMeshClient::sync_workspace_(
cx,
actor_mesh.clone(),
workspace.local,
workspace.remote,
workspace.method.into(),
auto_reload,
)
.await
})?;
CodeSyncMeshClient::sync_workspace_(
instance.deref(),
actor_mesh.clone(),
workspace.local,
workspace.remote,
workspace.method.into(),
auto_reload,
)
.await?;
}
anyhow::Ok(())
}
Expand Down
13 changes: 5 additions & 8 deletions monarch_extension/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#![allow(unsafe_op_in_unsafe_fn)]

use std::ops::Deref;
use std::time::Duration;

use hyperactor::ActorHandle;
Expand All @@ -22,7 +23,6 @@ use hyperactor_mesh::logging::LogForwardMessage;
use hyperactor_mesh::selection::Selection;
use hyperactor_mesh::shared_cell::SharedCell;
use monarch_hyperactor::context::PyInstance;
use monarch_hyperactor::instance_dispatch;
use monarch_hyperactor::logging::LoggerRuntimeActor;
use monarch_hyperactor::logging::LoggerRuntimeMessage;
use monarch_hyperactor::proc_mesh::PyProcMesh;
Expand Down Expand Up @@ -94,13 +94,10 @@ impl LoggingMeshClient {
.client_proc()
.spawn("log_client", LogClientActor::default())?;
let client_actor_ref = client_actor.bind();
let forwarder_mesh = instance_dispatch!(instance, |cx| {
proc_mesh
.spawn(cx, "log_forwarder", &client_actor_ref)
.await?
});
let logger_mesh =
instance_dispatch!(instance, |cx| { proc_mesh.spawn(cx, "logger", &()).await? });
let forwarder_mesh = proc_mesh
.spawn(instance.deref(), "log_forwarder", &client_actor_ref)
.await?;
let logger_mesh = proc_mesh.spawn(instance.deref(), "logger", &()).await?;

// Register flush_internal as a on-stop callback
let client_actor_for_callback = client_actor.clone();
Expand Down
27 changes: 13 additions & 14 deletions monarch_extension/src/mesh_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ use monarch_hyperactor::actor::PythonMessage;
use monarch_hyperactor::actor::PythonMessageKind;
use monarch_hyperactor::buffers::FrozenBuffer;
use monarch_hyperactor::context::PyInstance;
use monarch_hyperactor::instance_dispatch;
use monarch_hyperactor::local_state_broker::LocalStateBrokerActor;
use monarch_hyperactor::mailbox::PyPortId;
use monarch_hyperactor::ndslice::PySlice;
Expand Down Expand Up @@ -140,17 +139,14 @@ impl _Controller {
let id = NEXT_ID.fetch_add(1, atomic::Ordering::Relaxed);
let controller_handle: Arc<Mutex<ActorHandle<MeshControllerActor>>> =
signal_safe_block_on(py, async move {
let controller_handle = instance_dispatch!(client, |instance| {
instance.proc().spawn(
&Name::new("mesh_controller").to_string(),
MeshControllerActor::new(MeshControllerActorParams {
proc_mesh,
id,
rank_map,
})
.await,
)?
});
let controller_handle = client.spawn(
MeshControllerActor::new(MeshControllerActorParams {
proc_mesh,
id,
rank_map,
})
.await,
)?;
Ok::<_, anyhow::Error>(Arc::new(Mutex::new(controller_handle)))
})??;

Expand Down Expand Up @@ -231,8 +227,7 @@ impl _Controller {
}

fn _drain_and_stop(&mut self, py: Python<'_>, instance: &PyInstance) -> PyResult<()> {
let (stop_worker_port, stop_worker_receiver) =
instance_dispatch!(instance, |cx_instance| { cx_instance.open_once_port() });
let (stop_worker_port, stop_worker_receiver) = instance.open_once_port();

self.controller_handle
.blocking_lock()
Expand Down Expand Up @@ -817,6 +812,10 @@ impl Actor for MeshControllerActor {
self.brokers = Some(brokers);
Ok(())
}

fn display_name(&self) -> Option<String> {
Some(format!("mesh_controller_{}", self.id))
}
}

impl Debug for MeshControllerActor {
Expand Down
Loading
Loading