Skip to content

Commit ccae6a8

Browse files
thomasywangfacebook-github-bot
authored andcommitted
Mesh slice supervision (#824)
Summary: Suppose we have a ProcMesh with 4 gpus. On this mesh we spawn a ActorMesh A, and an ActorMesh B. We create a slice of ActorMesh A SliceA_1 containing only gpu 4 and a slice SliceA_2 containing only gpu 1. If the Actor A on gpu 4 crashes we should have the following health states: - ActorMesh A is unhealthy (contains A gpu=4) - SliceA_1 is unhealthy (contains A gpu=4) - SliceA_2 is healthy (does not contain A gpu=4) - ActorMesh B is healthy (contains gpu=4 but not Actor A) Implementation: 1. All supervision event streams are created when a RootActorMesh is spawned. A tx-rx pair are created and the tx is inserted into a map acting as a router. 2. The router now holds onto a Vec of senders instead of a single sender. For each Actor mesh in the router, we can call bind to create another tx-rx pair. The router will manage the tx, sending a message using every tx for a given Actor mesh name every time there is a supervision event. The rx is returned to be used by mesh slices. 3. The spawned RootActorMesh gets a copy of the Arc holding the router so that mesh slices can bind to it 4. PythonActorMeshes contain a monitor which is just a loop that listens from the next supervision event from a stream. If a ::Crashed event comes in, we will update an Arc<> keeping track of the health state. This monitor will now also take in the shape of the mesh it is monitoring, and only update the health state to ::Crashed, if the crashed Actor is within the shape. 5. When a PythonActorMesh is sliced, a PythonActorMeshRef is created. We will add a monitor to PythonActorMeshRefs. It is an Option<> because if it is ever serialized and deserialized, we can no longer monitor it 6. When we cast to an PythonActorMeshRef, we will first check the health state and return a SupervisionError if the mesh is unhealthy Differential Revision: D79821712
1 parent 3db8984 commit ccae6a8

File tree

7 files changed

+331
-22
lines changed

7 files changed

+331
-22
lines changed

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use crate::comm::multicast::CastMessage;
5353
use crate::comm::multicast::CastMessageEnvelope;
5454
use crate::comm::multicast::Uslice;
5555
use crate::metrics;
56+
use crate::proc_mesh::ActorEventRouter;
5657
use crate::proc_mesh::ProcMesh;
5758
use crate::reference::ActorMeshId;
5859
use crate::reference::ActorMeshRef;
@@ -248,6 +249,7 @@ pub struct RootActorMesh<'a, A: RemoteActor> {
248249
// The receiver of supervision events. It is None if it has been transferred to
249250
// an actor event observer.
250251
actor_supervision_rx: Option<mpsc::UnboundedReceiver<ActorSupervisionEvent>>,
252+
actor_event_router: ActorEventRouter,
251253
}
252254

253255
impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
@@ -256,12 +258,14 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
256258
name: String,
257259
actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
258260
ranks: Vec<ActorRef<A>>,
261+
actor_event_router: ActorEventRouter,
259262
) -> Self {
260263
Self {
261264
proc_mesh: ProcMeshRef::Borrowed(proc_mesh),
262265
name,
263266
ranks,
264267
actor_supervision_rx: Some(actor_supervision_rx),
268+
actor_event_router,
265269
}
266270
}
267271

@@ -270,11 +274,13 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
270274
name: String,
271275
actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
272276
ranks: Vec<ActorRef<A>>,
277+
actor_event_router: ActorEventRouter,
273278
) -> Self {
274279
Self {
275280
proc_mesh: ProcMeshRef::Shared(Box::new(proc_mesh)),
276281
name,
277282
ranks,
283+
actor_event_router,
278284
actor_supervision_rx: Some(actor_supervision_rx),
279285
}
280286
}
@@ -294,6 +300,10 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
294300
mesh_id: self.id(),
295301
})
296302
}
303+
304+
pub fn actor_event_router(&self) -> &ActorEventRouter {
305+
&self.actor_event_router
306+
}
297307
}
298308

299309
/// Supervision event stream for actor mesh. It emits actor supervision events.
@@ -315,6 +325,16 @@ impl ActorSupervisionEvents {
315325
}
316326
result
317327
}
328+
329+
pub fn new(
330+
actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
331+
mesh_id: ActorMeshId,
332+
) -> Self {
333+
Self {
334+
actor_supervision_rx,
335+
mesh_id,
336+
}
337+
}
318338
}
319339

320340
#[async_trait]

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 49 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,33 @@ pub fn global_mailbox() -> Mailbox {
9999
.clone()
100100
}
101101

