Skip to content

Commit b1aeca9

Browse files
dulinrileymeta-codesync[bot]
authored andcommitted
Stop the proc mesh from the alloc in HostMesh::shutdown (#1963)
Summary: Pull Request resolved: #1963 In order to stop the remote process allocation, we need to call `Alloc::stop`, which is done when the ProcMesh created from the allocation is itself stopped. This should ensure jobs finish entirely. Note that the Drop for ProcMesh and Drop for RemoteProcessAlloc both do not try to stop the underlying allocation, which is why this is necessary. Reviewed By: pzhan9 Differential Revision: D87264086 fbshipit-source-id: 68acbecc04dc3ef53ae6d098314cd0c6bae9755f
1 parent 6b22037 commit b1aeca9

File tree

5 files changed

+30
-13
lines changed

5 files changed

+30
-13
lines changed

hyperactor_mesh/src/bootstrap.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3622,7 +3622,7 @@ mod tests {
36223622
// stored `BootstrapProcManager`, which does a
36233623
// `Command::spawn()` to launch a new OS child process for
36243624
// that proc.
3625-
let host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
3625+
let mut host_mesh = HostMesh::allocate(&instance, Box::new(alloc), "test", None)
36263626
.await
36273627
.unwrap();
36283628

hyperactor_mesh/src/v1/actor_mesh.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -789,7 +789,7 @@ mod tests {
789789
let _guard = config.override_key(crate::bootstrap::MESH_BOOTSTRAP_ENABLE_PDEATHSIG, false);
790790

791791
let instance = testing::instance().await;
792-
let host_mesh = testing::host_mesh(extent!(host = 4)).await;
792+
let mut host_mesh = testing::host_mesh(extent!(host = 4)).await;
793793
let proc_mesh = host_mesh
794794
.spawn(instance, "test", Extent::unity())
795795
.await

hyperactor_mesh/src/v1/host_mesh.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,7 @@ impl HostMesh {
476476
/// table and sends SIGKILL to any procs it spawned—tying proc
477477
/// lifetimes to their hosts and preventing leaks.
478478
#[hyperactor::instrument(fields(host_mesh=self.name.to_string()))]
479-
pub async fn shutdown(&self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
479+
pub async fn shutdown(&mut self, cx: &impl hyperactor::context::Actor) -> anyhow::Result<()> {
480480
tracing::info!(name = "HostMeshStatus", status = "Shutdown::Attempt");
481481
let mut failed_hosts = vec![];
482482
for host in self.current_ref.values() {
@@ -501,6 +501,13 @@ impl HostMesh {
501501
failed_hosts
502502
);
503503
}
504+
505+
match &mut self.allocation {
506+
HostMeshAllocation::ProcMesh { proc_mesh, .. } => {
507+
proc_mesh.stop(cx).await?;
508+
}
509+
HostMeshAllocation::Owned { .. } => {}
510+
}
504511
Ok(())
505512
}
506513
}
@@ -1325,7 +1332,7 @@ mod tests {
13251332
let instance = testing::instance().await;
13261333

13271334
for alloc in testing::allocs(extent!(replicas = 4)).await {
1328-
let host_mesh = HostMesh::allocate(instance, alloc, "test", None)
1335+
let mut host_mesh = HostMesh::allocate(instance, alloc, "test", None)
13291336
.await
13301337
.unwrap();
13311338

monarch_hyperactor/src/v1/host_mesh.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -197,12 +197,22 @@ impl PyHostMesh {
197197
match self {
198198
PyHostMesh::Owned(inner) => {
199199
let instance = instance.clone();
200-
let mesh_borrow = inner.0.borrow().map_err(anyhow::Error::from)?;
200+
let mesh_borrow = inner.0.clone();
201201
let fut = async move {
202-
instance_dispatch!(instance, |cx_instance| {
203-
mesh_borrow.shutdown(cx_instance).await
204-
})?;
205-
Ok(())
202+
match mesh_borrow.take().await {
203+
Ok(mut mesh) => {
204+
instance_dispatch!(instance, |cx_instance| {
205+
mesh.shutdown(cx_instance).await
206+
})?;
207+
Ok(())
208+
}
209+
Err(_) => {
210+
// Don't return an exception, silently ignore the stop request
211+
// because it was already done.
212+
tracing::info!("shutdown was already called on host mesh");
213+
Ok(())
214+
}
215+
}
206216
};
207217
PyPythonTask::new(fut)
208218
}

monarch_hyperactor/src/v1/logging.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ mod tests {
511511

512512
#[tokio::test]
513513
async fn test_world_smoke() {
514-
let (proc, instance, host_mesh, proc_mesh) = test_world().await.expect("world failed");
514+
let (proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");
515515

516516
assert_eq!(
517517
host_mesh.region().num_ranks(),
@@ -534,7 +534,7 @@ mod tests {
534534

535535
#[tokio::test]
536536
async fn spawn_respects_forwarding_flag() {
537-
let (_, instance, host_mesh, proc_mesh) = test_world().await.expect("world failed");
537+
let (_, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");
538538

539539
let py_instance = PyInstance::from(&instance);
540540
let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
@@ -591,7 +591,7 @@ mod tests {
591591

592592
#[tokio::test]
593593
async fn set_mode_behaviors() {
594-
let (_proc, instance, host_mesh, proc_mesh) = test_world().await.expect("world failed");
594+
let (_proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");
595595

596596
let py_instance = PyInstance::from(&instance);
597597
let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);
@@ -706,7 +706,7 @@ mod tests {
706706

707707
#[tokio::test]
708708
async fn flush_behaviors() {
709-
let (_proc, instance, host_mesh, proc_mesh) = test_world().await.expect("world failed");
709+
let (_proc, instance, mut host_mesh, proc_mesh) = test_world().await.expect("world failed");
710710

711711
let py_instance = PyInstance::from(&instance);
712712
let py_proc_mesh = PyProcMesh::new_owned(proc_mesh);

0 commit comments

Comments
 (0)