Skip to content

Commit 54d25b4

Browse files
thomasywang authored and facebook-github-bot committed
Mesh slice supervision (#824)
Summary: Pull Request resolved: #824
Suppose we have a ProcMesh with 4 gpus. On this mesh we spawn an ActorMesh A and an ActorMesh B. We create a slice of ActorMesh A, SliceA_1, containing only gpu 4, and a slice SliceA_2 containing only gpu 1. If the Actor A on gpu 4 crashes, we should have the following health states:
- ActorMesh A is unhealthy (contains A gpu=4)
- SliceA_1 is unhealthy (contains A gpu=4)
- SliceA_2 is healthy (does not contain A gpu=4)
- ActorMesh B is healthy (contains gpu=4 but not Actor A)
Implementation:
1. All supervision event streams are created when a RootActorMesh is spawned. A tx-rx pair is created and the tx is inserted into a map acting as a router.
2. The router now holds onto a Vec of senders instead of a single sender. For each Actor mesh in the router, we can call bind to create another tx-rx pair. The router manages the tx, sending a message on every tx registered for a given Actor mesh name each time there is a supervision event. The rx is returned to be used by mesh slices.
3. The spawned RootActorMesh gets a copy of the Arc holding the router so that mesh slices can bind to it.
4. PythonActorMeshes contain a monitor, which is just a loop that waits for the next supervision event from a stream. If a ::Crashed event comes in, we update an Arc<> keeping track of the health state. This monitor now also takes in the shape of the mesh it is monitoring, and only updates the health state to ::Crashed if the crashed Actor is within the shape.
5. When a PythonActorMesh is sliced, a PythonActorMeshRef is created. We add a monitor to PythonActorMeshRefs. It is an Option<> because if the ref is ever serialized and deserialized, we can no longer monitor it.
6. When we cast to a PythonActorMeshRef, we first check the health state and return a SupervisionError if the mesh is unhealthy.
Differential Revision: D79821712
1 parent fe1dcbc commit 54d25b4

File tree

4 files changed

+349
-20
lines changed

4 files changed

+349
-20
lines changed

hyperactor_mesh/src/actor_mesh.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ use crate::comm::multicast::CastMessageEnvelope;
5454
use crate::comm::multicast::Uslice;
5555
use crate::metrics;
5656
use crate::proc_mesh::ProcMesh;
57+
use crate::proc_mesh::SupervisionEventBroadcaster;
5758
use crate::reference::ActorMeshId;
5859
use crate::reference::ActorMeshRef;
5960
use crate::reference::ProcMeshId;
@@ -248,6 +249,7 @@ pub struct RootActorMesh<'a, A: RemoteActor> {
248249
// The receiver of supervision events. It is None if it has been transferred to
249250
// an actor event observer.
250251
actor_supervision_rx: Option<mpsc::UnboundedReceiver<ActorSupervisionEvent>>,
252+
supervision_event_broadcaster: SupervisionEventBroadcaster,
251253
}
252254

253255
impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
@@ -256,12 +258,14 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
256258
name: String,
257259
actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
258260
ranks: Vec<ActorRef<A>>,
261+
supervision_event_broadcaster: SupervisionEventBroadcaster,
259262
) -> Self {
260263
Self {
261264
proc_mesh: ProcMeshRef::Borrowed(proc_mesh),
262265
name,
263266
ranks,
264267
actor_supervision_rx: Some(actor_supervision_rx),
268+
supervision_event_broadcaster,
265269
}
266270
}
267271

@@ -270,11 +274,13 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
270274
name: String,
271275
actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
272276
ranks: Vec<ActorRef<A>>,
277+
supervision_event_broadcaster: SupervisionEventBroadcaster,
273278
) -> Self {
274279
Self {
275280
proc_mesh: ProcMeshRef::Shared(Box::new(proc_mesh)),
276281
name,
277282
ranks,
283+
supervision_event_broadcaster,
278284
actor_supervision_rx: Some(actor_supervision_rx),
279285
}
280286
}
@@ -294,6 +300,10 @@ impl<'a, A: RemoteActor> RootActorMesh<'a, A> {
294300
mesh_id: self.id(),
295301
})
296302
}
303+
304+
/// Returns a clone of the broadcaster stored on this mesh.
///
/// Callers can use the returned handle to subscribe to this mesh's
/// supervision events.
pub fn supervision_event_broadcaster(&self) -> SupervisionEventBroadcaster {
    let broadcaster = &self.supervision_event_broadcaster;
    broadcaster.clone()
}
297307
}
298308

299309
/// Supervision event stream for actor mesh. It emits actor supervision events.
@@ -315,6 +325,16 @@ impl ActorSupervisionEvents {
315325
}
316326
result
317327
}
328+
329+
pub fn new(
330+
actor_supervision_rx: mpsc::UnboundedReceiver<ActorSupervisionEvent>,
331+
mesh_id: ActorMeshId,
332+
) -> Self {
333+
Self {
334+
actor_supervision_rx,
335+
mesh_id,
336+
}
337+
}
318338
}
319339

320340
#[async_trait]

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,49 @@ pub fn global_root_client() -> &'static Instance<()> {
101101
instance
102102
}
103103