102-
type ActorEventRouter = Arc<DashMap<ActorMeshName, mpsc::UnboundedSender<ActorSupervisionEvent>>>;
102+
#[derive(Clone, Debug)]
103+
pub struct ActorEventRouter {
104+
inner: Arc<DashMap<ActorMeshName, Vec<mpsc::UnboundedSender<ActorSupervisionEvent>>>>,
105+
}
106+
107+
impl ActorEventRouter {
108+
pub fn bind(&self, name: ActorMeshName) -> mpsc::UnboundedReceiver<ActorSupervisionEvent> {
109+
let (tx, rx) = mpsc::unbounded_channel();
110+
self.inner.entry(name).or_insert(vec![]).push(tx);
111+
rx
112+
}
113+
114+
fn new() -> Self {
115+
Self {
116+
inner: Arc::new(DashMap::new()),
117+
}
118+
}
119+
}
120+
121+
impl Deref for ActorEventRouter {
122+
type Target = DashMap<ActorMeshName, Vec<mpsc::UnboundedSender<ActorSupervisionEvent>>>;
123+
124+
fn deref(&self) -> &Self::Target {
125+
&self.inner
126+
}
127+
}
128+
103129
/// A ProcMesh maintains a mesh of procs whose lifecycles are managed by
104130
/// an allocator.
105131
pub struct ProcMesh {
@@ -335,7 +361,7 @@ impl ProcMesh {
335361
alloc: Box::new(alloc),
336362
supervision_events,
337363
}),
338-
actor_event_router: Arc::new(DashMap::new()),
364+
actor_event_router: ActorEventRouter::new(),
339365
shape,
340366
ranks: proc_ids
341367
.into_iter()
@@ -434,16 +460,13 @@ impl ProcMesh {
434460
where
435461
A::Params: RemoteMessage,
436462
{
437-
let (tx, rx) = mpsc::unbounded_channel::<ActorSupervisionEvent>();
438-
{
439-
// Instantiate supervision routing BEFORE spawning the actor mesh.
440-
self.actor_event_router.insert(actor_name.to_string(), tx);
441-
}
463+
let rx = self.actor_event_router.bind(actor_name.to_string());
442464
let root_mesh = RootActorMesh::new(
443465
self,
444466
actor_name.to_string(),
445467
rx,
446468
Self::spawn_on_procs::<A>(&self.client, self.agents(), actor_name, params).await?,
469+
self.actor_event_router.clone(),
447470
);
448471
Ok(root_mesh)
449472
}
@@ -515,10 +538,17 @@ impl ProcMesh {
515538
}
516539
}
517540
}
541+
self.actor_event_router.remove(&mesh_name.to_string());
518542
Ok(())
519543
}
520544
}
521545

546+
impl Drop for ProcMesh {
547+
fn drop(&mut self) {
548+
self.actor_event_router.clear();
549+
}
550+
}
551+
522552
/// Proc lifecycle events.
523553
#[derive(Debug, Clone)]
524554
pub enum ProcEvent {
@@ -596,8 +626,10 @@ impl ProcEvents {
596626
message_headers: None,
597627
caused_by: None,
598628
};
599-
if entry.value().send(event).is_err() {
600-
tracing::warn!("unable to transmit supervision event to actor {}", entry.key());
629+
for tx in entry.value().iter() {
630+
if tx.send(event.clone()).is_err() {
631+
tracing::warn!("unable to transmit supervision event to actor {}", entry.key());
632+
}
601633
}
602634
}
603635

@@ -631,9 +663,11 @@ impl ProcEvents {
631663
};
632664
// transmit to the correct root actor mesh.
633665
{
634-
if let Some(tx) = self.actor_event_router.get(actor_id.name()) {
635-
if tx.send(event).is_err() {
636-
tracing::warn!("unable to transmit supervision event to actor {}", actor_id);
666+
if let Some(txs) = self.actor_event_router.get(actor_id.name()) {
667+
for tx in txs.iter() {
668+
if tx.send(event.clone()).is_err() {
669+
tracing::warn!("unable to transmit supervision event to actor {}", actor_id);
670+
}
637671
}
638672
} else {
639673
tracing::warn!("received supervision event for unregistered actor {}", actor_id);
@@ -683,18 +717,16 @@ impl<D: Deref<Target = ProcMesh> + Send + Sync + 'static> SharedSpawnable for D
683717
where
684718
A::Params: RemoteMessage,
685719
{
686-
let (tx, rx) = mpsc::unbounded_channel::<ActorSupervisionEvent>();
687-
{
688-
// Instantiate supervision routing BEFORE spawning the actor mesh.
689-
self.actor_event_router.insert(actor_name.to_string(), tx);
690-
}
720+
let rx = self.actor_event_router.bind(actor_name.to_string());
691721
let ranks =
692722
ProcMesh::spawn_on_procs::<A>(&self.client, self.agents(), actor_name, params).await?;
723+
let actor_event_router = self.actor_event_router.clone();
693724
Ok(RootActorMesh::new_shared(
694725
self,
695726
actor_name.to_string(),
696727
rx,
697728
ranks,
729+
actor_event_router,
698730
))
699731
}
700732
}

0 commit comments

Comments
 (0)