Skip to content

Commit 88f0b9d

Browse files
committed
[hyperactor] mesh: ensure liveness for proc and actor spawning
Make proc and actor spawning subject to a liveness parameter: we expect the spawn to either complete, or else yile updated regularly (configured by a new parameter `PROC_SPAWN_MAX_IDLE` and `ACTOR_SPAWN_MAX_IDLE`. This is a bridge solution to provide good error messaging, and to prevent applications from halting. Once we have owned host meshes w/ their own comm actors, we can use the general purpose liveness mechanism for this. We also clean up the error messages, refactor the status fence, and print the status of each individual proc on failure. Differential Revision: [D83882826](https://our.internmc.facebook.com/intern/diff/D83882826/) **NOTE FOR REVIEWERS**: This PR has internal Meta-specific changes or comments, please review them on [Phabricator](https://our.internmc.facebook.com/intern/diff/D83882826/)! ghstack-source-id: 314019055 Pull Request resolved: #1426
1 parent fc877dd commit 88f0b9d

File tree

6 files changed

+304
-52
lines changed

6 files changed

+304
-52
lines changed

hyperactor/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ pub mod supervision;
9292
pub mod sync;
9393
/// Test utilities
9494
pub mod test_utils;
95-
mod time;
95+
pub mod time;
9696

9797
pub use actor::Actor;
9898
pub use actor::ActorHandle;

hyperactor/src/time.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
*/
88

99
//! This module contains various utilities for dealing with time.
10+
//! (This probably belongs in a separate crate.)
1011
1112
use std::sync::Arc;
1213
use std::sync::Mutex;

hyperactor_mesh/src/resource.rs

Lines changed: 160 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,17 @@
1010
//! in hyperactor meshes.
1111
1212
use core::slice::GetDisjointMutIndex as _;
13+
use std::collections::HashMap;
14+
use std::fmt;
1315
use std::fmt::Debug;
1416
use std::hash::Hash;
1517
use std::marker::PhantomData;
1618
use std::mem::replace;
1719
use std::mem::take;
20+
use std::ops::Deref;
21+
use std::ops::DerefMut;
1822
use std::ops::Range;
23+
use std::time::Duration;
1924

2025
use enum_as_inner::EnumAsInner;
2126
use hyperactor::Bind;
@@ -30,6 +35,8 @@ use hyperactor::accum::Accumulator;
3035
use hyperactor::accum::CommReducer;
3136
use hyperactor::accum::ReducerFactory;
3237
use hyperactor::accum::ReducerSpec;
38+
use hyperactor::mailbox::MailboxError;
39+
use hyperactor::mailbox::PortReceiver;
3340
use hyperactor::message::Bind;
3441
use hyperactor::message::Bindings;
3542
use hyperactor::message::Unbind;
@@ -50,7 +57,8 @@ use crate::v1::Name;
5057
PartialEq,
5158
Eq,
5259
Hash,
53-
EnumAsInner
60+
EnumAsInner,
61+
strum::Display
5462
)]
5563
pub enum Status {
5664
/// The resource does not exist.
@@ -65,12 +73,17 @@ pub enum Status {
6573
Stopped,
6674
/// The resource has failed, with an error message.
6775
Failed(String),
76+
/// The resource has been declared failed after a timeout.
77+
Timeout(Duration),
6878
}
6979

7080
impl Status {
7181
/// Returns whether the status is a terminating status.
7282
pub fn is_terminating(&self) -> bool {
73-
matches!(self, Status::Stopping | Status::Stopped | Status::Failed(_))
83+
matches!(
84+
self,
85+
Status::Stopping | Status::Stopped | Status::Failed(_) | Status::Timeout(_)
86+
)
7487
}
7588
}
7689

@@ -128,6 +141,34 @@ pub struct GetRankStatus {
128141
pub reply: PortRef<RankedValues<Status>>,
129142
}
130143

144+
impl GetRankStatus {
145+
pub async fn wait(
146+
mut rx: PortReceiver<RankedValues<Status>>,
147+
num_ranks: usize,
148+
max_idle_time: Duration,
149+
) -> Result<RankedValues<Status>, RankedValues<Status>> {
150+
let mut alarm = hyperactor::time::Alarm::new();
151+
alarm.arm(max_idle_time);
152+
let mut statuses = RankedValues::default();
153+
loop {
154+
let mut sleeper = alarm.sleeper();
155+
tokio::select! {
156+
_ = sleeper.sleep() => return Err(statuses),
157+
new_statuses = rx.recv() => {
158+
match new_statuses {
159+
Ok(new_statuses) => statuses = new_statuses,
160+
Err(_) => return Err(statuses),
161+
}
162+
}
163+
}
164+
alarm.arm(max_idle_time);
165+
if statuses.rank(num_ranks) == num_ranks {
166+
break Ok(statuses);
167+
}
168+
}
169+
}
170+
}
171+
131172
/// The state of a resource.
132173
#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq, Eq)]
133174
pub struct State<S> {
@@ -214,6 +255,14 @@ pub struct RankedValues<T> {
214255
intervals: Vec<(Range<usize>, T)>,
215256
}
216257

258+
impl<T: PartialEq> PartialEq for RankedValues<T> {
259+
fn eq(&self, other: &Self) -> bool {
260+
self.intervals == other.intervals
261+
}
262+
}
263+
264+
impl<T: Eq> Eq for RankedValues<T> {}
265+
217266
impl<T> Default for RankedValues<T> {
218267
fn default() -> Self {
219268
Self {
@@ -238,6 +287,28 @@ impl<T> RankedValues<T> {
238287
}
239288
}
240289

290+
impl<T: Clone> RankedValues<T> {
291+
pub fn materialized_iter(&self, until: usize) -> impl Iterator<Item = &T> + '_ {
292+
assert_eq!(self.rank(until), until, "insufficient rank");
293+
self.iter()
294+
.flat_map(|(range, value)| std::iter::repeat(value).take(range.end - range.start))
295+
}
296+
}
297+
298+
impl<T: Hash + Eq + Clone> RankedValues<T> {
299+
/// Invert this ranked values into a [`ValuesByRank<T>`].
300+
pub fn invert(&self) -> ValuesByRank<T> {
301+
let mut inverted: HashMap<T, Vec<Range<usize>>> = HashMap::new();
302+
for (range, value) in self.iter() {
303+
inverted
304+
.entry(value.clone())
305+
.or_default()
306+
.push(range.clone());
307+
}
308+
ValuesByRank { values: inverted }
309+
}
310+
}
311+
241312
impl<T: Eq + Clone> RankedValues<T> {
242313
/// Merge `other` into this set of ranked values. Values in `other` that overlap
243314
/// with `self` take prededence.
@@ -298,6 +369,11 @@ impl<T: Eq + Clone> RankedValues<T> {
298369
}
299370
}
300371

372+
/// Merge the contents of this RankedValues into another RankedValues.
373+
pub fn merge_into(self, other: &mut Self) {
374+
other.merge_from(self);
375+
}
376+
301377
fn append(&mut self, range: Range<usize>, value: T) {
302378
if let Some(last) = self.intervals.last_mut()
303379
&& last.0.end == range.start
@@ -310,6 +386,15 @@ impl<T: Eq + Clone> RankedValues<T> {
310386
}
311387
}
312388

389+
impl RankedValues<Status> {
390+
pub fn first_terminating(&self) -> Option<(usize, Status)> {
391+
self.intervals
392+
.iter()
393+
.find(|(_, status)| status.is_terminating())
394+
.map(|(range, status)| (range.start, status.clone()))
395+
}
396+
}
397+
313398
impl<T> From<(usize, T)> for RankedValues<T> {
314399
fn from((rank, value): (usize, T)) -> Self {
315400
Self {
@@ -318,6 +403,67 @@ impl<T> From<(usize, T)> for RankedValues<T> {
318403
}
319404
}
320405

406+
impl<T> From<(Range<usize>, T)> for RankedValues<T> {
407+
fn from((range, value): (Range<usize>, T)) -> Self {
408+
Self {
409+
intervals: vec![(range, value)],
410+
}
411+
}
412+
}
413+
414+
/// An inverted index of RankedValues, providing all ranks for
415+
/// which each unique T-typed value appears.
416+
#[derive(Clone, Debug)]
417+
pub struct ValuesByRank<T> {
418+
values: HashMap<T, Vec<Range<usize>>>,
419+
}
420+
421+
impl<T: Eq + Hash> PartialEq for ValuesByRank<T> {
422+
fn eq(&self, other: &Self) -> bool {
423+
self.values == other.values
424+
}
425+
}
426+
427+
impl<T: Eq + Hash> Eq for ValuesByRank<T> {}
428+
429+
impl<T: fmt::Display> fmt::Display for ValuesByRank<T> {
430+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
431+
let mut first_value = true;
432+
for (value, ranges) in self.iter() {
433+
if first_value {
434+
first_value = false;
435+
} else {
436+
write!(f, ";")?;
437+
}
438+
write!(f, "{}=", value)?;
439+
let mut first_range = true;
440+
for range in ranges.iter() {
441+
if first_range {
442+
first_range = false;
443+
} else {
444+
write!(f, ",")?;
445+
}
446+
write!(f, "{}..{}", range.start, range.end)?;
447+
}
448+
}
449+
Ok(())
450+
}
451+
}
452+
453+
impl<T> Deref for ValuesByRank<T> {
454+
type Target = HashMap<T, Vec<Range<usize>>>;
455+
456+
fn deref(&self) -> &Self::Target {
457+
&self.values
458+
}
459+
}
460+
461+
impl<T> DerefMut for ValuesByRank<T> {
462+
fn deref_mut(&mut self) -> &mut Self::Target {
463+
&mut self.values
464+
}
465+
}
466+
321467
/// Enabled for test only because we have to guarantee that the input
322468
/// iterator is well-formed.
323469
#[cfg(test)]
@@ -425,4 +571,16 @@ mod tests {
425571
assert_eq!(left.rank(70), 62);
426572
assert_eq!(left.rank(100), 62);
427573
}
574+
575+
#[test]
576+
fn test_equality() {
577+
assert_eq!(
578+
RankedValues::from((0..10, 123)),
579+
RankedValues::from((0..10, 123))
580+
);
581+
assert_eq!(
582+
RankedValues::from((0..10, Status::Failed("foo".to_string()))),
583+
RankedValues::from((0..10, Status::Failed("foo".to_string()))),
584+
);
585+
}
428586
}

hyperactor_mesh/src/v1.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@ pub mod testactor;
1717
pub mod testing;
1818
pub mod value_mesh;
1919

20+
use std::collections::HashMap;
21+
use std::ops::Range;
2022
use std::str::FromStr;
23+
use std::time::Duration;
2124

2225
pub use actor_mesh::ActorMesh;
2326
pub use actor_mesh::ActorMeshRef;
27+
use enum_as_inner::EnumAsInner;
2428
pub use host_mesh::HostMeshRef;
2529
use hyperactor::ActorId;
2630
use hyperactor::ActorRef;
@@ -33,12 +37,15 @@ use serde::Serialize;
3337
pub use value_mesh::ValueMesh;
3438

3539
use crate::resource;
40+
use crate::resource::RankedValues;
41+
use crate::resource::Status;
42+
use crate::resource::ValuesByRank;
3643
use crate::shortuuid::ShortUuid;
3744
use crate::v1::host_mesh::HostMeshAgent;
3845
use crate::v1::host_mesh::HostMeshRefParseError;
3946

4047
/// Errors that occur during mesh operations.
41-
#[derive(Debug, thiserror::Error)]
48+
#[derive(Debug, EnumAsInner, thiserror::Error)]
4249
pub enum Error {
4350
#[error("invalid mesh ref: expected {expected} ranks, but contains {actual} ranks")]
4451
InvalidRankCardinality { expected: usize, actual: usize },
@@ -96,6 +103,18 @@ pub enum Error {
96103
status: resource::Status,
97104
},
98105

106+
#[error(
107+
"error spawning proc mesh: statuses: {}",
108+
RankedValues::invert(&*.statuses)
109+
)]
110+
ProcSpawnError { statuses: RankedValues<Status> },
111+
112+
#[error(
113+
"error spawning actor mesh: statuses: {}",
114+
RankedValues::invert(&*.statuses)
115+
)]
116+
ActorSpawnError { statuses: RankedValues<Status> },
117+
99118
#[error("error: {0} does not exist")]
100119
NotExist(Name),
101120
}

0 commit comments

Comments
 (0)