diff --git a/hyperactor/src/lib.rs b/hyperactor/src/lib.rs index 25e3614fa..8a30e9c51 100644 --- a/hyperactor/src/lib.rs +++ b/hyperactor/src/lib.rs @@ -92,7 +92,7 @@ pub mod supervision; pub mod sync; /// Test utilities pub mod test_utils; -mod time; +pub mod time; pub use actor::Actor; pub use actor::ActorHandle; diff --git a/hyperactor/src/time.rs b/hyperactor/src/time.rs index f0a4008ac..2f89fae24 100644 --- a/hyperactor/src/time.rs +++ b/hyperactor/src/time.rs @@ -7,6 +7,7 @@ */ //! This module contains various utilities for dealing with time. +//! (This probably belongs in a separate crate.) use std::sync::Arc; use std::sync::Mutex; diff --git a/hyperactor_mesh/src/resource.rs b/hyperactor_mesh/src/resource.rs index f239b91be..c7c251b91 100644 --- a/hyperactor_mesh/src/resource.rs +++ b/hyperactor_mesh/src/resource.rs @@ -10,12 +10,17 @@ //! in hyperactor meshes. use core::slice::GetDisjointMutIndex as _; +use std::collections::HashMap; +use std::fmt; use std::fmt::Debug; use std::hash::Hash; use std::marker::PhantomData; use std::mem::replace; use std::mem::take; +use std::ops::Deref; +use std::ops::DerefMut; use std::ops::Range; +use std::time::Duration; use enum_as_inner::EnumAsInner; use hyperactor::Bind; @@ -30,6 +35,8 @@ use hyperactor::accum::Accumulator; use hyperactor::accum::CommReducer; use hyperactor::accum::ReducerFactory; use hyperactor::accum::ReducerSpec; +use hyperactor::mailbox::MailboxError; +use hyperactor::mailbox::PortReceiver; use hyperactor::message::Bind; use hyperactor::message::Bindings; use hyperactor::message::Unbind; @@ -50,7 +57,8 @@ use crate::v1::Name; PartialEq, Eq, Hash, - EnumAsInner + EnumAsInner, + strum::Display )] pub enum Status { /// The resource does not exist. @@ -65,12 +73,17 @@ pub enum Status { Stopped, /// The resource has failed, with an error message. Failed(String), + /// The resource has been declared failed after a timeout. + Timeout(Duration), } impl Status { /// Returns whether the status is a terminating status. pub fn is_terminating(&self) -> bool { - matches!(self, Status::Stopping | Status::Stopped | Status::Failed(_)) + matches!( + self, + Status::Stopping | Status::Stopped | Status::Failed(_) | Status::Timeout(_) + ) } } @@ -128,6 +141,34 @@ pub struct GetRankStatus { pub reply: PortRef>, } +impl GetRankStatus { + pub async fn wait( + mut rx: PortReceiver>, + num_ranks: usize, + max_idle_time: Duration, + ) -> Result, RankedValues> { + let mut alarm = hyperactor::time::Alarm::new(); + alarm.arm(max_idle_time); + let mut statuses = RankedValues::default(); + loop { + let mut sleeper = alarm.sleeper(); + tokio::select! { + _ = sleeper.sleep() => return Err(statuses), + new_statuses = rx.recv() => { + match new_statuses { + Ok(new_statuses) => statuses = new_statuses, + Err(_) => return Err(statuses), + } + } + } + alarm.arm(max_idle_time); + if statuses.rank(num_ranks) == num_ranks { + break Ok(statuses); + } + } + } +} + /// The state of a resource. #[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq, Eq)] pub struct State { @@ -214,6 +255,14 @@ pub struct RankedValues { intervals: Vec<(Range, T)>, } +impl PartialEq for RankedValues { + fn eq(&self, other: &Self) -> bool { + self.intervals == other.intervals + } +} + +impl Eq for RankedValues {} + impl Default for RankedValues { fn default() -> Self { Self { @@ -238,6 +287,28 @@ impl RankedValues { } } +impl RankedValues { + pub fn materialized_iter(&self, until: usize) -> impl Iterator + '_ { + assert_eq!(self.rank(until), until, "insufficient rank"); + self.iter() + .flat_map(|(range, value)| std::iter::repeat(value).take(range.end - range.start)) + } +} + +impl RankedValues { + /// Invert this ranked values into a [`ValuesByRank`]. + pub fn invert(&self) -> ValuesByRank { + let mut inverted: HashMap>> = HashMap::new(); + for (range, value) in self.iter() { + inverted + .entry(value.clone()) + .or_default() + .push(range.clone()); + } + ValuesByRank { values: inverted } + } +} + impl RankedValues { /// Merge `other` into this set of ranked values. Values in `other` that overlap /// with `self` take prededence. @@ -298,6 +369,11 @@ impl RankedValues { } } + /// Merge the contents of this RankedValues into another RankedValues. + pub fn merge_into(self, other: &mut Self) { + other.merge_from(self); + } + fn append(&mut self, range: Range, value: T) { if let Some(last) = self.intervals.last_mut() && last.0.end == range.start @@ -310,6 +386,15 @@ impl RankedValues { } } +impl RankedValues { + pub fn first_terminating(&self) -> Option<(usize, Status)> { + self.intervals + .iter() + .find(|(_, status)| status.is_terminating()) + .map(|(range, status)| (range.start, status.clone())) + } +} + impl From<(usize, T)> for RankedValues { fn from((rank, value): (usize, T)) -> Self { Self { @@ -318,6 +403,67 @@ impl From<(usize, T)> for RankedValues { } } +impl From<(Range, T)> for RankedValues { + fn from((range, value): (Range, T)) -> Self { + Self { + intervals: vec![(range, value)], + } + } +} + +/// An inverted index of RankedValues, providing all ranks for +/// which each unique T-typed value appears. +#[derive(Clone, Debug)] +pub struct ValuesByRank { + values: HashMap>>, +} + +impl PartialEq for ValuesByRank { + fn eq(&self, other: &Self) -> bool { + self.values == other.values + } +} + +impl Eq for ValuesByRank {} + +impl fmt::Display for ValuesByRank { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut first_value = true; + for (value, ranges) in self.iter() { + if first_value { + first_value = false; + } else { + write!(f, ";")?; + } + write!(f, "{}=", value)?; + let mut first_range = true; + for range in ranges.iter() { + if first_range { + first_range = false; + } else { + write!(f, ",")?; + } + write!(f, "{}..{}", range.start, range.end)?; + } + } + Ok(()) + } +} + +impl Deref for ValuesByRank { + type Target = HashMap>>; + + fn deref(&self) -> &Self::Target { + &self.values + } +} + +impl DerefMut for ValuesByRank { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.values + } +} + /// Enabled for test only because we have to guarantee that the input /// iterator is well-formed. #[cfg(test)] @@ -425,4 +571,36 @@ mod tests { assert_eq!(left.rank(70), 62); assert_eq!(left.rank(100), 62); } + + #[test] + fn test_equality() { + assert_eq!( + RankedValues::from((0..10, 123)), + RankedValues::from((0..10, 123)) + ); + assert_eq!( + RankedValues::from((0..10, Status::Failed("foo".to_string()))), + RankedValues::from((0..10, Status::Failed("foo".to_string()))), + ); + } + + #[test] + fn test_default_through_merging() { + let values: RankedValues = + [(0..10, 1), (15..20, 1), (30..50, 1)].into_iter().collect(); + + let mut default = RankedValues::from((0..50, 0)); + default.merge_from(values); + + assert_eq!( + default.iter().cloned().collect::>(), + vec![ + (0..10, 1), + (10..15, 0), + (15..20, 1), + (20..30, 0), + (30..50, 1) + ] + ); + } } diff --git a/hyperactor_mesh/src/v1.rs b/hyperactor_mesh/src/v1.rs index a2af7580a..d566f877d 100644 --- a/hyperactor_mesh/src/v1.rs +++ b/hyperactor_mesh/src/v1.rs @@ -17,10 +17,14 @@ pub mod testactor; pub mod testing; pub mod value_mesh; +use std::collections::HashMap; +use std::ops::Range; use std::str::FromStr; +use std::time::Duration; pub use actor_mesh::ActorMesh; pub use actor_mesh::ActorMeshRef; +use enum_as_inner::EnumAsInner; pub use host_mesh::HostMeshRef; use hyperactor::ActorId; use hyperactor::ActorRef; @@ -33,12 +37,15 @@ use serde::Serialize; pub use value_mesh::ValueMesh; use crate::resource; +use crate::resource::RankedValues; +use crate::resource::Status; +use crate::resource::ValuesByRank; use crate::shortuuid::ShortUuid; use crate::v1::host_mesh::HostMeshAgent; use crate::v1::host_mesh::HostMeshRefParseError; /// Errors that occur during mesh operations. -#[derive(Debug, thiserror::Error)] +#[derive(Debug, EnumAsInner, thiserror::Error)] pub enum Error { #[error("invalid mesh ref: expected {expected} ranks, but contains {actual} ranks")] InvalidRankCardinality { expected: usize, actual: usize }, @@ -96,6 +103,18 @@ pub enum Error { status: resource::Status, }, + #[error( + "error spawning proc mesh: statuses: {}", + RankedValues::invert(&*.statuses) + )] + ProcSpawnError { statuses: RankedValues }, + + #[error( + "error spawning actor mesh: statuses: {}", + RankedValues::invert(&*.statuses) + )] + ActorSpawnError { statuses: RankedValues }, + #[error("error: {0} does not exist")] NotExist(Name), } diff --git a/hyperactor_mesh/src/v1/host_mesh.rs b/hyperactor_mesh/src/v1/host_mesh.rs index f951df479..661297d4c 100644 --- a/hyperactor_mesh/src/v1/host_mesh.rs +++ b/hyperactor_mesh/src/v1/host_mesh.rs @@ -7,12 +7,18 @@ */ use hyperactor::channel::ChannelTransport; +use hyperactor::clock::Clock; +use hyperactor::clock::RealClock; +use hyperactor::config; +use hyperactor::config::CONFIG_ENV_VAR; +use hyperactor::declare_attrs; pub mod mesh_agent; use std::collections::HashSet; use std::ops::Deref; use std::str::FromStr; use std::sync::Arc; +use std::time::Duration; use hyperactor::ActorRef; use hyperactor::Named; @@ -33,8 +39,10 @@ use crate::alloc::Alloc; use crate::bootstrap::BootstrapCommand; use crate::resource; use crate::resource::CreateOrUpdateClient; +use crate::resource::GetRankStatus; use crate::resource::GetRankStatusClient; use crate::resource::RankedValues; +use crate::resource::Status; use crate::v1; use crate::v1::Name; use crate::v1::ProcMesh; @@ -44,6 +52,12 @@ use crate::v1::host_mesh::mesh_agent::HostMeshAgentProcMeshTrampoline; use crate::v1::host_mesh::mesh_agent::ShutdownHostClient; use crate::v1::proc_mesh::ProcRef; +declare_attrs! { + /// The maximum idle time between updates while spawning proc meshes. + @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_PROC_SPAWN_MAX_IDLE".to_string()) + pub attr PROC_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30); +} + /// A reference to a single host. #[derive(Debug, Clone, PartialEq, Eq, Hash, Named, Serialize, Deserialize)] pub struct HostRef(ChannelAddr); @@ -520,7 +534,7 @@ impl HostMeshRef { let mesh_name = Name::new(name); let mut procs = Vec::new(); let num_ranks = self.region().num_ranks() * per_host.num_ranks(); - let (port, mut rx) = cx.mailbox().open_accum_port(RankedValues::default()); + let (port, rx) = cx.mailbox().open_accum_port(RankedValues::default()); // We CreateOrUpdate each proc, and then fence on getting statuses back. // This is currently necessary because otherwise there is a race between // the procs being created, and subsequent messages becoming unroutable @@ -557,24 +571,27 @@ impl HostMeshRef { } } - // fence: wait for everyone to report back. - loop { - let statuses = rx.recv().await?; - if let Some((ranks, status)) = - statuses.iter().find(|(_, status)| status.is_terminating()) - { - let rank = ranks.start; - let proc_name = Name::new(format!("{}-{}", name, rank % per_host.num_ranks())); - return Err(v1::Error::ProcCreationError { - proc_name, - mesh_agent: self.ranks[rank].mesh_agent(), - host_rank: rank / per_host.num_ranks(), - status: status.clone(), - }); + let start_time = RealClock.now(); + + match GetRankStatus::wait(rx, num_ranks, config::global::get(PROC_SPAWN_MAX_IDLE)).await { + Ok(statuses) => { + if let Some((rank, status)) = statuses.first_terminating() { + let proc_name = Name::new(format!("{}-{}", name, rank % per_host.num_ranks())); + return Err(v1::Error::ProcCreationError { + proc_name, + mesh_agent: self.ranks[rank].mesh_agent(), + host_rank: rank / per_host.num_ranks(), + status: status.clone(), + }); + } } + Err(complete) => { + // Fill the remaining statuses with a timeout error. + let mut statuses = + RankedValues::from((0..num_ranks, Status::Timeout(start_time.elapsed()))); + statuses.merge_from(complete); - if statuses.rank(num_ranks) == num_ranks { - break; + return Err(v1::Error::ProcSpawnError { statuses }); } } @@ -670,6 +687,7 @@ mod tests { use super::*; use crate::Bootstrap; + use crate::resource::Status; use crate::v1::ActorMesh; use crate::v1::testactor; use crate::v1::testing; @@ -859,7 +877,7 @@ mod tests { #[tokio::test] async fn test_failing_proc_allocation() { - let program = buck_resources::get("monarch/hyperactor_mesh/bootstrap").unwrap(); + let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap"); let hosts = vec![free_localhost_addr(), free_localhost_addr()]; @@ -870,7 +888,7 @@ mod tests { addr: host.clone(), config: None, // The entire purpose of this is to fail: - command: Some(BootstrapCommand::from("/bin/false")), + command: Some(BootstrapCommand::from("false")), }; boot.to_env(&mut cmd); cmd.kill_on_drop(true); @@ -889,4 +907,48 @@ mod tests { if msg.contains("failed to configure process: Terminal(Stopped { exit_code: 1") ); } + + #[tokio::test] + async fn test_halting_proc_allocation() { + let config = config::global::lock(); + let _guard1 = config.override_key(PROC_SPAWN_MAX_IDLE, Duration::from_secs(5)); + + let program = crate::testresource::get("monarch/hyperactor_mesh/bootstrap"); + + let hosts = vec![free_localhost_addr(), free_localhost_addr()]; + + let mut children = Vec::new(); + + for (index, host) in hosts.iter().enumerate() { + let mut cmd = Command::new(program.clone()); + let command = if index == 0 { + let mut command = BootstrapCommand::from("sleep"); + command.args.push("60".to_string()); + Some(command) + } else { + None + }; + let boot = Bootstrap::Host { + addr: host.clone(), + config: None, + command, + }; + boot.to_env(&mut cmd); + cmd.kill_on_drop(true); + children.push(cmd.spawn().unwrap()); + } + let host_mesh = HostMeshRef::from_hosts(hosts); + + let instance = testing::instance().await; + + let err = host_mesh + .spawn(&instance, "test", Extent::unity()) + .await + .unwrap_err(); + let statuses = err.into_proc_spawn_error().unwrap(); + assert_matches!( + &statuses.materialized_iter(2).cloned().collect::>()[..], + &[Status::Timeout(_), Status::Running] + ); + } } diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index b0d04a33e..e98f6209c 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -11,6 +11,7 @@ use std::collections::HashMap; use std::fmt; use std::ops::Deref; use std::sync::Arc; +use std::time::Duration; use hyperactor::Actor; use hyperactor::ActorId; @@ -22,7 +23,12 @@ use hyperactor::actor::Referable; use hyperactor::actor::remote::Remote; use hyperactor::channel; use hyperactor::channel::ChannelAddr; +use hyperactor::clock::Clock; +use hyperactor::clock::RealClock; +use hyperactor::config; +use hyperactor::config::CONFIG_ENV_VAR; use hyperactor::context; +use hyperactor::declare_attrs; use hyperactor::mailbox::DialMailboxRouter; use hyperactor::mailbox::MailboxServer; use ndslice::Extent; @@ -47,7 +53,9 @@ use crate::proc_mesh::mesh_agent::MeshAgentMessageClient; use crate::proc_mesh::mesh_agent::ProcMeshAgent; use crate::proc_mesh::mesh_agent::ReconfigurableMailboxSender; use crate::resource; +use crate::resource::GetRankStatus; use crate::resource::RankedValues; +use crate::resource::Status; use crate::v1; use crate::v1::ActorMesh; use crate::v1::ActorMeshRef; @@ -56,6 +64,12 @@ use crate::v1::HostMeshRef; use crate::v1::Name; use crate::v1::ValueMesh; +declare_attrs! { + /// The maximum idle time between updates while spawning actor meshes. + @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_ACTOR_SPAWN_MAX_IDLE".to_string()) + pub attr ACTOR_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30); +} + /// A reference to a single [`hyperactor::Proc`]. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] pub struct ProcRef { @@ -595,7 +609,7 @@ impl ProcMeshRef { }, )?; - let (port, mut rx) = cx.mailbox().open_accum_port(RankedValues::default()); + let (port, rx) = cx.mailbox().open_accum_port(RankedValues::default()); self.agent_mesh().cast( cx, @@ -605,34 +619,33 @@ impl ProcMeshRef { }, )?; - // Wait for everyone to report back. - // TODO: move out of critical path - let statuses = loop { - let statuses = rx.recv().await?; - if statuses.rank(self.ranks.len()) == self.ranks.len() { - break statuses; - } - }; + let start_time = RealClock.now(); - let failed: Vec<_> = statuses - .iter() - .filter_map(|(ranks, status)| { - if status.is_terminating() { - Some(ranks.clone()) + match GetRankStatus::wait( + rx, + self.ranks.len(), + config::global::get(ACTOR_SPAWN_MAX_IDLE), + ) + .await + { + Ok(statuses) => { + if statuses.first_terminating().is_none() { + Ok(ActorMesh::new(self.clone(), name)) } else { - None + Err(Error::ActorSpawnError { statuses }) } - }) - .flatten() - .collect(); - if !failed.is_empty() { - return Err(Error::GspawnError( - name, - format!("failed ranks: {:?}", failed,), - )); + } + Err(complete) => { + // Fill the remaining statuses with a timeout error. + let mut statuses = RankedValues::from(( + 0..self.ranks.len(), + Status::Timeout(start_time.elapsed()), + )); + statuses.merge_from(complete); + + Err(Error::ActorSpawnError { statuses }) + } } - - Ok(ActorMesh::new(self.clone(), name)) } } @@ -670,13 +683,12 @@ impl view::RankedSliceable for ProcMeshRef { #[cfg(test)] mod tests { - use std::assert_matches::assert_matches; - use ndslice::ViewExt; use ndslice::extent; use timed_test::async_timed_test; - use crate::v1; + use crate::resource::RankedValues; + use crate::resource::Status; use crate::v1::testactor; use crate::v1::testing; @@ -725,7 +737,11 @@ mod tests { .spawn::(instance, "testfail", &()) .await .unwrap_err(); - assert_matches!(err, v1::Error::GspawnError(_, _)) + let statuses = err.into_actor_spawn_error().unwrap(); + assert_eq!( + statuses, + RankedValues::from((0..8, Status::Failed("test failure".to_string()))), + ); } } }