Skip to content

Mesh slice supervision #824

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions hyperactor_mesh/src/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::comm::multicast::CastMessage;
use crate::comm::multicast::CastMessageEnvelope;
use crate::comm::multicast::Uslice;
use crate::metrics;
use crate::proc_mesh::ActorEventRouter;
use crate::proc_mesh::ProcMesh;
use crate::reference::ActorMeshId;
use crate::reference::ActorMeshRef;
Expand Down Expand Up @@ -248,6 +249,7 @@ pub struct RootActorMesh<'a, A: RemoteActor> {
// The receiver of supervision events. It is None if it has been transferred to
// an actor event observer.
actor_supervision_rx: Option<mpsc::UnboundedReceiver<ActorSupervisionEvent>>,
actor_event_router: ActorEventRouter,
}

impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
Expand All @@ -256,12 +258,14 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
name: String,
actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
ranks: Vec<ActorRef<A>>,
actor_event_router: ActorEventRouter,
) -> Self {
Self {
proc_mesh: ProcMeshRef::Borrowed(proc_mesh),
name,
ranks,
actor_supervision_rx: Some(actor_supervision_rx),
actor_event_router,
}
}

Expand All @@ -270,11 +274,13 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
name: String,
actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
ranks: Vec<ActorRef<A>>,
actor_event_router: ActorEventRouter,
) -> Self {
Self {
proc_mesh: ProcMeshRef::Shared(Box::new(proc_mesh)),
name,
ranks,
actor_event_router,
actor_supervision_rx: Some(actor_supervision_rx),
}
}
Expand All @@ -294,6 +300,10 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
mesh_id: self.id(),
})
}

pub fn actor_event_router(&self) -> &ActorEventRouter {
&self.actor_event_router
}
}

/// Supervision event stream for actor mesh. It emits actor supervision events.
Expand All @@ -315,6 +325,16 @@ impl ActorSupervisionEvents {
}
result
}

pub fn new(
actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
mesh_id: ActorMeshId,
) -> Self {
Self {
actor_supervision_rx,
mesh_id,
}
}
}

#[async_trait]
Expand Down
66 changes: 49 additions & 17 deletions hyperactor_mesh/src/proc_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,33 @@ pub fn global_mailbox() -> Mailbox {
.clone()
}

type ActorEventRouter = Arc<DashMap<ActorMeshName, mpsc::UnboundedSender<ActorSupervisionEvent>>>;
#[derive(Clone, Debug)]
pub struct ActorEventRouter {
inner: Arc<DashMap<ActorMeshName, Vec<mpsc::UnboundedSender<ActorSupervisionEvent>>>>,
}

impl ActorEventRouter {
pub fn bind(&self, name: ActorMeshName) -> mpsc::UnboundedReceiver<ActorSupervisionEvent> {
let (tx, rx) = mpsc::unbounded_channel();
self.inner.entry(name).or_insert(vec![]).push(tx);
rx
}

fn new() -> Self {
Self {
inner: Arc::new(DashMap::new()),
}
}
}

impl Deref for ActorEventRouter {
type Target = DashMap<ActorMeshName, Vec<mpsc::UnboundedSender<ActorSupervisionEvent>>>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

/// A ProcMesh maintains a mesh of procs whose lifecycles are managed by
/// an allocator.
pub struct ProcMesh {
Expand Down Expand Up @@ -335,7 +361,7 @@ impl ProcMesh {
alloc: Box::new(alloc),
supervision_events,
}),
actor_event_router: Arc::new(DashMap::new()),
actor_event_router: ActorEventRouter::new(),
shape,
ranks: proc_ids
.into_iter()
Expand Down Expand Up @@ -434,16 +460,13 @@ impl ProcMesh {
where
A::Params: RemoteMessage,
{
let (tx, rx) = mpsc::unbounded_channel::<ActorSupervisionEvent>();
{
// Instantiate supervision routing BEFORE spawning the actor mesh.
self.actor_event_router.insert(actor_name.to_string(), tx);
}
let rx = self.actor_event_router.bind(actor_name.to_string());
let root_mesh = RootActorMesh::new(
self,
actor_name.to_string(),
rx,
Self::spawn_on_procs::<A>(&self.client, self.agents(), actor_name, params).await?,
self.actor_event_router.clone(),
);
Ok(root_mesh)
}
Expand Down Expand Up @@ -515,10 +538,17 @@ impl ProcMesh {
}
}
}
self.actor_event_router.remove(&mesh_name.to_string());
Ok(())
}
}

impl Drop for ProcMesh {
fn drop(&mut self) {
self.actor_event_router.clear();
}
}

/// Proc lifecycle events.
#[derive(Debug, Clone)]
pub enum ProcEvent {
Expand Down Expand Up @@ -596,8 +626,10 @@ impl ProcEvents {
message_headers: None,
caused_by: None,
};
if entry.value().send(event).is_err() {
tracing::warn!("unable to transmit supervision event to actor {}", entry.key());
for tx in entry.value().iter() {
if tx.send(event.clone()).is_err() {
tracing::warn!("unable to transmit supervision event to actor {}", entry.key());
}
}
}

Expand Down Expand Up @@ -631,9 +663,11 @@ impl ProcEvents {
};
// transmit to the correct root actor mesh.
{
if let Some(tx) = self.actor_event_router.get(actor_id.name()) {
if tx.send(event).is_err() {
tracing::warn!("unable to transmit supervision event to actor {}", actor_id);
if let Some(txs) = self.actor_event_router.get(actor_id.name()) {
for tx in txs.iter() {
if tx.send(event.clone()).is_err() {
tracing::warn!("unable to transmit supervision event to actor {}", actor_id);
}
}
} else {
tracing::warn!("received supervision event for unregistered actor {}", actor_id);
Expand Down Expand Up @@ -683,18 +717,16 @@ impl<D: Deref<Target = ProcMesh> + Send + Sync + 'static> SharedSpawnable for D
where
A::Params: RemoteMessage,
{
let (tx, rx) = mpsc::unbounded_channel::<ActorSupervisionEvent>();
{
// Instantiate supervision routing BEFORE spawning the actor mesh.
self.actor_event_router.insert(actor_name.to_string(), tx);
}
let rx = self.actor_event_router.bind(actor_name.to_string());
let ranks =
ProcMesh::spawn_on_procs::<A>(&self.client, self.agents(), actor_name, params).await?;
let actor_event_router = self.actor_event_router.clone();
Ok(RootActorMesh::new_shared(
self,
actor_name.to_string(),
rx,
ranks,
actor_event_router,
))
}
}
Expand Down
Loading
Loading