Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hyperactor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub mod supervision;
pub mod sync;
/// Test utilities
pub mod test_utils;
mod time;
pub mod time;

pub use actor::Actor;
pub use actor::ActorHandle;
Expand Down
1 change: 1 addition & 0 deletions hyperactor/src/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

//! This module contains various utilities for dealing with time.
//! (This probably belongs in a separate crate.)

use std::sync::Arc;
use std::sync::Mutex;
Expand Down
182 changes: 180 additions & 2 deletions hyperactor_mesh/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,17 @@
//! in hyperactor meshes.

use core::slice::GetDisjointMutIndex as _;
use std::collections::HashMap;
use std::fmt;
use std::fmt::Debug;
use std::hash::Hash;
use std::marker::PhantomData;
use std::mem::replace;
use std::mem::take;
use std::ops::Deref;
use std::ops::DerefMut;
use std::ops::Range;
use std::time::Duration;

use enum_as_inner::EnumAsInner;
use hyperactor::Bind;
Expand All @@ -30,6 +35,8 @@ use hyperactor::accum::Accumulator;
use hyperactor::accum::CommReducer;
use hyperactor::accum::ReducerFactory;
use hyperactor::accum::ReducerSpec;
use hyperactor::mailbox::MailboxError;
use hyperactor::mailbox::PortReceiver;
use hyperactor::message::Bind;
use hyperactor::message::Bindings;
use hyperactor::message::Unbind;
Expand All @@ -50,7 +57,8 @@ use crate::v1::Name;
PartialEq,
Eq,
Hash,
EnumAsInner
EnumAsInner,
strum::Display
)]
pub enum Status {
/// The resource does not exist.
Expand All @@ -65,12 +73,17 @@ pub enum Status {
Stopped,
/// The resource has failed, with an error message.
Failed(String),
/// The resource has been declared failed after a timeout.
Timeout(Duration),
}

impl Status {
/// Returns whether the status is a terminating status.
pub fn is_terminating(&self) -> bool {
matches!(self, Status::Stopping | Status::Stopped | Status::Failed(_))
matches!(
self,
Status::Stopping | Status::Stopped | Status::Failed(_) | Status::Timeout(_)
)
}
}

Expand Down Expand Up @@ -128,6 +141,34 @@ pub struct GetRankStatus {
pub reply: PortRef<RankedValues<Status>>,
}

impl GetRankStatus {
pub async fn wait(
mut rx: PortReceiver<RankedValues<Status>>,
num_ranks: usize,
max_idle_time: Duration,
) -> Result<RankedValues<Status>, RankedValues<Status>> {
let mut alarm = hyperactor::time::Alarm::new();
alarm.arm(max_idle_time);
let mut statuses = RankedValues::default();
loop {
let mut sleeper = alarm.sleeper();
tokio::select! {
_ = sleeper.sleep() => return Err(statuses),
new_statuses = rx.recv() => {
match new_statuses {
Ok(new_statuses) => statuses = new_statuses,
Err(_) => return Err(statuses),
}
}
}
alarm.arm(max_idle_time);
if statuses.rank(num_ranks) == num_ranks {
break Ok(statuses);
}
}
}
}

/// The state of a resource.
#[derive(Clone, Debug, Serialize, Deserialize, Named, PartialEq, Eq)]
pub struct State<S> {
Expand Down Expand Up @@ -214,6 +255,14 @@ pub struct RankedValues<T> {
intervals: Vec<(Range<usize>, T)>,
}

impl<T: PartialEq> PartialEq for RankedValues<T> {
fn eq(&self, other: &Self) -> bool {
self.intervals == other.intervals
}
}

impl<T: Eq> Eq for RankedValues<T> {}

impl<T> Default for RankedValues<T> {
fn default() -> Self {
Self {
Expand All @@ -238,6 +287,28 @@ impl<T> RankedValues<T> {
}
}

impl<T: Clone> RankedValues<T> {
pub fn materialized_iter(&self, until: usize) -> impl Iterator<Item = &T> + '_ {
assert_eq!(self.rank(until), until, "insufficient rank");
self.iter()
.flat_map(|(range, value)| std::iter::repeat(value).take(range.end - range.start))
}
}

impl<T: Hash + Eq + Clone> RankedValues<T> {
/// Invert this ranked values into a [`ValuesByRank<T>`].
pub fn invert(&self) -> ValuesByRank<T> {
let mut inverted: HashMap<T, Vec<Range<usize>>> = HashMap::new();
for (range, value) in self.iter() {
inverted
.entry(value.clone())
.or_default()
.push(range.clone());
}
ValuesByRank { values: inverted }
}
}

impl<T: Eq + Clone> RankedValues<T> {
/// Merge `other` into this set of ranked values. Values in `other` that overlap
/// with `self` take prededence.
Expand Down Expand Up @@ -298,6 +369,11 @@ impl<T: Eq + Clone> RankedValues<T> {
}
}

/// Merge the contents of this RankedValues into another RankedValues.
pub fn merge_into(self, other: &mut Self) {
other.merge_from(self);
}

fn append(&mut self, range: Range<usize>, value: T) {
if let Some(last) = self.intervals.last_mut()
&& last.0.end == range.start
Expand All @@ -310,6 +386,15 @@ impl<T: Eq + Clone> RankedValues<T> {
}
}

impl RankedValues<Status> {
pub fn first_terminating(&self) -> Option<(usize, Status)> {
self.intervals
.iter()
.find(|(_, status)| status.is_terminating())
.map(|(range, status)| (range.start, status.clone()))
}
}

impl<T> From<(usize, T)> for RankedValues<T> {
fn from((rank, value): (usize, T)) -> Self {
Self {
Expand All @@ -318,6 +403,67 @@ impl<T> From<(usize, T)> for RankedValues<T> {
}
}

impl<T> From<(Range<usize>, T)> for RankedValues<T> {
fn from((range, value): (Range<usize>, T)) -> Self {
Self {
intervals: vec![(range, value)],
}
}
}

/// An inverted index of RankedValues, providing all ranks for
/// which each unique T-typed value appears.
#[derive(Clone, Debug)]
pub struct ValuesByRank<T> {
values: HashMap<T, Vec<Range<usize>>>,
}

impl<T: Eq + Hash> PartialEq for ValuesByRank<T> {
fn eq(&self, other: &Self) -> bool {
self.values == other.values
}
}

impl<T: Eq + Hash> Eq for ValuesByRank<T> {}

impl<T: fmt::Display> fmt::Display for ValuesByRank<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mut first_value = true;
for (value, ranges) in self.iter() {
if first_value {
first_value = false;
} else {
write!(f, ";")?;
}
write!(f, "{}=", value)?;
let mut first_range = true;
for range in ranges.iter() {
if first_range {
first_range = false;
} else {
write!(f, ",")?;
}
write!(f, "{}..{}", range.start, range.end)?;
}
}
Ok(())
}
}

impl<T> Deref for ValuesByRank<T> {
type Target = HashMap<T, Vec<Range<usize>>>;

fn deref(&self) -> &Self::Target {
&self.values
}
}

impl<T> DerefMut for ValuesByRank<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.values
}
}

/// Enabled for test only because we have to guarantee that the input
/// iterator is well-formed.
#[cfg(test)]
Expand Down Expand Up @@ -425,4 +571,36 @@ mod tests {
assert_eq!(left.rank(70), 62);
assert_eq!(left.rank(100), 62);
}

#[test]
fn test_equality() {
assert_eq!(
RankedValues::from((0..10, 123)),
RankedValues::from((0..10, 123))
);
assert_eq!(
RankedValues::from((0..10, Status::Failed("foo".to_string()))),
RankedValues::from((0..10, Status::Failed("foo".to_string()))),
);
}

#[test]
fn test_default_through_merging() {
let values: RankedValues<usize> =
[(0..10, 1), (15..20, 1), (30..50, 1)].into_iter().collect();

let mut default = RankedValues::from((0..50, 0));
default.merge_from(values);

assert_eq!(
default.iter().cloned().collect::<Vec<_>>(),
vec![
(0..10, 1),
(10..15, 0),
(15..20, 1),
(20..30, 0),
(30..50, 1)
]
);
}
}
21 changes: 20 additions & 1 deletion hyperactor_mesh/src/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ pub mod testactor;
pub mod testing;
pub mod value_mesh;

