Skip to content

Commit 028676b

Browse files
pzhan9meta-codesync[bot]
authored andcommitted
Add events for ActorMesh (#1938)
Summary: Pull Request resolved: #1938 As title. Reviewed By: mariusae, vidhyav Differential Revision: D87382588 fbshipit-source-id: 8cf97b71f17c21f5546633ad3a2e7877893a60b8
1 parent 92b8eab commit 028676b

File tree

2 files changed

+77
-1
lines changed

2 files changed

+77
-1
lines changed

hyperactor_mesh/src/v1/actor_mesh.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ impl<A: Referable> ActorMesh<A> {
7676

7777
/// Detach this mesh from the lifetime of `self`, and return its reference.
7878
pub(crate) fn detach(self) -> ActorMeshRef<A> {
79-
self.current_ref
79+
self.current_ref.clone()
8080
}
8181

8282
/// Stop actors on this mesh across all procs.
@@ -113,6 +113,16 @@ impl<A: Referable> Clone for ActorMesh<A> {
113113
}
114114
}
115115

116+
impl<A: Referable> Drop for ActorMesh<A> {
117+
fn drop(&mut self) {
118+
tracing::info!(
119+
name = "ActorMeshStatus",
120+
actor_mesh = %self.name,
121+
status = "Dropped",
122+
);
123+
}
124+
}
125+
116126
/// Influences paging behavior for the lazy cache. Smaller pages
117127
/// reduce over-allocation for sparse access; larger pages reduce the
118128
/// number of heap allocations for contiguous scans.

hyperactor_mesh/src/v1/proc_mesh.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -688,6 +688,10 @@ impl ProcMeshRef {
688688
&self.name
689689
}
690690

691+
fn host_mesh_name(&self) -> Option<&Name> {
692+
self.host_mesh.as_ref().map(|h| h.name())
693+
}
694+
691695
/// Returns the HostMeshRef that this ProcMeshRef might be backed by.
692696
/// Returns None if this ProcMeshRef is backed by an Alloc instead of a host mesh.
693697
pub fn hosts(&self) -> Option<&HostMeshRef> {
@@ -875,12 +879,48 @@ impl ProcMeshRef {
875879
/// inside the `ActorMesh`.
876880
/// - `A::Params: RemoteMessage` - spawn parameters must be
877881
/// serializable and routable.
882+
#[hyperactor::instrument(fields(
883+
host_mesh=self.host_mesh_name().map(|n| n.to_string()),
884+
proc_mesh=self.name.to_string(),
885+
actor_mesh=name.to_string(),
886+
))]
878887
pub(crate) async fn spawn_with_name<A: Actor + Referable>(
879888
&self,
880889
cx: &impl context::Actor,
881890
name: Name,
882891
params: &A::Params,
883892
) -> v1::Result<ActorMesh<A>>
893+
where
894+
A::Params: RemoteMessage,
895+
{
896+
tracing::info!(
897+
name = "ProcMeshStatus",
898+
status = "ActorMesh::Spawn::Attempt",
899+
);
900+
tracing::info!(name = "ActorMeshStatus", status = "Spawn::Attempt");
901+
let result = self.spawn_with_name_inner(cx, name, params).await;
902+
match &result {
903+
Ok(_) => {
904+
tracing::info!(
905+
name = "ProcMeshStatus",
906+
status = "ActorMesh::Spawn::Success",
907+
);
908+
tracing::info!(name = "ActorMeshStatus", status = "Spawn::Success");
909+
}
910+
Err(error) => {
911+
tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Spawn::Failed", %error);
912+
tracing::error!(name = "ActorMeshStatus", status = "Spawn::Failed", %error);
913+
}
914+
}
915+
result
916+
}
917+
918+
async fn spawn_with_name_inner<A: Actor + Referable>(
919+
&self,
920+
cx: &impl context::Actor,
921+
name: Name,
922+
params: &A::Params,
923+
) -> v1::Result<ActorMesh<A>>
884924
where
885925
A::Params: RemoteMessage,
886926
{
@@ -998,10 +1038,36 @@ impl ProcMeshRef {
9981038
}
9991039

10001040
/// Send stop actors message to all mesh agents for a specific mesh name
1041+
#[hyperactor::instrument(fields(
1042+
host_mesh = self.host_mesh_name().map(|n| n.to_string()),
1043+
proc_mesh = self.name.to_string(),
1044+
actor_mesh = mesh_name.to_string(),
1045+
))]
10011046
pub(crate) async fn stop_actor_by_name(
10021047
&self,
10031048
cx: &impl context::Actor,
10041049
mesh_name: Name,
1050+
) -> v1::Result<()> {
1051+
tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Attempt");
1052+
tracing::info!(name = "ActorMeshStatus", status = "Stop::Attempt");
1053+
let result = self.stop_actor_by_name_inner(cx, mesh_name).await;
1054+
match &result {
1055+
Ok(_) => {
1056+
tracing::info!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Success");
1057+
tracing::info!(name = "ActorMeshStatus", status = "Stop::Success");
1058+
}
1059+
Err(error) => {
1060+
tracing::error!(name = "ProcMeshStatus", status = "ActorMesh::Stop::Failed", %error);
1061+
tracing::error!(name = "ActorMeshStatus", status = "Stop::Failed", %error);
1062+
}
1063+
}
1064+
result
1065+
}
1066+
1067+
async fn stop_actor_by_name_inner(
1068+
&self,
1069+
cx: &impl context::Actor,
1070+
mesh_name: Name,
10051071
) -> v1::Result<()> {
10061072
let region = self.region().clone();
10071073
let agent_mesh = self.agent_mesh();

0 commit comments

Comments
 (0)