Skip to content

Commit 6757d53

Browse files
zdevitometa-codesync[bot]
authored andcommitted
propagate display name for process failures (#1911)
Summary: Pull Request resolved: #1911 Looks like this now: ``` The actor <root>.<tests.test_supervision_hierarchy.Nest top> and all its descendants have failed. This occurred because the actor <root>.<tests.test_supervision_hierarchy.Nest top>.<tests.test_supervision_hierarchy.Lambda nested{'a_dim': 0/1}> was running on a process which failed. The error was: process failure: Killed(sig=9) ``` Kinda janky because supervision code is messy. When we clean up supervision code, we should make sure the owning actor knows the friendly names of all of its children (e.g. has a map from actor/proc id to their fully qualified display names). A parent always knows what the name will be before it spawns it. ghstack-source-id: 324165921 exported-using-ghexport Reviewed By: dulinriley Differential Revision: D87284093 fbshipit-source-id: e32a32c5a138b3b2c7bb28711d164e0cca7864b2
1 parent 53b8a2e commit 6757d53

File tree

7 files changed

+134
-38
lines changed

7 files changed

+134
-38
lines changed

monarch_hyperactor/src/actor_mesh.rs

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,11 @@ pub(crate) trait ActorMeshProtocol: Send + Sync {
7979
/// will provide "supervision_event" with events.
8080
/// The default implementation does nothing, and it is not required that
8181
/// it has to be called before supervision_event.
82-
fn start_supervision(&self, _instance: &PyInstance) -> PyResult<()> {
82+
fn start_supervision(
83+
&self,
84+
_instance: &PyInstance,
85+
_supervision_display_name: String,
86+
) -> PyResult<()> {
8387
Ok(())
8488
}
8589

@@ -156,8 +160,13 @@ impl PythonActorMesh {
156160
self.inner.supervision_event(instance)
157161
}
158162

159-
fn start_supervision(&self, instance: &PyInstance) -> PyResult<()> {
160-
self.inner.start_supervision(instance)
163+
fn start_supervision(
164+
&self,
165+
instance: &PyInstance,
166+
supervision_display_name: String,
167+
) -> PyResult<()> {
168+
self.inner
169+
.start_supervision(instance, supervision_display_name)
161170
}
162171

163172
fn stop(&self, instance: &PyInstance) -> PyResult<PyPythonTask> {
@@ -387,8 +396,12 @@ impl PythonActorMeshImpl {
387396
fn supervision_event(&self, instance: &PyInstance) -> PyResult<Option<PyShared>> {
388397
ActorMeshProtocol::supervision_event(self, instance)
389398
}
390-
fn start_supervision(&self, instance: &PyInstance) -> PyResult<()> {
391-
ActorMeshProtocol::start_supervision(self, instance)
399+
fn start_supervision(
400+
&self,
401+
instance: &PyInstance,
402+
supervision_display_name: String,
403+
) -> PyResult<()> {
404+
ActorMeshProtocol::start_supervision(self, instance, supervision_display_name)
392405
}
393406

394407
fn stop(&self, instance: &PyInstance) -> PyResult<PyPythonTask> {
@@ -543,6 +556,8 @@ impl Drop for PythonActorMeshImpl {
543556
self.monitor.abort();
544557
}
545558
}
559+
560+
#[derive(Debug)]
546561
struct ClonePyErr {
547562
inner: PyErr,
548563
}
@@ -669,18 +684,28 @@ impl ActorMeshProtocol for AsyncActorMesh {
669684
if !self.supervised {
670685
return Ok(None);
671686
}
672-
let mesh = self.mesh.clone();
673687
let instance = Python::with_gil(|_py| instance.clone());
688+
let (tx, rx) = tokio::sync::oneshot::channel();
689+
let mesh = self.mesh.clone();
690+
self.push(async move {
691+
if tx.send(mesh.await.unwrap()).is_err() {
692+
panic!("oneshot failed");
693+
}
694+
});
674695
PyPythonTask::new(async move {
675-
let mut event = mesh.await?.supervision_event(&instance)?.unwrap();
696+
let mut event = rx.await.unwrap().supervision_event(&instance)?.unwrap();
676697
event.task()?.take_task()?.await
677698
})
678699
// This task must be aborted to run the Drop for the inner PyShared, in
679700
// case that one is also abortable.
680701
.map(|mut x| x.spawn_abortable().map(Some))?
681702
}
682703

683-
fn start_supervision(&self, instance: &PyInstance) -> PyResult<()> {
704+
fn start_supervision(
705+
&self,
706+
instance: &PyInstance,
707+
supervision_display_name: String,
708+
) -> PyResult<()> {
684709
if !self.supervised {
685710
return Ok(());
686711
}
@@ -689,7 +714,8 @@ impl ActorMeshProtocol for AsyncActorMesh {
689714
self.push(async move {
690715
let mesh = mesh.await;
691716
if let Ok(mesh) = mesh {
692-
mesh.start_supervision(&instance).unwrap();
717+
mesh.start_supervision(&instance, supervision_display_name)
718+
.unwrap();
693719
}
694720
});
695721
Ok(())

monarch_hyperactor/src/context.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ impl PyInstance {
131131
}
132132

133133
#[getter]
134-
fn actor_id(&self) -> PyActorId {
134+
pub fn actor_id(&self) -> PyActorId {
135135
self.inner.self_id().clone().into()
136136
}
137137
}

monarch_hyperactor/src/v1/actor_mesh.rs

Lines changed: 52 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,12 @@ impl PythonActorMeshImpl {
178178
}
179179
}
180180

181-
fn make_monitor<F>(&self, instance: PyInstance, unhandled: F) -> SupervisionMonitor
181+
fn make_monitor<F>(
182+
&self,
183+
instance: PyInstance,
184+
unhandled: F,
185+
supervision_display_name: String,
186+
) -> SupervisionMonitor
182187
where
183188
F: Fn(MeshFailure) + Send + 'static,
184189
{
@@ -190,6 +195,7 @@ impl PythonActorMeshImpl {
190195
inner.health_state.clone(),
191196
true,
192197
unhandled,
198+
supervision_display_name,
193199
),
194200
// Ref meshes send no message, they are only used to generate
195201
// the v0 style exception.
@@ -199,6 +205,7 @@ impl PythonActorMeshImpl {
199205
inner.health_state.clone(),
200206
false,
201207
unhandled,
208+
supervision_display_name,
202209
),
203210
}
204211
}
@@ -211,14 +218,19 @@ impl PythonActorMeshImpl {
211218
instance: &PyInstance,
212219
monitor: &Arc<Mutex<Option<SupervisionMonitor>>>,
213220
unhandled: F,
221+
supervision_display_name: Option<String>,
214222
) -> watch::Receiver<Option<PyErr>>
215223
where
216224
F: Fn(MeshFailure) + Send + 'static,
217225
{
218226
let mut guard = monitor.lock().unwrap();
219227
guard.get_or_insert_with(move || {
220228
let instance = Python::with_gil(|_py| instance.clone());
221-
self.make_monitor(instance, unhandled)
229+
self.make_monitor(
230+
instance,
231+
unhandled,
232+
supervision_display_name.unwrap_or_default(),
233+
)
222234
});
223235
let monitor = guard.as_ref().unwrap();
224236
monitor.receiver.clone()
@@ -296,6 +308,7 @@ impl PythonActorMeshImpl {
296308
health_state: Arc<RootHealthState>,
297309
is_owned: bool,
298310
unhandled: F,
311+
supervision_display_name: String,
299312
) -> SupervisionMonitor
300313
where
301314
F: Fn(MeshFailure) + Send + 'static,
@@ -324,6 +337,7 @@ impl PythonActorMeshImpl {
324337
time_between_checks,
325338
sender,
326339
canceled,
340+
supervision_display_name.clone(),
327341
)
328342
.await;
329343
}
@@ -345,6 +359,7 @@ impl PythonActorMeshImpl {
345359
time_between_checks,
346360
sender,
347361
canceled,
362+
supervision_display_name,
348363
)
349364
.await;
350365
}
@@ -492,6 +507,7 @@ async fn actor_states_monitor<A, F>(
492507
time_between_checks: tokio::time::Duration,
493508
sender: watch::Sender<Option<PyErr>>,
494509
canceled: CancellationToken,
510+
supervision_display_name: String,
495511
) where
496512
A: Actor + RemotableActor + Referable,
497513
A::Params: RemoteMessage,
@@ -554,24 +570,29 @@ async fn actor_states_monitor<A, F>(
554570
status
555571
))),
556572
};
557-
573+
let display_name = if !point.is_empty() {
574+
let coords_display = point.format_as_dict();
575+
if let Some(pos) = supervision_display_name.rfind('>') {
576+
format!(
577+
"{}{}{}",
578+
&supervision_display_name[..pos],
579+
coords_display,
580+
&supervision_display_name[pos..]
581+
)
582+
} else {
583+
format!("{}{}", supervision_display_name, coords_display)
584+
}
585+
} else {
586+
supervision_display_name.clone()
587+
};
558588
send_state_change(
559589
point.rank(),
560590
ActorSupervisionEvent::new(
561591
// Attribute this to the monitored actor, even if the underlying
562592
// cause is a proc_failure. We propagate the cause explicitly.
563593
mesh.get(point.rank()).unwrap().actor_id().clone(),
564-
None,
594+
Some(format!("{} was running on a process which", display_name)),
565595
actor_status,
566-
// ActorStatus::Failed(ActorErrorKind::Generic(format!(
567-
// "process failure: {}",
568-
// state
569-
// .state
570-
// .and_then(|state| state.proc_status)
571-
// .unwrap_or_else(|| ProcStatus::Failed {
572-
// reason: "unknown".to_string()
573-
// })
574-
// ))),
575596
None,
576597
),
577598
mesh.name(),
@@ -593,7 +614,7 @@ async fn actor_states_monitor<A, F>(
593614
0,
594615
ActorSupervisionEvent::new(
595616
cx.instance().self_id().clone(),
596-
None,
617+
Some(supervision_display_name.clone()),
597618
ActorStatus::generic_failure(format!(
598619
"unable to query for actor states: {:?}",
599620
e
@@ -720,7 +741,7 @@ impl ActorMeshProtocol for PythonActorMeshImpl {
720741
// Make a clone so each endpoint can get the same supervision events.
721742
let unhandled = self.get_unhandled(instance);
722743
let monitor = self.monitor().clone();
723-
let mut receiver = self.supervision_receiver(instance, &monitor, unhandled);
744+
let mut receiver = self.supervision_receiver(instance, &monitor, unhandled, None);
724745
PyPythonTask::new(async move {
725746
receiver.changed().await.map_err(|e| {
726747
PyValueError::new_err(format!("Waiting for supervision event change: {}", e))
@@ -744,11 +765,20 @@ impl ActorMeshProtocol for PythonActorMeshImpl {
744765
.map(Some)
745766
}
746767

747-
fn start_supervision(&self, instance: &PyInstance) -> PyResult<()> {
768+
fn start_supervision(
769+
&self,
770+
instance: &PyInstance,
771+
supervision_display_name: String,
772+
) -> PyResult<()> {
748773
// Fetch the receiver once, this will initialize the monitor task.
749774
let unhandled = self.get_unhandled(instance);
750775
let monitor = self.monitor().clone();
751-
self.supervision_receiver(instance, &monitor, unhandled);
776+
self.supervision_receiver(
777+
instance,
778+
&monitor,
779+
unhandled,
780+
Some(supervision_display_name),
781+
);
752782
Ok(())
753783
}
754784

@@ -827,7 +857,11 @@ impl ActorMeshProtocol for ActorMeshRef<PythonActor> {
827857
))
828858
}
829859

830-
fn start_supervision(&self, _instance: &PyInstance) -> PyResult<()> {
860+
fn start_supervision(
861+
&self,
862+
_instance: &PyInstance,
863+
_supervision_display_name: String,
864+
) -> PyResult<()> {
831865
Err(PyErr::new::<PyNotImplementedError, _>(
832866
"This should never be called on ActorMeshRef directly",
833867
))

ndslice/src/view.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,6 +769,31 @@ impl Point {
769769
pub fn is_empty(&self) -> bool {
770770
self.extent.len() == 0
771771
}
772+
773+
/// Formats the coordinates of this Point as a string suitable for display names.
774+
/// Returns a string in the format: "{'label': coord/size, 'label': coord/size, ...}"
775+
///
776+
/// # Examples
777+
/// ```
778+
/// use ndslice::extent;
779+
///
780+
/// let ext = extent!(x = 2, y = 3);
781+
/// let point = ext.point(vec![1, 2]).unwrap();
782+
/// assert_eq!(point.format_as_dict(), "{'x': 1/2, 'y': 2/3}");
783+
/// ```
784+
pub fn format_as_dict(&self) -> String {
785+
format!(
786+
"{{{}}}",
787+
self.extent()
788+
.labels()
789+
.iter()
790+
.zip(self.coords_iter())
791+
.zip(self.extent().sizes())
792+
.map(|((label, coord), size)| format!("'{}': {}/{}", label, coord, size))
793+
.collect::<Vec<_>>()
794+
.join(", ")
795+
)
796+
}
772797
}
773798

774799
/// Formats a `Point` as a comma-separated list of per-axis

python/monarch/_rust_bindings/monarch_hyperactor/actor_mesh.pyi

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ class ActorMeshProtocol(Protocol):
3131
self, instance: Instance
3232
) -> "Optional[Shared[Exception]]": ...
3333
# Starts supervision monitoring for future uses of "supervision_event".
34-
def start_supervision(self, instance: Instance) -> None: ...
34+
def start_supervision(
35+
self, instance: Instance, supervision_display_name: str
36+
) -> None: ...
3537
def stop(self, instance: Instance) -> PythonTask[None]: ...
3638
def initialized(self) -> PythonTask[None]: ...
3739

python/monarch/_src/actor/actor_mesh.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -468,7 +468,9 @@ def new_with_region(self, region: Region) -> "ActorMeshProtocol":
468468
def supervision_event(self, instance: HyInstance) -> "Optional[Shared[Exception]]":
469469
return None
470470

471-
def start_supervision(self, instance: HyInstance) -> None:
471+
def start_supervision(
472+
self, instance: HyInstance, supervision_display_name: str
473+
) -> None:
472474
return None
473475

474476
def stop(self, instance: HyInstance) -> "PythonTask[None]":
@@ -1255,10 +1257,6 @@ def __init__(
12551257
self._inner: "ActorMeshProtocol" = inner
12561258
self._shape = shape
12571259
self._proc_mesh = proc_mesh
1258-
# We don't start the supervision polling loop until the first call to
1259-
# supervision_event, which needs an Instance. Initialize here so events
1260-
# can be collected even without any endpoints being awaited.
1261-
self._inner.start_supervision(context().actor_instance._as_rust())
12621260

12631261
async_endpoints = []
12641262
sync_endpoints = []
@@ -1347,6 +1345,15 @@ def _create(
13471345
) -> "ActorMesh[T]":
13481346
mesh = cls(Class, actor_mesh, shape, proc_mesh)
13491347

1348+
# We don't start the supervision polling loop until the first call to
1349+
# supervision_event, which needs an Instance. Initialize here so events
1350+
# can be collected even without any endpoints being awaited.
1351+
instance = context().actor_instance
1352+
supervision_display_name = (
1353+
f"{str(instance)}.<{Class.__module__}.{Class.__name__} {name}>"
1354+
)
1355+
mesh._inner.start_supervision(instance._as_rust(), supervision_display_name)
1356+
13501357
async def null_func(*_args: Iterable[Any], **_kwargs: Dict[str, Any]) -> None:
13511358
return None
13521359

python/tests/test_supervision_hierarchy.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ def run(self, func: Callable[[], T]) -> T:
2727

2828
class Nest(Actor):
2929
def __init__(self):
30-
self.nest = this_host().spawn_procs().spawn("nested", Lambda)
30+
self.nest = (
31+
this_host().spawn_procs(per_host={"a_dim": 1}).spawn("nested", Lambda)
32+
)
3133

3234
# pyre-ignore[56]
3335
@endpoint
@@ -112,10 +114,10 @@ def test_proc_failure():
112114
If a proc dies, the client should receive an unhandled fault.
113115
"""
114116
with FaultCapture() as capture:
115-
actor = this_host().spawn_procs().spawn("actor", Nest)
117+
actor = this_host().spawn_procs().spawn("top", Nest)
116118
actor.kill_nest.call_one().get()
117119

118-
capture.assert_fault_occurred()
120+
capture.assert_fault_occurred("nested{'a_dim': 0/1}")
119121

120122

121123
def test_nested_mesh_kills_actor_actor_error():
@@ -129,5 +131,5 @@ def test_nested_mesh_kills_actor_actor_error():
129131
actor.nested.call_one(error).get()
130132
print("ERRORED THE ACTOR")
131133
capture.assert_fault_occurred(
132-
"actor <root>.<tests.test_supervision_hierarchy.Nest actor>.<tests.test_supervision_hierarchy.Lambda nested> failed"
134+
"actor <root>.<tests.test_supervision_hierarchy.Nest actor>.<tests.test_supervision_hierarchy.Lambda nested{'a_dim': 0/1}> failed"
133135
)

0 commit comments

Comments
 (0)