104-
type ActorEventRouter = Arc<DashMap<ActorMeshName, mpsc::UnboundedSender<ActorSupervisionEvent>>>;
104+
type ActorEventRouter = Arc<DashMap<ActorMeshName, SupervisionEventBroadcaster>>;
105+
106+
/// Wrapper class around a Vec<mpsc::UnboundedSender<ActorSupervisionEvent>>>>
107+
/// allowing for new listeners to be added and events to be broadcasted
108+
#[derive(Clone, Debug)]
109+
pub struct SupervisionEventBroadcaster {
110+
inner: Arc<std::sync::Mutex<Vec<mpsc::UnboundedSender<ActorSupervisionEvent>>>>,
111+
}
112+
113+
impl Deref for SupervisionEventBroadcaster {
114+
type Target = Arc<std::sync::Mutex<Vec<mpsc::UnboundedSender<ActorSupervisionEvent>>>>;
115+
fn deref(&self) -> &Self::Target {
116+
&self.inner
117+
}
118+
}
119+
120+
impl SupervisionEventBroadcaster {
121+
fn new() -> Self {
122+
Self {
123+
inner: Arc::new(std::sync::Mutex::new(Vec::new())),
124+
}
125+
}
126+
pub fn subscribe(&self) -> mpsc::UnboundedReceiver<ActorSupervisionEvent> {
127+
let (tx, rx) = mpsc::unbounded_channel();
128+
{
129+
self.inner
130+
.lock()
131+
.unwrap_or_else(|err| err.into_inner())
132+
.push(tx);
133+
}
134+
rx
135+
}
136+
137+
fn broadcast(&self, event: ActorSupervisionEvent) {
138+
let mut guard = self.inner.lock().unwrap_or_else(|err| err.into_inner());
139+
for tx in guard.iter_mut() {
140+
tx.send(event.clone()).unwrap_or_else(|err| {
141+
tracing::warn!("failed to send supervision event: {err:?}");
142+
});
143+
}
144+
}
145+
}
146+
105147
/// A ProcMesh maintains a mesh of procs whose lifecycles are managed by
106148
/// an allocator.
107149
pub struct ProcMesh {
@@ -436,16 +478,20 @@ impl ProcMesh {
436478
where
437479
A::Params: RemoteMessage,
438480
{
439-
let (tx, rx) = mpsc::unbounded_channel::<ActorSupervisionEvent>();
481+
let supervision_event_broadcaster = SupervisionEventBroadcaster::new();
440482
{
441483
// Instantiate supervision routing BEFORE spawning the actor mesh.
442-
self.actor_event_router.insert(actor_name.to_string(), tx);
484+
self.actor_event_router.insert(
485+
actor_name.to_string(),
486+
supervision_event_broadcaster.clone(),
487+
);
443488
}
444489
let root_mesh = RootActorMesh::new(
445490
self,
446491
actor_name.to_string(),
447-
rx,
492+
supervision_event_broadcaster.subscribe(),
448493
Self::spawn_on_procs::<A>(&self.client, self.agents(), actor_name, params).await?,
494+
supervision_event_broadcaster,
449495
);
450496
Ok(root_mesh)
451497
}
@@ -517,6 +563,7 @@ impl ProcMesh {
517563
}
518564
}
519565
}
566+
self.actor_event_router.remove(&mesh_name.to_string());
520567
Ok(())
521568
}
522569
}
@@ -598,9 +645,7 @@ impl ProcEvents {
598645
message_headers: None,
599646
caused_by: None,
600647
};
601-
if entry.value().send(event).is_err() {
602-
tracing::warn!("unable to transmit supervision event to actor {}", entry.key());
603-
}
648+
entry.value().broadcast(event);
604649
}
605650

606651
break Some(ProcEvent::Stopped(*rank, reason));
@@ -634,9 +679,7 @@ impl ProcEvents {
634679
// transmit to the correct root actor mesh.
635680
{
636681
if let Some(tx) = self.actor_event_router.get(actor_id.name()) {
637-
if tx.send(event).is_err() {
638-
tracing::warn!("unable to transmit supervision event to actor {}", actor_id);
639-
}
682+
tx.broadcast(event.clone());
640683
} else {
641684
tracing::warn!("received supervision event for unregistered actor {}", actor_id);
642685
}
@@ -685,18 +728,22 @@ impl<D: Deref<Target = ProcMesh> + Send + Sync + 'static> SharedSpawnable for D
685728
where
686729
A::Params: RemoteMessage,
687730
{
688-
let (tx, rx) = mpsc::unbounded_channel::<ActorSupervisionEvent>();
731+
let supervision_event_broadcaster = SupervisionEventBroadcaster::new();
689732
{
690733
// Instantiate supervision routing BEFORE spawning the actor mesh.
691-
self.actor_event_router.insert(actor_name.to_string(), tx);
734+
self.actor_event_router.insert(
735+
actor_name.to_string(),
736+
supervision_event_broadcaster.clone(),
737+
);
692738
}
693739
let ranks =
694740
ProcMesh::spawn_on_procs::<A>(&self.client, self.agents(), actor_name, params).await?;
695741
Ok(RootActorMesh::new_shared(
696742
self,
697743
actor_name.to_string(),
698-
rx,
744+
supervision_event_broadcaster.subscribe(),
699745
ranks,
746+
supervision_event_broadcaster,
700747
))
701748
}
702749
}

0 commit comments

Comments
 (0)