use std::collections::HashMap;
use std::ops::Range;
use std::str::FromStr;
use std::time::Duration;

pub use actor_mesh::ActorMesh;
pub use actor_mesh::ActorMeshRef;
use enum_as_inner::EnumAsInner;
pub use host_mesh::HostMeshRef;
use hyperactor::ActorId;
use hyperactor::ActorRef;
Expand All @@ -33,12 +37,15 @@ use serde::Serialize;
pub use value_mesh::ValueMesh;

use crate::resource;
use crate::resource::RankedValues;
use crate::resource::Status;
use crate::resource::ValuesByRank;
use crate::shortuuid::ShortUuid;
use crate::v1::host_mesh::HostMeshAgent;
use crate::v1::host_mesh::HostMeshRefParseError;

/// Errors that occur during mesh operations.
#[derive(Debug, thiserror::Error)]
#[derive(Debug, EnumAsInner, thiserror::Error)]
pub enum Error {
#[error("invalid mesh ref: expected {expected} ranks, but contains {actual} ranks")]
InvalidRankCardinality { expected: usize, actual: usize },
Expand Down Expand Up @@ -96,6 +103,18 @@ pub enum Error {
status: resource::Status,
},

#[error(
"error spawning proc mesh: statuses: {}",
RankedValues::invert(&*.statuses)
)]
ProcSpawnError { statuses: RankedValues<Status> },

#[error(
"error spawning actor mesh: statuses: {}",
RankedValues::invert(&*.statuses)
)]
ActorSpawnError { statuses: RankedValues<Status> },

#[error("error: {0} does not exist")]
NotExist(Name),
}
Expand Down
Loading