Skip to content

Commit 9130b82

Browse files
pzhan9 and meta-codesync[bot]
authored and committed
Add events regarding ProcMesh::stop (#1914)
Summary: Pull Request resolved: #1914. Reviewed By: mariusae, vidhyav. Differential Revision: D87269409. fbshipit-source-id: ec4288eb13163ea95e398d043bc93f79c448f0fa
1 parent 91a708c commit 9130b82

File tree

3 files changed

+92
-25
lines changed

3 files changed

+92
-25
lines changed

hyperactor_mesh/src/alloc.rs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,10 +359,23 @@ pub trait Alloc {
359359
/// Stop this alloc and wait for all procs to stop. Call will
360360
/// block until all ProcState events have been drained.
361361
async fn stop_and_wait(&mut self) -> Result<(), AllocatorError> {
362+
tracing::error!(
363+
name = "AllocStatus",
364+
alloc_name = %self.world_id(),
365+
status = "StopAndWait",
366+
);
362367
self.stop().await?;
363368
while let Some(event) = self.next().await {
364-
tracing::debug!("drained event: {:?}", event);
369+
tracing::debug!(
370+
alloc_name = %self.world_id(),
371+
"drained event: {event:?}"
372+
);
365373
}
374+
tracing::error!(
375+
name = "AllocStatus",
376+
alloc_name = %self.world_id(),
377+
status = "Stopped",
378+
);
366379
Ok(())
367380
}
368381

hyperactor_mesh/src/v1/host_mesh.rs

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -900,6 +900,7 @@ impl HostMeshRef {
900900
pub(crate) async fn stop_proc_mesh(
901901
&self,
902902
cx: &impl hyperactor::context::Actor,
903+
proc_mesh_name: &Name,
903904
procs: impl IntoIterator<Item = ProcId>,
904905
region: Region,
905906
) -> anyhow::Result<()> {
@@ -940,8 +941,18 @@ impl HostMeshRef {
940941
host.mesh_agent()
941942
.get_rank_status(cx, proc_name, port.bind())
942943
.await?;
944+
945+
tracing::info!(
946+
name = "ProcMeshStatus",
947+
mesh_name = %proc_mesh_name,
948+
%proc_id,
949+
status = "Stop::Sent",
950+
);
943951
}
944952
tracing::info!(
953+
mesh_name = %self.name,
954+
name = "HostMeshStatus",
955+
status = "ProcMesh::Stop::Sent",
945956
"Sending Stop to host mesh {} for {:?} procs",
946957
self.name,
947958
proc_names
@@ -960,11 +971,23 @@ impl HostMeshRef {
960971
Ok(statuses) => {
961972
let failed = statuses.values().any(|s| s.is_failure());
962973
if failed {
974+
tracing::error!(
975+
name = "ProcMeshStatus",
976+
mesh_name = %proc_mesh_name,
977+
status = "FailedToStop",
978+
"failed to terminate proc mesh: {:?}",
979+
statuses,
980+
);
963981
return Err(anyhow::anyhow!(
964982
"failed to terminate proc mesh: {:?}",
965983
statuses,
966984
));
967985
}
986+
tracing::info!(
987+
name = "ProcMeshStatus",
988+
mesh_name = %proc_mesh_name,
989+
status = "Stopped",
990+
);
968991
}
969992
Err(complete) => {
970993
// Fill remaining ranks with a timeout status via the
@@ -975,8 +998,16 @@ impl HostMeshRef {
975998
Status::is_not_exist,
976999
num_ranks,
9771000
);
1001+
tracing::error!(
1002+
name = "ProcMeshStatus",
1003+
mesh_name = %proc_mesh_name,
1004+
status = "StoppingTimeout",
1005+
"failed to terminate proc mesh before timeout: {:?}",
1006+
legacy,
1007+
);
9781008
return Err(anyhow::anyhow!(
979-
"failed to terminate proc mesh: {:?}",
1009+
"failed to terminate proc mesh {} before timeout: {:?}",
1010+
proc_mesh_name,
9801011
legacy
9811012
));
9821013
}

hyperactor_mesh/src/v1/proc_mesh.rs

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -435,41 +435,52 @@ impl ProcMesh {
435435

436436
let stop = Arc::new(Notify::new());
437437
let extent = alloc.extent().clone();
438+
let alloc_name = alloc.world_id().to_string();
438439

439440
{
440441
let stop = Arc::clone(&stop);
441-
let name = name.clone();
442-
443-
tokio::spawn(async move {
444-
loop {
445-
tokio::select! {
446-
_ = stop.notified() => {
447-
// If we are explicitly stopped, the alloc is torn down.
448-
if let Err(e) = alloc.stop_and_wait().await {
449-
tracing::error!("alloc {}: failed to stop: {}", name, e);
450-
}
451-
break;
452-
}
453-
// We are mostly just using this to drive allocation events.
454-
proc_state = alloc.next() => {
455-
match proc_state {
456-
// The alloc was stopped.
457-
None => break,
458-
Some(proc_state) => {
459-
tracing::info!("unmonitored allocation event for {}: {}", name, proc_state);
442+
443+
tokio::spawn(
444+
async move {
445+
loop {
446+
tokio::select! {
447+
_ = stop.notified() => {
448+
// If we are explicitly stopped, the alloc is torn down.
449+
if let Err(error) = alloc.stop_and_wait().await {
450+
tracing::error!(
451+
name = "ProcMeshStatus",
452+
alloc_name = %alloc.world_id(),
453+
status = "FailedToStopAlloc",
454+
%error,
455+
);
460456
}
457+
break;
461458
}
459+
// We are mostly just using this to drive allocation events.
460+
proc_state = alloc.next() => {
461+
match proc_state {
462+
// The alloc was stopped.
463+
None => break,
464+
Some(proc_state) => {
465+
tracing::debug!(
466+
alloc_name = %alloc.world_id(),
467+
"unmonitored allocation event: {}", proc_state);
468+
}
469+
}
462470

471+
}
463472
}
464473
}
465474
}
466-
}.instrument(tracing::info_span!("alloc_monitor")));
475+
.instrument(tracing::info_span!("alloc_monitor")),
476+
);
467477
}
468478

469479
let mesh = Self::create(
470480
cx,
471481
name,
472482
ProcMeshAllocation::Allocated {
483+
alloc_name,
473484
stop,
474485
extent,
475486
ranks: Arc::new(ranks),
@@ -497,15 +508,24 @@ impl ProcMesh {
497508
pub async fn stop(&mut self, cx: &impl context::Actor) -> anyhow::Result<()> {
498509
let region = self.region.clone();
499510
match &mut self.allocation {
500-
ProcMeshAllocation::Allocated { stop, .. } => {
511+
ProcMeshAllocation::Allocated {
512+
stop, alloc_name, ..
513+
} => {
501514
stop.notify_one();
515+
tracing::info!(
516+
name = "ProcMeshStatus",
517+
mesh_name = %self.name,
518+
alloc_name,
519+
status = "StoppingAlloc",
520+
"sending stop to alloc {alloc_name}; check its log for stop status",
521+
);
502522
Ok(())
503523
}
504524
ProcMeshAllocation::Owned { hosts, .. } => {
505-
let names = self.current_ref.proc_ids().collect::<Vec<ProcId>>();
525+
let procs = self.current_ref.proc_ids().collect::<Vec<ProcId>>();
506526
// We use the proc mesh region rather than the host mesh region
507527
// because the host agent stores one entry per proc, not per host.
508-
hosts.stop_proc_mesh(cx, names, region).await
528+
hosts.stop_proc_mesh(cx, &self.name, procs, region).await
509529
}
510530
}
511531
}
@@ -539,6 +559,9 @@ impl Drop for ProcMesh {
539559
enum ProcMeshAllocation {
540560
/// A mesh that has been allocated from an `Alloc`.
541561
Allocated {
562+
// The name of the alloc from which this mesh was allocated.
563+
alloc_name: String,
564+
542565
// A cancellation token used to stop the task keeping the alloc alive.
543566
stop: Arc<Notify>,
544567

0 commit comments

Comments
 (0)