diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index 2dbc2f7d2..714ea1a5e 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -78,7 +78,10 @@ declare_attrs! { }) pub attr ACTOR_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30); - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE".to_string()), + py_name: None, + }) pub attr GET_ACTOR_STATE_MAX_IDLE: Duration = Duration::from_secs(30); } diff --git a/hyperactor_mesh/src/v1/value_mesh.rs b/hyperactor_mesh/src/v1/value_mesh.rs index 4c7b49a9c..4c8bdc40f 100644 --- a/hyperactor_mesh/src/v1/value_mesh.rs +++ b/hyperactor_mesh/src/v1/value_mesh.rs @@ -6,8 +6,10 @@ * LICENSE file in the root directory of this source tree. */ +use std::cmp::Ordering; use std::mem; use std::mem::MaybeUninit; +use std::ops::Range; use std::ptr; use std::ptr::NonNull; @@ -16,16 +18,82 @@ use ndslice::view; use ndslice::view::Ranked; use ndslice::view::Region; -/// A mesh of values, where each value is associated with a rank. +/// A mesh of values, one per rank in `region`. /// -/// # Invariant -/// The mesh is *complete*: `ranks.len()` always equals -/// `region.num_ranks()`. Every rank in the region has exactly one -/// associated value. +/// The internal representation (`rep`) may be dense or compressed, +/// but externally the mesh always behaves as a complete mapping from +/// rank index → value. +/// +/// # Invariants +/// - Complete: every rank in `region` has exactly one value. +/// - Order: iteration and indexing follow the region's linearization. #[derive(Clone, Debug, PartialEq, Eq, Hash)] // only if T implements pub struct ValueMesh { + /// The logical multidimensional domain of the mesh. + /// + /// Determines the number of ranks (`region.num_ranks()`) and the + /// order in which they are traversed. region: Region, - ranks: Vec, + + /// The underlying storage representation. + /// + /// - `Rep::Dense` stores a `Vec` with one value per rank. + /// - `Rep::Compressed` stores a run-length encoded table of + /// unique values plus `(Range, u32)` pairs describing + /// contiguous runs of identical values. + /// + /// The representation is an internal optimization detail; all + /// public APIs (e.g. `values()`, `get()`, slicing) behave as if + /// the mesh were dense. + rep: Rep, +} + +/// Internal storage representation for a [`ValueMesh`]. +/// +/// This enum abstracts how the per-rank values are stored. +/// Externally, both variants behave identically — the difference is +/// purely in memory layout and access strategy. +/// +/// - [`Rep::Dense`] stores one value per rank, directly. +/// - [`Rep::Compressed`] stores a compact run-length-encoded form, +/// reusing identical values across contiguous ranks. +/// +/// Users of [`ValueMesh`] normally never interact with `Rep` +/// directly; all iteration and slicing APIs present a dense logical +/// view. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] // only if T implements +enum Rep { + /// Fully expanded representation: one element per rank. + /// + /// The length of `values` is always equal to + /// `region.num_ranks()`. This form is simple and fast for + /// iteration and mutation but uses more memory when large runs of + /// repeated values are present. + Dense { + /// Flat list of values, one per rank in the region's + /// linearization order. + values: Vec, + }, + + /// Run-length-encoded representation. + /// + /// Each run `(Range, id)` indicates that the ranks within + /// `Range` (half-open `[start, end)`) share the same value at + /// `table[id]`. The `table` stores each distinct value once. + /// + /// # Invariants + /// - Runs are non-empty and contiguous (`r.start < r.end`). + /// - Runs collectively cover `0..region.num_ranks()` with no gaps + /// or overlaps. + /// - `id` indexes into `table` (`id < table.len()`). + Compressed { + /// The deduplicated set of unique values referenced by runs. + table: Vec, + + /// List of `(range, table_id)` pairs describing contiguous + /// runs of identical values in region order. + runs: Vec<(Range, u32)>, + }, } impl ValueMesh { @@ -34,7 +102,7 @@ impl ValueMesh { /// /// # Invariants /// This constructor validates that the number of provided values - /// (`ranks.len()`) matches the region’s cardinality + /// (`ranks.len()`) matches the region's cardinality /// (`region.num_ranks()`). A value mesh must be complete: every /// rank in `region` has a corresponding `T`. /// @@ -47,7 +115,10 @@ impl ValueMesh { if actual != expected { return Err(crate::v1::Error::InvalidRankCardinality { expected, actual }); } - Ok(Self { region, ranks }) + Ok(Self { + region, + rep: Rep::Dense { values: ranks }, + }) } /// Constructs a `ValueMesh` without checking cardinality. Caller @@ -55,7 +126,10 @@ impl ValueMesh { #[inline] pub(crate) fn new_unchecked(region: Region, ranks: Vec) -> Self { debug_assert_eq!(region.num_ranks(), ranks.len()); - Self { region, ranks } + Self { + region, + rep: Rep::Dense { values: ranks }, + } } } @@ -63,8 +137,17 @@ impl ValueMesh { /// Await all futures in the mesh, yielding a `ValueMesh` of their /// outputs. pub async fn join(self) -> ValueMesh { - let ValueMesh { region, ranks } = self; - ValueMesh::new_unchecked(region, futures::future::join_all(ranks).await) + let ValueMesh { region, rep } = self; + + match rep { + Rep::Dense { values } => { + let results = futures::future::join_all(values).await; + ValueMesh::new_unchecked(region, results) + } + Rep::Compressed { .. } => { + unreachable!("join() not implemented for compressed meshes") + } + } } } @@ -72,21 +155,72 @@ impl ValueMesh> { /// Transposes a `ValueMesh>` into a /// `Result, E>`. pub fn transpose(self) -> Result, E> { - let ValueMesh { region, ranks } = self; - let ranks = ranks.into_iter().collect::, E>>()?; - Ok(ValueMesh::new_unchecked(region, ranks)) + let ValueMesh { region, rep } = self; + + match rep { + Rep::Dense { values } => { + let values = values.into_iter().collect::, E>>()?; + Ok(ValueMesh::new_unchecked(region, values)) + } + Rep::Compressed { table, runs } => { + let table: Vec = table.into_iter().collect::, E>>()?; + Ok(ValueMesh { + region, + rep: Rep::Compressed { table, runs }, + }) + } + } } } impl view::Ranked for ValueMesh { type Item = T; + /// Returns the region that defines this mesh's shape and rank + /// order. fn region(&self) -> &Region { &self.region } + /// Looks up the value at the given linearized rank. + /// + /// Works transparently for both dense and compressed + /// representations: + /// - In the dense case, it simply indexes into the `values` + /// vector. + /// - In the compressed case, it performs a binary search over run + /// boundaries to find which run contains the given rank, then + /// returns the corresponding entry from `table`. + /// + /// Returns `None` if the rank is out of bounds. fn get(&self, rank: usize) -> Option<&Self::Item> { - self.ranks.get(rank) + if rank >= self.region.num_ranks() { + return None; + } + + match &self.rep { + Rep::Dense { values } => values.get(rank), + + Rep::Compressed { table, runs } => { + // Binary search over runs: find the one whose range + // contains `rank`. + let idx = runs + .binary_search_by(|(r, _)| { + if r.end <= rank { + Ordering::Less + } else if r.start > rank { + Ordering::Greater + } else { + Ordering::Equal + } + }) + .ok()?; + + // Map the run's table ID to its actual value. + let id = runs[idx].1 as usize; + table.get(id) + } + } } } @@ -150,7 +284,7 @@ impl view::BuildFromRegionIndexed for ValueMesh { let mut filled = 0usize; // Drop guard: cleans up initialized elements on early exit. - // Stores raw, non-borrowed pointers (`NonNull`), so we don’t + // Stores raw, non-borrowed pointers (`NonNull`), so we don't // hold Rust references for the whole scope. This allows // mutating `buf`/`bits` inside the loop while still letting // the guard access them if dropped early. @@ -358,6 +492,134 @@ impl view::BuildFromRegionIndexed for ValueMesh { } } +impl ValueMesh { + /// Compresses the mesh in place using run-length encoding (RLE). + /// + /// This method scans the mesh's dense values, coalescing adjacent + /// runs of identical elements into a compact [`Rep::Compressed`] + /// representation. It replaces the internal storage (`rep`) with + /// the compressed form. + /// + /// # Behavior + /// - If the mesh is already compressed, this is a **no-op**. + /// - If the mesh is dense, it consumes the current `Vec` and + /// rebuilds the representation as a run table plus value table. + /// - Only *adjacent* equal values are merged; non-contiguous + /// duplicates remain distinct. + /// + /// # Requirements + /// - `T` must implement [`PartialEq`] (to detect equal values). + /// - `T` must implement [`Clone`] (to populate the table of + /// unique values). + /// + /// This operation is lossless: expanding the compressed mesh back + /// into a dense vector yields the same sequence of values. + pub fn compress_adjacent_in_place(&mut self) { + self.compress_adjacent_in_place_by(|a, b| a == b) + } +} + +impl ValueMesh { + /// Compresses the mesh in place using a custom equivalence + /// predicate. + /// + /// This is a generalized form of [`compress_adjacent_in_place`] + /// that merges adjacent values according to an arbitrary + /// predicate `same(a, b)`, rather than relying on `PartialEq`. + /// + /// # Behavior + /// - If the mesh is already compressed, this is a **no-op**. + /// - Otherwise, consumes the dense `Vec` and replaces it with + /// a run-length encoded (`Rep::Compressed`) representation, + /// where consecutive elements satisfying `same(a, b)` are + /// coalesced into a single run. + /// + /// # Requirements + /// - `T` must implement [`Clone`] (to populate the table of + /// unique values). + /// - The predicate must be reflexive and symmetric for + /// correctness. + /// + /// This operation is lossless: expanding the compressed mesh + /// reproduces the original sequence exactly under the same + /// equivalence. + pub fn compress_adjacent_in_place_by(&mut self, same: F) + where + F: FnMut(&T, &T) -> bool, + { + let values = match &mut self.rep { + Rep::Dense { values } => std::mem::take(values), + Rep::Compressed { .. } => return, + }; + let (table, runs) = compress_adjacent_with(values, same); + self.rep = Rep::Compressed { table, runs }; + } +} + +/// Performs simple run-length encoding (RLE) compression over a dense +/// sequence of values. +/// +/// Adjacent "equal" elements are coalesced into contiguous runs, +/// producing: +/// +/// - a **table** of unique values (in first-occurrence order) +/// - a **run list** of `(range, id)` pairs, where `range` is the +/// half-open index range `[start, end)` in the original dense +/// array, and `id` indexes into `table`. +/// +/// # Example +/// ``` +/// // Input: [A, A, B, B, B, A] +/// // Output: +/// // table = [A, B, A] +/// // runs = [(0..2, 0), (2..5, 1), (5..6, 2)] +/// ``` +/// +/// # Requirements +/// - `T: Clone` is required to copy elements into the table. +/// +/// # Returns +/// A tuple `(table, runs)` that together form the compressed +/// representation. Expanding the runs reproduces the original data. +fn compress_adjacent_with( + values: Vec, + mut same: F, +) -> (Vec, Vec<(Range, u32)>) +where + F: FnMut(&T, &T) -> bool, +{ + // Empty input; trivial empty compression. + if values.is_empty() { + return (Vec::new(), Vec::new()); + } + + let mut table = Vec::new(); // unique values + let mut runs = Vec::new(); // (range, table_id) pairs + + let mut start = 0usize; + table.push(values[0].clone()); + let mut cur_id: u32 = 0; + + // Walk through all subsequent elements, closing and opening runs + // whenever the value changes. + for (i, _value) in values.iter().enumerate().skip(1) { + if !same(&values[i], &table[cur_id as usize]) { + // Close current run [start, i) + runs.push((start..i, cur_id)); + + // Start a new run + start = i; + table.push(values[i].clone()); + cur_id = (table.len() - 1) as u32; + } + } + + // Close the final run + runs.push((start..values.len(), cur_id)); + + (table, runs) +} + #[cfg(test)] mod tests { use std::convert::Infallible; @@ -378,6 +640,7 @@ mod tests { use ndslice::view::CollectMeshExt; use ndslice::view::MapIntoExt; use ndslice::view::Ranked; + use ndslice::view::RankedSliceable; use ndslice::view::ViewExt; use proptest::prelude::*; use proptest::strategy::ValueTree; @@ -818,7 +1081,10 @@ mod tests { let doubled: ValueMesh<_> = vm.map_into(|x| x * 2); assert_eq!(doubled.region, region); - assert_eq!(doubled.ranks, vec![0, 2, 4, 6, 8, 10]); + assert_eq!( + doubled.values().collect::>(), + vec![0, 2, 4, 6, 8, 10] + ); } #[test] @@ -831,7 +1097,7 @@ mod tests { let lens: ValueMesh<_> = vm.map_into(|s| s.len()); assert_eq!(lens.region, region); - assert_eq!(lens.ranks, vec![1, 1, 1, 1]); + assert_eq!(lens.values().collect::>(), vec![1, 1, 1, 1]); } #[test] @@ -900,7 +1166,7 @@ mod tests { assert_eq!(pending.region, region); // Drive the ready futures without a runtime and collect results. - let results: Vec<_> = pending.ranks.into_iter().map(poll_now).collect(); + let results: Vec<_> = pending.values().map(|f| poll_now(f.clone())).collect(); assert_eq!(results, vec![11, 21, 31, 41]); } @@ -911,7 +1177,7 @@ mod tests { let out: ValueMesh<_> = vm.map_into(|x| x * x); assert_eq!(out.region, region); - assert_eq!(out.ranks, vec![49]); + assert_eq!(out.values().collect::>(), vec![49]); } #[test] @@ -929,4 +1195,105 @@ mod tests { assert_eq!(projected.values().collect::>(), vec![10, 20, 30]); assert_eq!(projected.region(), ®ion); } + + #[test] + fn rle_roundtrip_all_equal() { + let region: Region = extent!(n = 6).into(); + let mut vm = ValueMesh::new_unchecked(region.clone(), vec![42; 6]); + + // Compress and ensure logical equality preserved. + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, vec![42, 42, 42, 42, 42, 42]); + + // Random access still works. + for i in 0..region.num_ranks() { + assert_eq!(vm.get(i), Some(&42)); + } + assert_eq!(vm.get(region.num_ranks()), None); // out-of-bounds + } + + #[test] + fn rle_roundtrip_alternating() { + let region: Region = extent!(n = 6).into(); + let original = vec![1, 2, 1, 2, 1, 2]; + let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone()); + + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, original); + + // Spot-check random access after compression. + assert_eq!(vm.get(0), Some(&1)); + assert_eq!(vm.get(1), Some(&2)); + assert_eq!(vm.get(3), Some(&2)); + assert_eq!(vm.get(5), Some(&2)); + } + + #[test] + fn rle_roundtrip_blocky_and_slice() { + // Blocks: 0,0,0 | 1,1 | 2,2,2,2 | 3 + let region: Region = extent!(n = 10).into(); + let original = vec![0, 0, 0, 1, 1, 2, 2, 2, 2, 3]; + let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone()); + + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, original); + + // Slice a middle subregion [3..8) → [1,1,2,2,2] + let sub_region = region.range("n", 3..8).unwrap(); + let sliced = vm.sliced(sub_region); + let sliced_vec: Vec<_> = sliced.values().collect(); + assert_eq!(sliced_vec, vec![1, 1, 2, 2, 2]); + } + + #[test] + fn rle_idempotent_noop_on_second_call() { + let region: Region = extent!(n = 7).into(); + let original = vec![9, 9, 9, 8, 8, 9, 9]; + let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone()); + + vm.compress_adjacent_in_place(); + let once: Vec<_> = vm.values().collect(); + assert_eq!(once, original); + + // Calling again should be a no-op and still yield identical + // values. + vm.compress_adjacent_in_place(); + let twice: Vec<_> = vm.values().collect(); + assert_eq!(twice, original); + } + + #[test] + fn rle_works_after_build_indexed() { + // Build with shuffled pairs, then compress and verify + // semantics. + let region: Region = extent!(x = 2, y = 3).into(); // 6 + let pairs = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)]; + let mut vm = pairs + .into_iter() + .collect_indexed::>(region.clone()) + .unwrap(); + + // Should compress to 6 runs of length 1; still must + // round-trip. + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, vec![0, 10, 20, 30, 40, 50]); + // Spot-check get() + assert_eq!(vm.get(4), Some(&40)); + } + + #[test] + fn rle_handles_singleton_mesh() { + let region: Region = extent!(n = 1).into(); + let mut vm = ValueMesh::new_unchecked(region.clone(), vec![123]); + + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, vec![123]); + assert_eq!(vm.get(0), Some(&123)); + assert_eq!(vm.get(1), None); + } } diff --git a/monarch_hyperactor/src/value_mesh.rs b/monarch_hyperactor/src/value_mesh.rs index d518064b1..64d9fd9ac 100644 --- a/monarch_hyperactor/src/value_mesh.rs +++ b/monarch_hyperactor/src/value_mesh.rs @@ -39,8 +39,17 @@ impl PyValueMesh { let vals: Vec> = values.extract()?; // Build & validate cardinality against region. - let inner = > as BuildFromRegion>>::build_dense(region, vals) - .map_err(|e| PyValueError::new_err(e.to_string()))?; + let mut inner = + > as BuildFromRegion>>::build_dense(region, vals) + .map_err(|e| PyValueError::new_err(e.to_string()))?; + + // Coalesce adjacent identical Python objects (same pointer + // identity). For Py, we treat equality as object + // identity: consecutive references to the *same* object + // pointer are merged into RLE runs. This tends to compress + // sentinel/categorical/boolean data, but not freshly + // allocated numerics/strings. + inner.compress_adjacent_in_place_by(|a, b| a.as_ptr() == b.as_ptr()); Ok(Self { inner }) } @@ -71,6 +80,7 @@ impl PyValueMesh { // Py. `unwrap` is safe because the bounds have been // checked. let v: Py = self.inner.get(rank).unwrap().clone(); + Ok(v) } @@ -84,9 +94,19 @@ impl PyValueMesh { // Preserve the shape's original Slice (offset/strides). let s = shape.get_inner(); let region = Region::new(s.labels().to_vec(), s.slice().clone()); - let inner = > as ndslice::view::BuildFromRegionIndexed>> - ::build_indexed(region, pairs) - .map_err(|e| PyValueError::new_err(e.to_string()))?; + let mut inner = > as ndslice::view::BuildFromRegionIndexed< + Py, + >>::build_indexed(region, pairs) + .map_err(|e| PyValueError::new_err(e.to_string()))?; + + // Coalesce adjacent identical Python objects (same pointer + // identity). For Py, we treat equality as object + // identity: consecutive references to the *same* object + // pointer are merged into RLE runs. This tends to compress + // sentinel/categorical/boolean data, but not freshly + // allocated numerics/strings. + inner.compress_adjacent_in_place_by(|a, b| a.as_ptr() == b.as_ptr()); + Ok(Self { inner }) } }