Skip to content

Commit fc3e30e

Browse files
pzhan9 and meta-codesync[bot]
authored and committed
Attribute SupervisionEvent to actor mesh if we can (#1939)
Summary: Pull Request resolved: #1939 This diff attributes supervision events fired from `send_state_change` to the corresponding actor mesh. Reviewed By: vidhyav Differential Revision: D87449139 fbshipit-source-id: 052e1fc38061a03a9984a8b559ebb36e2c053b9b
1 parent 028676b commit fc3e30e

File tree

2 files changed

+26
-21
lines changed

2 files changed

+26
-21
lines changed

hyperactor_mesh/src/v1/proc_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -688,7 +688,7 @@ impl ProcMeshRef {
688688
&self.name
689689
}
690690

691-
fn host_mesh_name(&self) -> Option<&Name> {
691+
pub fn host_mesh_name(&self) -> Option<&Name> {
692692
self.host_mesh.as_ref().map(|h| h.name())
693693
}
694694

monarch_hyperactor/src/v1/actor_mesh.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ fn actor_state_to_supervision_events(
406406
fn send_state_change<F>(
407407
rank: usize,
408408
event: ActorSupervisionEvent,
409-
mesh_name: &Name,
409+
actor_mesh_name: &Name,
410410
owner: &Option<ActorHandle<PythonActor>>,
411411
is_owned: bool,
412412
is_proc_stopped: bool,
@@ -416,7 +416,7 @@ fn send_state_change<F>(
416416
) where
417417
F: Fn(MeshFailure),
418418
{
419-
let failure = MeshFailure::new(mesh_name, rank, event.clone());
419+
let failure = MeshFailure::new(actor_mesh_name, rank, event.clone());
420420
// Any supervision event that is not a failure should not generate
421421
// call "unhandled".
422422
// This includes the Stopped status, which is a state that occurs when the
@@ -426,30 +426,30 @@ fn send_state_change<F>(
426426
let is_failed = event.is_error();
427427
if is_failed {
428428
tracing::warn!(
429-
name = "SupervisionEvent",
430-
%mesh_name,
429+
name = "ActorMeshStatus",
430+
status = "SupervisionError",
431431
%event,
432-
"detected supervision error on monitored mesh: name={mesh_name}",
432+
"detected supervision error on monitored mesh: name={actor_mesh_name}",
433433
);
434434
} else {
435435
tracing::debug!(
436-
name = "SupervisionEvent",
437-
%mesh_name,
436+
name = "ActorMeshStatus",
437+
status = "SupervisionEvent",
438438
%event,
439-
"detected non-error supervision event on monitored mesh: name={mesh_name}",
439+
"detected non-error supervision event on monitored mesh: name={actor_mesh_name}",
440440
);
441441
}
442442

443443
// Send a notification to the owning actor of this mesh, if there is one.
444444
if let Some(owner) = owner {
445445
if let Err(error) = owner.send(SupervisionFailureMessage {
446-
mesh_name: mesh_name.to_string(),
446+
mesh_name: actor_mesh_name.to_string(),
447447
rank,
448448
event: event.clone(),
449449
}) {
450450
tracing::warn!(
451-
name = "SupervisionEvent",
452-
%mesh_name,
451+
name = "ActorMeshStatus",
452+
status = "SupervisionError",
453453
%event,
454454
%error,
455455
"failed to send supervision event to owner {}: {}. dropping event",
@@ -497,9 +497,14 @@ fn send_state_change<F>(
497497
/// a message will be sent to "owner" if it is not None. If owner is None,
498498
/// then a panic will be raised instead to crash the client.
499499
/// * time_between_tasks controls how frequently to poll.
500+
#[hyperactor::instrument_infallible(fields(
501+
host_mesh=actor_mesh.proc_mesh().host_mesh_name().map(|n| n.to_string()),
502+
proc_mesh=actor_mesh.proc_mesh().name().to_string(),
503+
actor_mesh=actor_mesh.name().to_string(),
504+
))]
500505
async fn actor_states_monitor<A, F>(
501506
cx: &impl context::Actor,
502-
mesh: ActorMeshRef<A>,
507+
actor_mesh: ActorMeshRef<A>,
503508
owner: Option<ActorHandle<PythonActor>>,
504509
is_owned: bool,
505510
unhandled: F,
@@ -526,7 +531,7 @@ async fn actor_states_monitor<A, F>(
526531
_ = canceled.cancelled() => break,
527532
}
528533
// First check if the proc mesh is dead before trying to query their agents.
529-
let proc_states = mesh.proc_mesh().proc_states(cx).await;
534+
let proc_states = actor_mesh.proc_mesh().proc_states(cx).await;
530535
if let Err(e) = proc_states {
531536
send_state_change(
532537
0,
@@ -539,7 +544,7 @@ async fn actor_states_monitor<A, F>(
539544
)),
540545
None,
541546
),
542-
mesh.name(),
547+
actor_mesh.name(),
543548
&owner,
544549
is_owned,
545550
false,
@@ -590,12 +595,12 @@ async fn actor_states_monitor<A, F>(
590595
ActorSupervisionEvent::new(
591596
// Attribute this to the monitored actor, even if the underlying
592597
// cause is a proc_failure. We propagate the cause explicitly.
593-
mesh.get(point.rank()).unwrap().actor_id().clone(),
598+
actor_mesh.get(point.rank()).unwrap().actor_id().clone(),
594599
Some(display_name),
595600
actor_status,
596601
None,
597602
),
598-
mesh.name(),
603+
actor_mesh.name(),
599604
&owner,
600605
is_owned,
601606
true,
@@ -608,7 +613,7 @@ async fn actor_states_monitor<A, F>(
608613
}
609614

610615
// Now that we know the proc mesh is alive, check for actor state changes.
611-
let events = mesh.actor_states(cx).await;
616+
let events = actor_mesh.actor_states(cx).await;
612617
if let Err(e) = events {
613618
send_state_change(
614619
0,
@@ -621,7 +626,7 @@ async fn actor_states_monitor<A, F>(
621626
)),
622627
None,
623628
),
624-
mesh.name(),
629+
actor_mesh.name(),
625630
&owner,
626631
is_owned,
627632
false,
@@ -649,7 +654,7 @@ async fn actor_states_monitor<A, F>(
649654
send_state_change(
650655
rank,
651656
events[0].clone(),
652-
mesh.name(),
657+
actor_mesh.name(),
653658
&owner,
654659
is_owned,
655660
false,
@@ -673,7 +678,7 @@ async fn actor_states_monitor<A, F>(
673678
send_state_change(
674679
rank,
675680
events[0].clone(),
676-
mesh.name(),
681+
actor_mesh.name(),
677682
&owner,
678683
is_owned,
679684
false,

0 commit comments

Comments
 (0)