Skip to content

Commit 92b8eab

Browse files
pzhan9 authored and meta-codesync[bot] committed
Replace field mesh_name with host_mesh and proc_mesh (#1937)
Summary: Pull Request resolved: #1937. Right now `mesh_name` is used for both proc meshes and host meshes. That is not adequate, because many log entries need to carry both fields at once. This diff deprecates `mesh_name` and adds two new columns: `proc_mesh` and `host_mesh`. Reviewed By: vidhyav, shayne-fletcher. Differential Revision: D87374168. fbshipit-source-id: fccb57ac7d022df2c9670a25fb89ef187bb77cee
1 parent 2682fc6 commit 92b8eab

File tree

2 files changed

+45
-51
lines changed

2 files changed

+45
-51
lines changed

hyperactor_mesh/src/v1/host_mesh.rs

Lines changed: 40 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ impl HostMesh {
371371
}
372372

373373
// Use allocate_inner to set field host_mesh in span
374-
#[hyperactor::instrument(fields(mesh_name=name.to_string()))]
374+
#[hyperactor::instrument(fields(host_mesh=name.to_string()))]
375375
async fn allocate_inner(
376376
cx: &impl context::Actor,
377377
alloc: Box<dyn Alloc + Send + Sync>,
@@ -475,7 +475,7 @@ impl HostMesh {
475475
/// `BootstrapProcManager`. On drop, the manager walks its PID
476476
/// table and sends SIGKILL to any procs it spawned—tying proc
477477
/// lifetimes to their hosts and preventing leaks.
478-
#[hyperactor::instrument(fields(mesh_name=self.name.to_string()))]
478+
#[hyperactor::instrument(fields(host_mesh=self.name.to_string()))]
479479
pub async fn shutdown(&self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
480480
tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt");
481481
let mut failed_hosts = vec![];
@@ -534,7 +534,7 @@ impl Drop for HostMesh {
534534
fn drop(&mut self) {
535535
tracing::info!(
536536
name = "HostMeshStatus",
537-
mesh_name = %self.name,
537+
host_mesh = %self.name,
538538
status = "Dropping",
539539
);
540540
// Snapshot the owned hosts we're responsible for.
@@ -556,7 +556,7 @@ impl Drop for HostMesh {
556556
handle.spawn(async move {
557557
let span = tracing::info_span!(
558558
"hostmesh_drop_cleanup",
559-
%mesh_name,
559+
host_mesh = %mesh_name,
560560
allocation = %allocation_label,
561561
hosts = hosts.len(),
562562
);
@@ -619,15 +619,15 @@ impl Drop for HostMesh {
619619
// No runtime here; PDEATHSIG and manager Drop remain the
620620
// last-resort safety net.
621621
tracing::warn!(
622-
mesh_name = %self.name,
622+
host_mesh = %self.name,
623623
hosts = hosts.len(),
624624
"HostMesh dropped without a tokio runtime; skipping best-effort shutdown"
625625
);
626626
}
627627

628628
tracing::info!(
629629
name = "HostMeshStatus",
630-
mesh_name = %self.name,
630+
host_mesh = %self.name,
631631
status = "Dropped",
632632
);
633633
}
@@ -725,37 +725,36 @@ impl HostMeshRef {
725725
name: &str,
726726
per_host: Extent,
727727
) -> v1::Result<ProcMesh> {
728-
let proc_mesh_name = Name::new(name);
729-
tracing::info!(
730-
name = "HostMeshStatus",
731-
status = "ProcMesh::Spawn::Attempt",
732-
mesh_name = %self.name,
733-
"spawning proc mesh {}", proc_mesh_name
734-
);
735-
let result = self.spawn_inner(cx, proc_mesh_name.clone(), per_host).await;
736-
if result.is_ok() {
737-
tracing::info!(
738-
name = "HostMeshStatus",
739-
status = "ProcMesh::Spawn::Success",
740-
mesh_name = %self.name,
741-
"spawned proc mesh {}", proc_mesh_name
742-
);
743-
} else {
744-
tracing::error!(
745-
name = "HostMeshStatus",
746-
status = "ProcMesh::Spawn::Failed",
747-
mesh_name = %self.name,
748-
"failed to spawn proc mesh {}", proc_mesh_name
749-
);
728+
self.spawn_inner(cx, Name::new(name), per_host).await
729+
}
730+
731+
#[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
732+
async fn spawn_inner(
733+
&self,
734+
cx: &impl context::Actor,
735+
proc_mesh_name: Name,
736+
per_host: Extent,
737+
) -> v1::Result<ProcMesh> {
738+
tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Attempt");
739+
tracing::info!(name = "ProcMeshStatus", status = "Spawn::Attempt",);
740+
let result = self.spawn_inner_inner(cx, proc_mesh_name, per_host).await;
741+
match &result {
742+
Ok(_) => {
743+
tracing::info!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Success");
744+
tracing::info!(name = "ProcMeshStatus", status = "Spawn::Success");
745+
}
746+
Err(error) => {
747+
tracing::error!(name = "HostMeshStatus", status = "ProcMesh::Spawn::Failed", %error);
748+
tracing::error!(name = "ProcMeshStatus", status = "Spawn::Failed", %error);
749+
}
750750
}
751751
result
752752
}
753753

754-
#[hyperactor::instrument(fields(mesh_name=mesh_name.to_string()))]
755-
async fn spawn_inner(
754+
async fn spawn_inner_inner(
756755
&self,
757756
cx: &impl context::Actor,
758-
mesh_name: Name,
757+
proc_mesh_name: Name,
759758
per_host: Extent,
760759
) -> v1::Result<ProcMesh> {
761760
let per_host_labels = per_host.labels().iter().collect::<HashSet<_>>();
@@ -807,7 +806,7 @@ impl HostMeshRef {
807806
for (host_rank, host) in self.ranks.iter().enumerate() {
808807
for per_host_rank in 0..per_host.num_ranks() {
809808
let create_rank = per_host.num_ranks() * host_rank + per_host_rank;
810-
let proc_name = Name::new(format!("{}_{}", mesh_name.name(), per_host_rank));
809+
let proc_name = Name::new(format!("{}_{}", proc_mesh_name.name(), per_host_rank));
811810
proc_names.push(proc_name.clone());
812811
host.mesh_agent()
813812
.create_or_update(
@@ -939,8 +938,7 @@ impl HostMeshRef {
939938
}
940939

941940
let mesh =
942-
ProcMesh::create_owned_unchecked(cx, mesh_name, extent, self.clone(), procs).await;
943-
tracing::info!(name = "ProcMeshStatus", status = "Spawn::Created",);
941+
ProcMesh::create_owned_unchecked(cx, proc_mesh_name, extent, self.clone(), procs).await;
944942
if let Ok(ref mesh) = mesh {
945943
// Spawn a unique mesh controller for each proc mesh, so the type of the
946944
// mesh can be preserved.
@@ -957,6 +955,7 @@ impl HostMeshRef {
957955
&self.name
958956
}
959957

958+
#[hyperactor::instrument(fields(host_mesh=self.name.to_string(), proc_mesh=proc_mesh_name.to_string()))]
960959
pub(crate) async fn stop_proc_mesh(
961960
&self,
962961
cx: &impl hyperactor::context::Actor,
@@ -1004,19 +1003,20 @@ impl HostMeshRef {
10041003

10051004
tracing::info!(
10061005
name = "ProcMeshStatus",
1007-
mesh_name = %proc_mesh_name,
10081006
%proc_id,
10091007
status = "Stop::Sent",
10101008
);
10111009
}
10121010
tracing::info!(
1013-
mesh_name = %self.name,
10141011
name = "HostMeshStatus",
10151012
status = "ProcMesh::Stop::Sent",
1016-
"sending Stop to proc mesh {} for {} procs: {}",
1017-
proc_mesh_name,
1013+
"sending Stop to proc mesh for {} procs: {}",
10181014
proc_names.len(),
1019-
proc_names.iter().map(|n| n.to_string()).collect::<Vec<_>>().join(", ")
1015+
proc_names
1016+
.iter()
1017+
.map(|n| n.to_string())
1018+
.collect::<Vec<_>>()
1019+
.join(", ")
10201020
);
10211021

10221022
let start_time = RealClock.now();
@@ -1034,7 +1034,6 @@ impl HostMeshRef {
10341034
if !all_stopped {
10351035
tracing::error!(
10361036
name = "ProcMeshStatus",
1037-
mesh_name = %proc_mesh_name,
10381037
status = "FailedToStop",
10391038
"failed to terminate proc mesh: {:?}",
10401039
statuses,
@@ -1044,11 +1043,7 @@ impl HostMeshRef {
10441043
statuses,
10451044
));
10461045
}
1047-
tracing::info!(
1048-
name = "ProcMeshStatus",
1049-
mesh_name = %proc_mesh_name,
1050-
status = "Stopped",
1051-
);
1046+
tracing::info!(name = "ProcMeshStatus", status = "Stopped");
10521047
}
10531048
Err(complete) => {
10541049
// Fill remaining ranks with a timeout status via the
@@ -1061,7 +1056,6 @@ impl HostMeshRef {
10611056
);
10621057
tracing::error!(
10631058
name = "ProcMeshStatus",
1064-
mesh_name = %proc_mesh_name,
10651059
status = "StoppingTimeout",
10661060
"failed to terminate proc mesh before timeout: {:?}",
10671061
legacy,

hyperactor_mesh/src/v1/proc_mesh.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ impl ProcMesh {
306306
}
307307

308308
// Use allocate_inner to set field proc_mesh in span
309-
#[hyperactor::instrument(fields(mesh_name=name.to_string()))]
309+
#[hyperactor::instrument(fields(proc_mesh=name.to_string()))]
310310
async fn allocate_inner(
311311
cx: &impl context::Actor,
312312
mut alloc: Box<dyn Alloc + Send + Sync + 'static>,
@@ -328,7 +328,7 @@ impl ProcMesh {
328328
.instrument(tracing::info_span!(
329329
"ProcMeshStatus::Allocate::Initialize",
330330
alloc_id,
331-
mesh_name = name.to_string()
331+
proc_mesh = %name
332332
))
333333
.await?;
334334

@@ -348,7 +348,7 @@ impl ProcMesh {
348348
tracing::info!(
349349
name = "ProcMeshStatus",
350350
status = "Allocate::ChannelServe",
351-
mesh_name = name.to_string(),
351+
proc_mesh = %name,
352352
%addr,
353353
"proc started listening on addr: {addr}"
354354
);
@@ -516,7 +516,7 @@ impl ProcMesh {
516516
stop.notify_one();
517517
tracing::info!(
518518
name = "ProcMeshStatus",
519-
mesh_name = %self.name,
519+
proc_mesh = %self.name,
520520
alloc_name,
521521
status = "StoppingAlloc",
522522
"sending stop to alloc {alloc_name}; check its log for stop status",
@@ -551,7 +551,7 @@ impl Drop for ProcMesh {
551551
fn drop(&mut self) {
552552
tracing::info!(
553553
name = "ProcMeshStatus",
554-
mesh_name = %self.name,
554+
proc_mesh = %self.name,
555555
status = "Dropped",
556556
);
557557
}

0 commit comments

Comments
 (0)