diff --git a/hyperactor_mesh/src/v1/proc_mesh.rs b/hyperactor_mesh/src/v1/proc_mesh.rs index 2dbc2f7d2..714ea1a5e 100644 --- a/hyperactor_mesh/src/v1/proc_mesh.rs +++ b/hyperactor_mesh/src/v1/proc_mesh.rs @@ -78,7 +78,10 @@ declare_attrs! { }) pub attr ACTOR_SPAWN_MAX_IDLE: Duration = Duration::from_secs(30); - @meta(CONFIG_ENV_VAR = "HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE".to_string()) + @meta(CONFIG = ConfigAttr { + env_name: Some("HYPERACTOR_MESH_GET_ACTOR_STATE_MAX_IDLE".to_string()), + py_name: None, + }) pub attr GET_ACTOR_STATE_MAX_IDLE: Duration = Duration::from_secs(30); } diff --git a/hyperactor_mesh/src/v1/value_mesh.rs b/hyperactor_mesh/src/v1/value_mesh.rs index 4c7b49a9c..37a778635 100644 --- a/hyperactor_mesh/src/v1/value_mesh.rs +++ b/hyperactor_mesh/src/v1/value_mesh.rs @@ -6,8 +6,10 @@ * LICENSE file in the root directory of this source tree. */ +use std::cmp::Ordering; use std::mem; use std::mem::MaybeUninit; +use std::ops::Range; use std::ptr; use std::ptr::NonNull; @@ -15,17 +17,138 @@ use futures::Future; use ndslice::view; use ndslice::view::Ranked; use ndslice::view::Region; +use serde::Deserialize; +use serde::Serialize; -/// A mesh of values, where each value is associated with a rank. +/// A mesh of values, one per rank in `region`. /// -/// # Invariant -/// The mesh is *complete*: `ranks.len()` always equals -/// `region.num_ranks()`. Every rank in the region has exactly one -/// associated value. -#[derive(Clone, Debug, PartialEq, Eq, Hash)] // only if T implements +/// The internal representation (`rep`) may be dense or compressed, +/// but externally the mesh always behaves as a complete mapping from +/// rank index → value. +/// +/// # Invariants +/// - Complete: every rank in `region` has exactly one value. +/// - Order: iteration and indexing follow the region's linearization. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] // only if T implements pub struct ValueMesh { + /// The logical multidimensional domain of the mesh. + /// + /// Determines the number of ranks (`region.num_ranks()`) and the + /// order in which they are traversed. region: Region, - ranks: Vec, + + /// The underlying storage representation. + /// + /// - `Rep::Dense` stores a `Vec` with one value per rank. + /// - `Rep::Compressed` stores a run-length encoded table of + /// unique values plus `(Range, u32)` pairs describing + /// contiguous runs of identical values. + /// + /// The representation is an internal optimization detail; all + /// public APIs (e.g. `values()`, `get()`, slicing) behave as if + /// the mesh were dense. + rep: Rep, +} + +/// A single run-length–encoded (RLE) segment within a [`ValueMesh`]. +/// +/// Each `Run` represents a contiguous range of ranks `[start, end)` +/// that all share the same value, referenced indirectly via a table +/// index `id`. This allows compact storage of large regions with +/// repeated values. +/// +/// Runs are serialized in a stable, portable format using `u64` for +/// range bounds (`start`, `end`) to avoid platform‐dependent `usize` +/// encoding differences. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +struct Run { + /// Inclusive start of the contiguous range of ranks (0-based). + start: u64, + /// Exclusive end of the contiguous range of ranks (0-based). + end: u64, + /// Index into the value table for this run's shared value. + id: u32, +} + +impl Run { + /// Creates a new `Run` covering ranks `[start, end)` that all + /// share the same table entry `id`. + /// + /// Converts `usize` bounds to `u64` for stable serialization. + fn new(start: usize, end: usize, id: u32) -> Self { + Self { + start: start as u64, + end: end as u64, + id, + } + } +} + +impl TryFrom for (Range, u32) { + type Error = &'static str; + + /// Converts a serialized [`Run`] back into its in-memory form + /// `(Range, u32)`. + /// + /// Performs checked conversion of the 64-bit wire fields back + /// into `usize` indices, returning an error if either bound + /// exceeds the platform’s addressable range. This ensures safe + /// round-tripping between the serialized wire format and native + /// representation. + fn try_from(r: Run) -> Result { + let start = usize::try_from(r.start).map_err(|_| "run.start too large")?; + let end = usize::try_from(r.end).map_err(|_| "run.end too large")?; + Ok((start..end, r.id)) + } +} + +/// Internal storage representation for a [`ValueMesh`]. +/// +/// This enum abstracts how the per-rank values are stored. +/// Externally, both variants behave identically — the difference is +/// purely in memory layout and access strategy. +/// +/// - [`Rep::Dense`] stores one value per rank, directly. +/// - [`Rep::Compressed`] stores a compact run-length-encoded form, +/// reusing identical values across contiguous ranks. +/// +/// Users of [`ValueMesh`] normally never interact with `Rep` +/// directly; all iteration and slicing APIs present a dense logical +/// view. +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] // only if T implements +#[serde(tag = "rep", rename_all = "snake_case")] +enum Rep { + /// Fully expanded representation: one element per rank. + /// + /// The length of `values` is always equal to + /// `region.num_ranks()`. This form is simple and fast for + /// iteration and mutation but uses more memory when large runs of + /// repeated values are present. + Dense { + /// Flat list of values, one per rank in the region's + /// linearization order. + values: Vec, + }, + + /// Run-length-encoded representation. + /// + /// Each run `(Range, id)` indicates that the ranks within + /// `Range` (half-open `[start, end)`) share the same value at + /// `table[id]`. The `table` stores each distinct value once. + /// + /// # Invariants + /// - Runs are non-empty and contiguous (`r.start < r.end`). + /// - Runs collectively cover `0..region.num_ranks()` with no gaps + /// or overlaps. + /// - `id` indexes into `table` (`id < table.len()`). + Compressed { + /// The deduplicated set of unique values referenced by runs. + table: Vec, + + /// List of `(range, table_id)` pairs describing contiguous + /// runs of identical values in region order. + runs: Vec, + }, } impl ValueMesh { @@ -34,7 +157,7 @@ impl ValueMesh { /// /// # Invariants /// This constructor validates that the number of provided values - /// (`ranks.len()`) matches the region’s cardinality + /// (`ranks.len()`) matches the region's cardinality /// (`region.num_ranks()`). A value mesh must be complete: every /// rank in `region` has a corresponding `T`. /// @@ -47,7 +170,10 @@ impl ValueMesh { if actual != expected { return Err(crate::v1::Error::InvalidRankCardinality { expected, actual }); } - Ok(Self { region, ranks }) + Ok(Self { + region, + rep: Rep::Dense { values: ranks }, + }) } /// Constructs a `ValueMesh` without checking cardinality. Caller @@ -55,7 +181,180 @@ impl ValueMesh { #[inline] pub(crate) fn new_unchecked(region: Region, ranks: Vec) -> Self { debug_assert_eq!(region.num_ranks(), ranks.len()); - Self { region, ranks } + Self { + region, + rep: Rep::Dense { values: ranks }, + } + } +} + +impl ValueMesh { + /// Builds a `ValueMesh` that assigns the single value `s` to + /// every rank in `region`, without materializing a dense + /// `Vec`. The result is stored in compressed (RLE) form as a + /// single run `[0..N)`. + /// + /// If `region.num_ranks() == 0`, the mesh contains no runs (and + /// an empty table), regardless of `s`. + pub fn from_single(region: Region, s: T) -> Self { + let n = region.num_ranks(); + if n == 0 { + return Self { + region, + rep: Rep::Compressed { + table: Vec::new(), + runs: Vec::new(), + }, + }; + } + + let table = vec![s]; + let runs = vec![Run::new(0, n, 0)]; + Self { + region, + rep: Rep::Compressed { table, runs }, + } + } +} + +impl ValueMesh { + /// Builds a [`ValueMesh`] covering the region, filled with + /// `T::default()`. + /// + /// Equivalent to [`ValueMesh::from_single(region, + /// T::default())`]. + pub fn from_default(region: Region) -> Self { + ValueMesh::::from_single(region, T::default()) + } +} + +impl ValueMesh { + /// Builds a compressed mesh from a default value and a set of + /// disjoint ranges that override the default. + /// + /// - `ranges` may be in any order; they must be non-empty, + /// in-bounds, and non-overlapping. + /// - Unspecified ranks are filled with `default`. + /// - Result is stored in RLE form; no dense `Vec` is + /// materialized. + pub fn from_ranges_with_default( + region: Region, + default: T, + mut ranges: Vec<(Range, T)>, + ) -> crate::v1::Result { + let n = region.num_ranks(); + + if n == 0 { + return Ok(Self { + region, + rep: Rep::Compressed { + table: Vec::new(), + runs: Vec::new(), + }, + }); + } + + // Validate: non-empty, in-bounds; then sort. + for (r, _) in &ranges { + if r.is_empty() { + return Err(crate::v1::Error::InvalidRankCardinality { + expected: n, + actual: 0, + }); // TODO: this surfaces the error but its not a great fit + } + if r.end > n { + return Err(crate::v1::Error::InvalidRankCardinality { + expected: n, + actual: r.end, + }); + } + } + ranges.sort_by_key(|(r, _)| (r.start, r.end)); + + // Validate: non-overlapping. + for w in ranges.windows(2) { + let (a, _) = &w[0]; + let (b, _) = &w[1]; + if a.end > b.start { + // Overlap + return Err(crate::v1::Error::InvalidRankCardinality { + expected: n, + actual: b.start, // TODO: this surfaces the error but is a bad fit + }); + } + } + + // Build table; keep it small by reusing equal values. + fn get_or_insert_id(table: &mut Vec, v: &T) -> u32 { + if let Some(idx) = table.iter().position(|x| x == v) { + idx as u32 + } else { + table.push(v.clone()); + (table.len() - 1) as u32 + } + } + + let mut table: Vec = Vec::with_capacity(1 + ranges.len()); + let default_id = get_or_insert_id(&mut table, &default); + + let mut runs: Vec = Vec::with_capacity(1 + 2 * ranges.len()); + let mut cursor = 0usize; + + for (r, v) in ranges.into_iter() { + // Fill default gap if any. + if cursor < r.start { + runs.push(Run::new(cursor, r.start, default_id)); + } + // Override block. + let id = get_or_insert_id(&mut table, &v); + runs.push(Run::new(r.start, r.end, id)); + cursor = r.end; + } + + // Trailing default tail. + if cursor < n { + runs.push(Run::new(cursor, n, default_id)); + } + + Ok(Self { + region, + rep: Rep::Compressed { table, runs }, + }) + } + + /// Builds a [`ValueMesh`] from a fully materialized dense vector + /// of per-rank values, then compresses it into run-length–encoded + /// form if possible. + /// + /// This constructor is intended for callers that already have one + /// value per rank (e.g. computed or received data) but wish to + /// store it efficiently. + /// + /// # Parameters + /// - `region`: The logical region describing the mesh’s shape and + /// rank order. + /// - `values`: A dense vector of values, one per rank in + /// `region`. + /// + /// # Returns + /// A [`ValueMesh`] whose internal representation is `Compressed` + /// if any adjacent elements are equal, or `Dense` if no + /// compression was possible. + /// + /// # Errors + /// Returns an error if the number of provided `values` does not + /// match the number of ranks in `region`. + /// + /// # Examples + /// ```ignore + /// let region: Region = extent!(n = 5).into(); + /// let mesh = ValueMesh::from_dense(region, vec![1, 1, 2, 2, 3]).unwrap(); + /// // Internally compressed to three runs: [1, 1], [2, 2], [3] + /// ``` + pub fn from_dense(region: Region, values: Vec) -> crate::v1::Result { + let mut vm = Self::new(region, values)?; + vm.compress_adjacent_in_place(); + Ok(vm) } } @@ -63,8 +362,17 @@ impl ValueMesh { /// Await all futures in the mesh, yielding a `ValueMesh` of their /// outputs. pub async fn join(self) -> ValueMesh { - let ValueMesh { region, ranks } = self; - ValueMesh::new_unchecked(region, futures::future::join_all(ranks).await) + let ValueMesh { region, rep } = self; + + match rep { + Rep::Dense { values } => { + let results = futures::future::join_all(values).await; + ValueMesh::new_unchecked(region, results) + } + Rep::Compressed { .. } => { + unreachable!("join() not implemented for compressed meshes") + } + } } } @@ -72,21 +380,74 @@ impl ValueMesh> { /// Transposes a `ValueMesh>` into a /// `Result, E>`. pub fn transpose(self) -> Result, E> { - let ValueMesh { region, ranks } = self; - let ranks = ranks.into_iter().collect::, E>>()?; - Ok(ValueMesh::new_unchecked(region, ranks)) + let ValueMesh { region, rep } = self; + + match rep { + Rep::Dense { values } => { + let values = values.into_iter().collect::, E>>()?; + Ok(ValueMesh::new_unchecked(region, values)) + } + Rep::Compressed { table, runs } => { + let table: Vec = table.into_iter().collect::, E>>()?; + Ok(ValueMesh { + region, + rep: Rep::Compressed { table, runs }, + }) + } + } } } impl view::Ranked for ValueMesh { type Item = T; + /// Returns the region that defines this mesh's shape and rank + /// order. fn region(&self) -> &Region { &self.region } + /// Looks up the value at the given linearized rank. + /// + /// Works transparently for both dense and compressed + /// representations: + /// - In the dense case, it simply indexes into the `values` + /// vector. + /// - In the compressed case, it performs a binary search over run + /// boundaries to find which run contains the given rank, then + /// returns the corresponding entry from `table`. + /// + /// Returns `None` if the rank is out of bounds. fn get(&self, rank: usize) -> Option<&Self::Item> { - self.ranks.get(rank) + if rank >= self.region.num_ranks() { + return None; + } + + match &self.rep { + Rep::Dense { values } => values.get(rank), + + Rep::Compressed { table, runs } => { + let rank = rank as u64; + + // Binary search over runs: find the one whose range + // contains `rank`. + let idx = runs + .binary_search_by(|run| { + if run.end <= rank { + Ordering::Less + } else if run.start > rank { + Ordering::Greater + } else { + Ordering::Equal + } + }) + .ok()?; + + // Map the run's table ID to its actual value. + let id = runs[idx].id as usize; + table.get(id) + } + } } } @@ -150,7 +511,7 @@ impl view::BuildFromRegionIndexed for ValueMesh { let mut filled = 0usize; // Drop guard: cleans up initialized elements on early exit. - // Stores raw, non-borrowed pointers (`NonNull`), so we don’t + // Stores raw, non-borrowed pointers (`NonNull`), so we don't // hold Rust references for the whole scope. This allows // mutating `buf`/`bits` inside the loop while still letting // the guard access them if dropped early. @@ -358,6 +719,131 @@ impl view::BuildFromRegionIndexed for ValueMesh { } } +impl ValueMesh { + /// Compresses the mesh in place using run-length encoding (RLE). + /// + /// This method scans the mesh's dense values, coalescing adjacent + /// runs of identical elements into a compact [`Rep::Compressed`] + /// representation. It replaces the internal storage (`rep`) with + /// the compressed form. + /// + /// # Behavior + /// - If the mesh is already compressed, this is a **no-op**. + /// - If the mesh is dense, it consumes the current `Vec` and + /// rebuilds the representation as a run table plus value table. + /// - Only *adjacent* equal values are merged; non-contiguous + /// duplicates remain distinct. + /// + /// # Requirements + /// - `T` must implement [`PartialEq`] (to detect equal values). + /// - `T` must implement [`Clone`] (to populate the table of + /// unique values). + /// + /// This operation is lossless: expanding the compressed mesh back + /// into a dense vector yields the same sequence of values. + pub fn compress_adjacent_in_place(&mut self) { + self.compress_adjacent_in_place_by(|a, b| a == b) + } +} + +impl ValueMesh { + /// Compresses the mesh in place using a custom equivalence + /// predicate. + /// + /// This is a generalized form of [`compress_adjacent_in_place`] + /// that merges adjacent values according to an arbitrary + /// predicate `same(a, b)`, rather than relying on `PartialEq`. + /// + /// # Behavior + /// - If the mesh is already compressed, this is a **no-op**. + /// - Otherwise, consumes the dense `Vec` and replaces it with + /// a run-length encoded (`Rep::Compressed`) representation, + /// where consecutive elements satisfying `same(a, b)` are + /// coalesced into a single run. + /// + /// # Requirements + /// - `T` must implement [`Clone`] (to populate the table of + /// unique values). + /// - The predicate must be reflexive and symmetric for + /// correctness. + /// + /// This operation is lossless: expanding the compressed mesh + /// reproduces the original sequence exactly under the same + /// equivalence. + pub fn compress_adjacent_in_place_by(&mut self, same: F) + where + F: FnMut(&T, &T) -> bool, + { + let values = match &mut self.rep { + Rep::Dense { values } => std::mem::take(values), + Rep::Compressed { .. } => return, + }; + let (table, runs) = compress_adjacent_with(values, same); + self.rep = Rep::Compressed { table, runs }; + } +} + +/// Performs simple run-length encoding (RLE) compression over a dense +/// sequence of values. +/// +/// Adjacent "equal" elements are coalesced into contiguous runs, +/// producing: +/// +/// - a **table** of unique values (in first-occurrence order) +/// - a **run list** of `(range, id)` pairs, where `range` is the +/// half-open index range `[start, end)` in the original dense +/// array, and `id` indexes into `table`. +/// +/// # Example +/// ``` +/// // Input: [A, A, B, B, B, A] +/// // Output: +/// // table = [A, B, A] +/// // runs = [(0..2, 0), (2..5, 1), (5..6, 2)] +/// ``` +/// +/// # Requirements +/// - `T: Clone` is required to copy elements into the table. +/// +/// # Returns +/// A tuple `(table, runs)` that together form the compressed +/// representation. Expanding the runs reproduces the original data. +fn compress_adjacent_with(values: Vec, mut same: F) -> (Vec, Vec) +where + F: FnMut(&T, &T) -> bool, +{ + // Empty input; trivial empty compression. + if values.is_empty() { + return (Vec::new(), Vec::new()); + } + + let mut table = Vec::new(); // unique values + let mut runs = Vec::new(); // (range, table_id) pairs + + let mut start = 0usize; + table.push(values[0].clone()); + let mut cur_id: u32 = 0; + + // Walk through all subsequent elements, closing and opening runs + // whenever the value changes. + for (i, _value) in values.iter().enumerate().skip(1) { + if !same(&values[i], &table[cur_id as usize]) { + // Close current run [start, i) + runs.push(Run::new(start, i, cur_id)); + + // Start a new run + start = i; + table.push(values[i].clone()); + cur_id = (table.len() - 1) as u32; + } + } + + // Close the final run + runs.push(Run::new(start, values.len(), cur_id)); + + (table, runs) +} + #[cfg(test)] mod tests { use std::convert::Infallible; @@ -378,9 +864,11 @@ mod tests { use ndslice::view::CollectMeshExt; use ndslice::view::MapIntoExt; use ndslice::view::Ranked; + use ndslice::view::RankedSliceable; use ndslice::view::ViewExt; use proptest::prelude::*; use proptest::strategy::ValueTree; + use serde_json; use super::*; @@ -818,7 +1306,10 @@ mod tests { let doubled: ValueMesh<_> = vm.map_into(|x| x * 2); assert_eq!(doubled.region, region); - assert_eq!(doubled.ranks, vec![0, 2, 4, 6, 8, 10]); + assert_eq!( + doubled.values().collect::>(), + vec![0, 2, 4, 6, 8, 10] + ); } #[test] @@ -831,7 +1322,7 @@ mod tests { let lens: ValueMesh<_> = vm.map_into(|s| s.len()); assert_eq!(lens.region, region); - assert_eq!(lens.ranks, vec![1, 1, 1, 1]); + assert_eq!(lens.values().collect::>(), vec![1, 1, 1, 1]); } #[test] @@ -900,7 +1391,7 @@ mod tests { assert_eq!(pending.region, region); // Drive the ready futures without a runtime and collect results. - let results: Vec<_> = pending.ranks.into_iter().map(poll_now).collect(); + let results: Vec<_> = pending.values().map(|f| poll_now(f.clone())).collect(); assert_eq!(results, vec![11, 21, 31, 41]); } @@ -911,7 +1402,7 @@ mod tests { let out: ValueMesh<_> = vm.map_into(|x| x * x); assert_eq!(out.region, region); - assert_eq!(out.ranks, vec![49]); + assert_eq!(out.values().collect::>(), vec![49]); } #[test] @@ -929,4 +1420,261 @@ mod tests { assert_eq!(projected.values().collect::>(), vec![10, 20, 30]); assert_eq!(projected.region(), ®ion); } + + #[test] + fn rle_roundtrip_all_equal() { + let region: Region = extent!(n = 6).into(); + let mut vm = ValueMesh::new_unchecked(region.clone(), vec![42; 6]); + + // Compress and ensure logical equality preserved. + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, vec![42, 42, 42, 42, 42, 42]); + + // Random access still works. + for i in 0..region.num_ranks() { + assert_eq!(vm.get(i), Some(&42)); + } + assert_eq!(vm.get(region.num_ranks()), None); // out-of-bounds + } + + #[test] + fn rle_roundtrip_alternating() { + let region: Region = extent!(n = 6).into(); + let original = vec![1, 2, 1, 2, 1, 2]; + let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone()); + + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, original); + + // Spot-check random access after compression. + assert_eq!(vm.get(0), Some(&1)); + assert_eq!(vm.get(1), Some(&2)); + assert_eq!(vm.get(3), Some(&2)); + assert_eq!(vm.get(5), Some(&2)); + } + + #[test] + fn rle_roundtrip_blocky_and_slice() { + // Blocks: 0,0,0 | 1,1 | 2,2,2,2 | 3 + let region: Region = extent!(n = 10).into(); + let original = vec![0, 0, 0, 1, 1, 2, 2, 2, 2, 3]; + let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone()); + + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, original); + + // Slice a middle subregion [3..8) → [1,1,2,2,2] + let sub_region = region.range("n", 3..8).unwrap(); + let sliced = vm.sliced(sub_region); + let sliced_vec: Vec<_> = sliced.values().collect(); + assert_eq!(sliced_vec, vec![1, 1, 2, 2, 2]); + } + + #[test] + fn rle_idempotent_noop_on_second_call() { + let region: Region = extent!(n = 7).into(); + let original = vec![9, 9, 9, 8, 8, 9, 9]; + let mut vm = ValueMesh::new_unchecked(region.clone(), original.clone()); + + vm.compress_adjacent_in_place(); + let once: Vec<_> = vm.values().collect(); + assert_eq!(once, original); + + // Calling again should be a no-op and still yield identical + // values. + vm.compress_adjacent_in_place(); + let twice: Vec<_> = vm.values().collect(); + assert_eq!(twice, original); + } + + #[test] + fn rle_works_after_build_indexed() { + // Build with shuffled pairs, then compress and verify + // semantics. + let region: Region = extent!(x = 2, y = 3).into(); // 6 + let pairs = vec![(3, 30), (0, 0), (5, 50), (2, 20), (1, 10), (4, 40)]; + let mut vm = pairs + .into_iter() + .collect_indexed::>(region.clone()) + .unwrap(); + + // Should compress to 6 runs of length 1; still must + // round-trip. + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, vec![0, 10, 20, 30, 40, 50]); + // Spot-check get() + assert_eq!(vm.get(4), Some(&40)); + } + + #[test] + fn rle_handles_singleton_mesh() { + let region: Region = extent!(n = 1).into(); + let mut vm = ValueMesh::new_unchecked(region.clone(), vec![123]); + + vm.compress_adjacent_in_place(); + let collected: Vec<_> = vm.values().collect(); + assert_eq!(collected, vec![123]); + assert_eq!(vm.get(0), Some(&123)); + assert_eq!(vm.get(1), None); + } + + #[test] + fn test_dense_round_trip() { + // Build a simple dense mesh of 5 integers. + let region: Region = extent!(x = 5).into(); + let dense = ValueMesh::new(region.clone(), vec![1, 2, 3, 4, 5]).unwrap(); + + let json = serde_json::to_string_pretty(&dense).unwrap(); + let restored: ValueMesh = serde_json::from_str(&json).unwrap(); + + assert_eq!(dense, restored); + + // Dense meshes should stay dense on the wire: check the + // tagged variant. + let v: serde_json::Value = serde_json::from_str(&json).unwrap(); + // enum tag is nested: {"rep": {"rep":"dense", ...}} + let tag = v + .get("rep") + .and_then(|o| o.get("rep")) + .and_then(|s| s.as_str()); + assert_eq!(tag, Some("dense")); + } + + #[test] + fn test_compressed_round_trip() { + // Build a dense mesh, compress it, and verify it stays + // compressed on the wire. + let region: Region = extent!(x = 10).into(); + let mut mesh = ValueMesh::new(region.clone(), vec![1, 1, 1, 2, 2, 3, 3, 3, 3, 3]).unwrap(); + mesh.compress_adjacent_in_place(); + + let json = serde_json::to_string_pretty(&mesh).unwrap(); + let restored: ValueMesh = serde_json::from_str(&json).unwrap(); + + // Logical equality preserved. + assert_eq!(mesh, restored); + + // Compressed meshes should stay compressed on the wire. + let v: serde_json::Value = serde_json::from_str(&json).unwrap(); + // enum tag is nested: {"rep": {"rep":"compressed", ...}} + let tag = v + .get("rep") + .and_then(|o| o.get("rep")) + .and_then(|s| s.as_str()); + assert_eq!(tag, Some("compressed")); + } + + #[test] + fn test_stable_run_encoding() { + let run = Run::new(0, 10, 42); + let json = serde_json::to_string(&run).unwrap(); + let decoded: Run = serde_json::from_str(&json).unwrap(); + + assert_eq!(run, decoded); + assert_eq!(run.start, 0); + assert_eq!(run.end, 10); + assert_eq!(run.id, 42); + + // Ensure conversion back to Range works. + let (range, id): (Range, u32) = run.try_into().unwrap(); + assert_eq!(range, 0..10); + assert_eq!(id, 42); + } + + #[test] + fn from_single_builds_single_run() { + let region: Region = extent!(n = 6).into(); + let vm = ValueMesh::from_single(region.clone(), 7); + + assert_eq!(vm.region(), ®ion); + assert_eq!(vm.values().collect::>(), vec![7, 7, 7, 7, 7, 7]); + assert_eq!(vm.get(0), Some(&7)); + assert_eq!(vm.get(5), Some(&7)); + assert_eq!(vm.get(6), None); + } + + #[test] + fn from_default_builds_with_default_value() { + let region: Region = extent!(n = 6).into(); + let vm = ValueMesh::::from_default(region.clone()); + + assert_eq!(vm.region(), ®ion); + // i32::default() == 0 + assert_eq!(vm.values().collect::>(), vec![0, 0, 0, 0, 0, 0]); + assert_eq!(vm.get(0), Some(&0)); + assert_eq!(vm.get(5), Some(&0)); + } + + #[test] + fn test_default_vs_single_equivalence() { + let region: Region = extent!(x = 4).into(); + let d1 = ValueMesh::::from_default(region.clone()); + let d2 = ValueMesh::from_single(region.clone(), 0); + assert_eq!(d1, d2); + } + + #[test] + fn build_from_ranges_with_default_basic() { + let region: Region = extent!(n = 10).into(); + let vm = ValueMesh::from_ranges_with_default( + region.clone(), + 0, // default + vec![(2..4, 1), (6..9, 2)], + ) + .unwrap(); + + assert_eq!(vm.region(), ®ion); + assert_eq!( + vm.values().collect::>(), + vec![0, 0, 1, 1, 0, 0, 2, 2, 2, 0] + ); + + // Internal shape: [0..2)->0, [2..4)->1, [4..6)->0, [6..9)->2, + // [9..10)->0 + if let Rep::Compressed { table, runs } = &vm.rep { + // Table is small and de-duplicated. + assert!(table.len() <= 3); + assert_eq!(runs.len(), 5); + } else { + panic!("expected compressed"); + } + } + + #[test] + fn build_from_ranges_with_default_edge_cases() { + let region: Region = extent!(n = 5).into(); + + // Full override covers entire region. + let vm = ValueMesh::from_ranges_with_default(region.clone(), 9, vec![(0..5, 3)]).unwrap(); + assert_eq!(vm.values().collect::>(), vec![3, 3, 3, 3, 3]); + + // Adjacent overrides and default gaps. + let vm = ValueMesh::from_ranges_with_default(region.clone(), 0, vec![(1..2, 7), (2..4, 7)]) + .unwrap(); + assert_eq!(vm.values().collect::>(), vec![0, 7, 7, 7, 0]); + + // Empty region. + let empty_region: Region = extent!(n = 0).into(); + let vm = ValueMesh::from_ranges_with_default(empty_region.clone(), 42, vec![]).unwrap(); + assert_eq!(vm.values().collect::>(), Vec::::new()); + } + + #[test] + fn from_dense_builds_and_compresses() { + let region: Region = extent!(n = 6).into(); + let mesh = ValueMesh::from_dense(region.clone(), vec![1, 1, 2, 2, 3, 3]).unwrap(); + + assert_eq!(mesh.region(), ®ion); + assert!(matches!(mesh.rep, Rep::Compressed { .. })); + assert_eq!(mesh.values().collect::>(), vec![1, 1, 2, 2, 3, 3]); + + // Spot-check indexing. + assert_eq!(mesh.get(0), Some(&1)); + assert_eq!(mesh.get(3), Some(&2)); + assert_eq!(mesh.get(5), Some(&3)); + } } diff --git a/monarch_hyperactor/src/value_mesh.rs b/monarch_hyperactor/src/value_mesh.rs index d518064b1..64d9fd9ac 100644 --- a/monarch_hyperactor/src/value_mesh.rs +++ b/monarch_hyperactor/src/value_mesh.rs @@ -39,8 +39,17 @@ impl PyValueMesh { let vals: Vec> = values.extract()?; // Build & validate cardinality against region. - let inner = > as BuildFromRegion>>::build_dense(region, vals) - .map_err(|e| PyValueError::new_err(e.to_string()))?; + let mut inner = + > as BuildFromRegion>>::build_dense(region, vals) + .map_err(|e| PyValueError::new_err(e.to_string()))?; + + // Coalesce adjacent identical Python objects (same pointer + // identity). For Py, we treat equality as object + // identity: consecutive references to the *same* object + // pointer are merged into RLE runs. This tends to compress + // sentinel/categorical/boolean data, but not freshly + // allocated numerics/strings. + inner.compress_adjacent_in_place_by(|a, b| a.as_ptr() == b.as_ptr()); Ok(Self { inner }) } @@ -71,6 +80,7 @@ impl PyValueMesh { // Py. `unwrap` is safe because the bounds have been // checked. let v: Py = self.inner.get(rank).unwrap().clone(); + Ok(v) } @@ -84,9 +94,19 @@ impl PyValueMesh { // Preserve the shape's original Slice (offset/strides). let s = shape.get_inner(); let region = Region::new(s.labels().to_vec(), s.slice().clone()); - let inner = > as ndslice::view::BuildFromRegionIndexed>> - ::build_indexed(region, pairs) - .map_err(|e| PyValueError::new_err(e.to_string()))?; + let mut inner = > as ndslice::view::BuildFromRegionIndexed< + Py, + >>::build_indexed(region, pairs) + .map_err(|e| PyValueError::new_err(e.to_string()))?; + + // Coalesce adjacent identical Python objects (same pointer + // identity). For Py, we treat equality as object + // identity: consecutive references to the *same* object + // pointer are merged into RLE runs. This tends to compress + // sentinel/categorical/boolean data, but not freshly + // allocated numerics/strings. + inner.compress_adjacent_in_place_by(|a, b| a.as_ptr() == b.as_ptr()); + Ok(Self { inner }) } }