Skip to content

Commit e8e6268

Browse files
dulinriley authored and meta-codesync[bot] committed
Add supervision_event monitor to v1 ActorMeshRef (#1454)
Summary:
Pull Request resolved: #1454

Part of #1209

Implement `supervision_event` for the v1 PythonActorMesh and its references, for parity with the v0 API. This does not yet implement the goal of calling `__supervise__` on the owning PythonActor; that will be the next step.

For parity with v0, a few things had to be added:
* For the "monitor", poll the `actor_states` API every few seconds. If any actor has collected supervision events, surface them as a SupervisionError to Python. This should be optimized into a push-based API instead, or one based on accumulation, since only the first event is needed.
* Before doing a `cast`, check whether any previous cast encountered a supervision error, to avoid sending an undeliverable message.
* Disallow spawning new actors on a ProcMeshAgent that has had a supervision event in its past.
* If an error kills the whole proc (process exit, signal, etc.), the agent itself will be down. Augment the `actor_states` API to handle this by reporting stopped for all remaining ranks.

Reviewed By: colin2328, pablorfb-meta

Differential Revision: D82494380

fbshipit-source-id: 789586bd89e1d1164438509912b0a565f5174c36
1 parent cd0e0e3 commit e8e6268

File tree

12 files changed

+617
-105
lines changed

12 files changed

+617
-105
lines changed

hyperactor_mesh/src/proc_mesh.rs

Lines changed: 3 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@ use tracing::span;
5959

6060
use crate::CommActor;
6161
use crate::Mesh;
62-
use crate::actor_mesh::CAST_ACTOR_MESH_ID;
6362
use crate::actor_mesh::RootActorMesh;
6463
use crate::alloc::Alloc;
6564
use crate::alloc::AllocExt;
@@ -73,7 +72,7 @@ use crate::proc_mesh::mesh_agent::GspawnResult;
7372
use crate::proc_mesh::mesh_agent::MeshAgentMessageClient;
7473
use crate::proc_mesh::mesh_agent::ProcMeshAgent;
7574
use crate::proc_mesh::mesh_agent::StopActorResult;
76-
use crate::reference::ActorMeshId;
75+
use crate::proc_mesh::mesh_agent::update_event_actor_id;
7776
use crate::reference::ProcMeshId;
7877
use crate::router;
7978
use crate::shortuuid::ShortUuid;
@@ -770,7 +769,7 @@ impl ProcEvents {
770769
// rewriting `actor_id` to a synthetic mesh-level id
771770
// so that routing reaches the correct `ActorMesh`
772771
// subscribers.
773-
Ok(mut event) = self.event_state.supervision_events.recv() => {
772+
Ok(event) = self.event_state.supervision_events.recv() => {
774773
let had_headers = event.message_headers.is_some();
775774
tracing::info!(
776775
name = SupervisionEventState::SupervisionEventReceived.as_ref(),
@@ -782,33 +781,7 @@ impl ProcEvents {
782781
tracing::debug!(?event, "proc supervision: full event");
783782

784783
// Normalize events that came via the comm tree.
785-
if let Some(headers) = &event.message_headers {
786-
if let Some(actor_mesh_id) = headers.get(CAST_ACTOR_MESH_ID) {
787-
match actor_mesh_id {
788-
ActorMeshId::V0(proc_mesh_id, actor_name) => {
789-
let old_actor = event.actor_id.clone();
790-
event.actor_id = ActorId(
791-
ProcId::Ranked(WorldId(proc_mesh_id.0.clone()), 0),
792-
actor_name.clone(),
793-
0,
794-
);
795-
tracing::debug!(
796-
actor_id = %old_actor,
797-
"proc supervision: remapped comm-actor id to mesh id from CAST_ACTOR_MESH_ID {}", event.actor_id
798-
);
799-
}
800-
ActorMeshId::V1(_) => {
801-
tracing::debug!("proc supervision: headers present but V1 ActorMeshId; leaving actor_id unchanged");
802-
}
803-
}
804-
} else {
805-
tracing::debug!(
806-
"proc supervision: headers present but no CAST_ACTOR_MESH_ID; leaving actor_id unchanged"
807-
);
808-
}
809-
} else {
810-
tracing::debug!("proc supervision: no headers attached; leaving actor_id unchanged");
811-
}
784+
let event = update_event_actor_id(event);
812785

813786
// Forward the supervision event to the ActorMesh (keyed by its mesh name)
814787
// that registered for events in this ProcMesh. The routing table

hyperactor_mesh/src/proc_mesh/mesh_agent.rs

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use hyperactor::PortRef;
3434
use hyperactor::ProcId;
3535
use hyperactor::RefClient;
3636
use hyperactor::Unbind;
37+
use hyperactor::WorldId;
3738
use hyperactor::actor::ActorStatus;
3839
use hyperactor::actor::remote::Remote;
3940
use hyperactor::channel;
@@ -52,7 +53,9 @@ use hyperactor::supervision::ActorSupervisionEvent;
5253
use serde::Deserialize;
5354
use serde::Serialize;
5455

56+
use crate::actor_mesh::CAST_ACTOR_MESH_ID;
5557
use crate::proc_mesh::SupervisionEventState;
58+
use crate::reference::ActorMeshId;
5659
use crate::resource;
5760
use crate::v1::Name;
5861

@@ -171,6 +174,41 @@ struct ActorInstanceState {
171174
spawn: Result<ActorId, anyhow::Error>,
172175
}
173176

177+
/// Normalize events that came via the comm tree. Updates their actor id based on
178+
/// the message headers for the event.
179+
pub(crate) fn update_event_actor_id(mut event: ActorSupervisionEvent) -> ActorSupervisionEvent {
180+
if let Some(headers) = &event.message_headers {
181+
if let Some(actor_mesh_id) = headers.get(CAST_ACTOR_MESH_ID) {
182+
match actor_mesh_id {
183+
ActorMeshId::V0(proc_mesh_id, actor_name) => {
184+
let old_actor = event.actor_id.clone();
185+
event.actor_id = ActorId(
186+
ProcId::Ranked(WorldId(proc_mesh_id.0.clone()), 0),
187+
actor_name.clone(),
188+
0,
189+
);
190+
tracing::debug!(
191+
actor_id = %old_actor,
192+
"proc supervision: remapped comm-actor id to mesh id from CAST_ACTOR_MESH_ID {}", event.actor_id
193+
);
194+
}
195+
ActorMeshId::V1(_) => {
196+
tracing::debug!(
197+
"proc supervision: headers present but V1 ActorMeshId; leaving actor_id unchanged"
198+
);
199+
}
200+
}
201+
} else {
202+
tracing::debug!(
203+
"proc supervision: headers present but no CAST_ACTOR_MESH_ID; leaving actor_id unchanged"
204+
);
205+
}
206+
} else {
207+
tracing::debug!("proc supervision: no headers attached; leaving actor_id unchanged");
208+
}
209+
event
210+
}
211+
174212
/// A mesh agent is responsible for managing procs in a [`ProcMesh`].
175213
#[derive(Debug)]
176214
#[hyperactor::export(
@@ -397,8 +435,13 @@ impl Handler<ActorSupervisionEvent> for ProcMeshAgent {
397435
cx: &Context<Self>,
398436
event: ActorSupervisionEvent,
399437
) -> anyhow::Result<()> {
438+
let event = update_event_actor_id(event);
400439
if self.record_supervision_events {
401-
tracing::info!("Received supervision event: {:?}, recording", event);
440+
tracing::info!(
441+
"Received supervision event on proc {}: {:?}, recording",
442+
self.proc.proc_id(),
443+
event
444+
);
402445
self.supervision_events
403446
.entry(event.actor_id.clone())
404447
.or_default()
@@ -457,6 +500,22 @@ impl Handler<resource::CreateOrUpdate<ActorSpec>> for ProcMeshAgent {
457500
// There is no update.
458501
return Ok(());
459502
}
503+
let create_rank = create_or_update.rank.unwrap();
504+
// If there have been supervision events for any actors on this proc,
505+
// we disallow spawning new actors on it, as this proc may be in an
506+
// invalid state.
507+
if !self.supervision_events.is_empty() {
508+
self.actor_states.insert(
509+
create_or_update.name.clone(),
510+
ActorInstanceState {
511+
spawn: Err(anyhow::anyhow!(
512+
"Cannot spawn new actors on mesh with supervision events"
513+
)),
514+
create_rank,
515+
},
516+
);
517+
return Ok(());
518+
}
460519

461520
let ActorSpec {
462521
actor_type,
@@ -465,7 +524,7 @@ impl Handler<resource::CreateOrUpdate<ActorSpec>> for ProcMeshAgent {
465524
self.actor_states.insert(
466525
create_or_update.name.clone(),
467526
ActorInstanceState {
468-
create_rank: create_or_update.rank.unwrap(),
527+
create_rank,
469528
spawn: self
470529
.remote
471530
.gspawn(

hyperactor_mesh/src/v1/actor_mesh.rs

Lines changed: 151 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ impl<A: Referable> ActorMesh<A> {
7070
current_ref,
7171
}
7272
}
73+
74+
/// Returns a reference to this mesh's `name` field.
pub fn name(&self) -> &Name {
75+
&self.name
76+
}
7377
}
7478

7579
impl<A: Referable> Deref for ActorMesh<A> {
@@ -491,7 +495,7 @@ mod tests {
491495
}
492496

493497
#[async_timed_test(timeout_secs = 30)]
494-
async fn test_actor_states() {
498+
async fn test_actor_states_with_panic() {
495499
hyperactor_telemetry::initialize_logging_for_test();
496500

497501
let instance = testing::instance().await;
@@ -526,22 +530,20 @@ mod tests {
526530
// status such that when a process switches to unhealthy it sets a
527531
// supervision event.
528532
let supervision_task = tokio::spawn(async move {
529-
match actor_mesh.actor_states(&instance).await {
530-
Ok(events) => {
531-
for state in events.values() {
532-
supervisor.send(instance, state.clone()).unwrap();
533-
}
534-
}
535-
Err(e) => {
536-
println!("error: {:?}", e);
537-
}
538-
};
533+
let events = actor_mesh.actor_states(&instance).await.unwrap();
534+
for state in events.values() {
535+
supervisor.send(instance, state.clone()).unwrap();
536+
}
539537
});
540538
// Make sure the task completes first without a panic.
541539
supervision_task.await.unwrap();
542540

543541
for _ in 0..num_replicas {
544-
let state = supervision_receiver.recv().await.unwrap();
542+
let state = RealClock
543+
.timeout(Duration::from_secs(10), supervision_receiver.recv())
544+
.await
545+
.expect("timeout")
546+
.unwrap();
545547
if let resource::Status::Failed(s) = state.status {
546548
assert!(s.contains("supervision events"));
547549
} else {
@@ -558,6 +560,143 @@ mod tests {
558560
}
559561
}
560562

563+
#[async_timed_test(timeout_secs = 30)]
564+
async fn test_actor_states_with_process_exit() {
565+
hyperactor_telemetry::initialize_logging_for_test();
566+
567+
let instance = testing::instance().await;
568+
// Listen for supervision events sent to the parent instance.
569+
let (supervision_port, mut supervision_receiver) =
570+
instance.open_port::<resource::State<ActorState>>();
571+
let supervisor = supervision_port.bind();
572+
let num_replicas = 4;
573+
let meshes = testing::proc_meshes(instance, extent!(replicas = num_replicas)).await;
574+
let proc_mesh = &meshes[1];
575+
let child_name = Name::new("child");
576+
577+
let actor_mesh = proc_mesh
578+
.spawn_with_name::<testactor::TestActor>(instance, child_name.clone(), &())
579+
.await
580+
.unwrap();
581+
582+
actor_mesh
583+
.cast(
584+
instance,
585+
testactor::CauseSupervisionEvent(testactor::SupervisionEventType::ProcessExit(1)),
586+
)
587+
.unwrap();
588+
589+
// Wait for the casted message to cause a process exit on all actors.
590+
// We can't use a reply port because the handler for the message will
591+
// by definition not complete and send a reply.
592+
#[allow(clippy::disallowed_methods)]
593+
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
594+
595+
// Now that all ranks have completed, set up a continuous poll of the
596+
// status such that when a process switches to unhealthy it sets a
597+
// supervision event.
598+
let supervision_task = tokio::spawn(async move {
599+
let events = actor_mesh.actor_states(&instance).await.unwrap();
600+
for state in events.values() {
601+
supervisor.send(instance, state.clone()).unwrap();
602+
}
603+
});
604+
// Make sure the task completes first without a panic.
605+
RealClock
606+
.timeout(Duration::from_secs(10), supervision_task)
607+
.await
608+
.expect("timeout")
609+
.unwrap();
610+
611+
for _ in 0..num_replicas {
612+
let state = RealClock
613+
.timeout(Duration::from_secs(10), supervision_receiver.recv())
614+
.await
615+
.expect("timeout")
616+
.unwrap();
617+
assert_matches!(state.status, resource::Status::Stopped);
618+
let events = state
619+
.state
620+
.expect("state should be present")
621+
.supervision_events;
622+
assert_eq!(events.len(), 1);
623+
assert_eq!(events[0].actor_status, ActorStatus::Stopped);
624+
}
625+
}
626+
627+
#[async_timed_test(timeout_secs = 30)]
628+
async fn test_actor_states_on_sliced_mesh() {
629+
hyperactor_telemetry::initialize_logging_for_test();
630+
631+
let instance = testing::instance().await;
632+
// Listen for supervision events sent to the parent instance.
633+
let (supervision_port, mut supervision_receiver) =
634+
instance.open_port::<resource::State<ActorState>>();
635+
let supervisor = supervision_port.bind();
636+
let num_replicas = 4;
637+
let meshes = testing::proc_meshes(instance, extent!(replicas = num_replicas)).await;
638+
let proc_mesh = &meshes[1];
639+
let child_name = Name::new("child");
640+
641+
let actor_mesh = proc_mesh
642+
.spawn_with_name::<testactor::TestActor>(instance, child_name.clone(), &())
643+
.await
644+
.unwrap();
645+
let sliced = actor_mesh
646+
.range("replicas", 1..3)
647+
.expect("slice should be valid");
648+
let sliced_replicas = sliced.len();
649+
650+
sliced
651+
.cast(
652+
instance,
653+
testactor::CauseSupervisionEvent(testactor::SupervisionEventType::Panic),
654+
)
655+
.unwrap();
656+
657+
// Wait for the casted message to cause a process exit on all actors.
658+
// We can't use a reply port because the handler for the message will
659+
// by definition not complete and send a reply.
660+
#[allow(clippy::disallowed_methods)]
661+
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
662+
663+
// Now that all ranks have completed, set up a continuous poll of the
664+
// status such that when a process switches to unhealthy it sets a
665+
// supervision event.
666+
let supervision_task = tokio::spawn(async move {
667+
let events = sliced.actor_states(&instance).await.unwrap();
668+
for state in events.values() {
669+
supervisor.send(instance, state.clone()).unwrap();
670+
}
671+
});
672+
// Make sure the task completes first without a panic.
673+
RealClock
674+
.timeout(Duration::from_secs(10), supervision_task)
675+
.await
676+
.expect("timeout")
677+
.unwrap();
678+
679+
for _ in 0..sliced_replicas {
680+
let state = RealClock
681+
.timeout(Duration::from_secs(10), supervision_receiver.recv())
682+
.await
683+
.expect("timeout")
684+
.unwrap();
685+
if let resource::Status::Failed(s) = state.status {
686+
assert!(s.contains("supervision events"));
687+
} else {
688+
panic!("Not failed: {:?}", state.status);
689+
}
690+
if let Some(ref inner) = state.state {
691+
assert!(!inner.supervision_events.is_empty());
692+
for event in &inner.supervision_events {
693+
assert_eq!(event.actor_id.name(), format!("{}", child_name.clone()));
694+
assert_matches!(event.actor_status, ActorStatus::Failed(_));
695+
}
696+
}
697+
}
698+
}
699+
561700
#[async_timed_test(timeout_secs = 30)]
562701
async fn test_cast() {
563702
let config = hyperactor::config::global::lock();

0 commit comments

Comments
 (0)