From 04f2c618302f198711dbb5f3e4957907955db62f Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Sat, 18 Jun 2022 12:15:53 +0200 Subject: [PATCH] frontier: tighter integration with rust traits The `frontier` module implements a common Rust pattern where there is one type representing a mutable version of something and a separate type representing its immutable counterpart. Examples in std are `String` and `str`, `PathBuf` and `Path`, `OsString` and `OsStr`, and other. In all the examples above the mutable version holds onto a heap allocated buffer and the immutable version is just a `#[repr(transparent)]` wrapper over an raw unsized slice of the data type. This patch changes `AntichainRef` to simply be a transparent wrapper over `[T]`. This change enables `Deref` implementations targeting the immutable antichain from both `MutableAntichain` and `Antichain`. This has a bunch of ergonomic improvements for users (due to auto-deref) and also allows having a single implementation of all methods that only need immutable access to the antichain. Specifically, the following methods have been deduplicated and are now implemented in only once place: * `AntichainRef::less_than` * `AntichainRef::less_equal` * `AntichainRef::dominates` * `AntichainRef::elements` * `::eq` * `::partial_cmp` * `::hash` Finally, this change also enables replacing the inherent `Antichain::borrow` and `Antichain::to_owned` methods by implementing the `std::borrow::Borrow` and `std::borrow::ToOwned` traits respectively. Signed-off-by: Petros Angelatos --- timely/src/dataflow/operators/capability.rs | 2 +- .../dataflow/operators/generic/notificator.rs | 4 +- timely/src/dataflow/operators/inspect.rs | 4 +- timely/src/dataflow/operators/probe.rs | 4 +- timely/src/progress/frontier.rs | 311 ++++++------------ timely/src/progress/reachability.rs | 4 +- 6 files changed, 117 insertions(+), 212 deletions(-) diff --git a/timely/src/dataflow/operators/capability.rs b/timely/src/dataflow/operators/capability.rs index c696640c7..cd448b0bd 100644 --- a/timely/src/dataflow/operators/capability.rs +++ b/timely/src/dataflow/operators/capability.rs @@ -403,7 +403,7 @@ impl CapabilitySet { /// let mut cap = CapabilitySet::from_elem(default_cap); /// let mut vector = Vec::new(); /// move |input, output| { - /// cap.downgrade(&input.frontier().frontier()); + /// cap.downgrade(&**input.frontier()); /// while let Some((time, data)) = input.next() { /// data.swap(&mut vector); /// } diff --git a/timely/src/dataflow/operators/generic/notificator.rs b/timely/src/dataflow/operators/generic/notificator.rs index 6c8dd8ea9..8e662a66e 100644 --- a/timely/src/dataflow/operators/generic/notificator.rs +++ b/timely/src/dataflow/operators/generic/notificator.rs @@ -40,8 +40,8 @@ impl<'a, T: Timestamp> Notificator<'a, T> { } /// Reveals the elements in the frontier of the indicated input. - pub fn frontier(&self, input: usize) -> AntichainRef { - self.frontiers[input].frontier() + pub fn frontier(&self, input: usize) -> &AntichainRef { + self.frontiers[input] } /// Requests a notification at the time associated with capability `cap`. diff --git a/timely/src/dataflow/operators/inspect.rs b/timely/src/dataflow/operators/inspect.rs index 53e0d0ca2..4631a509a 100644 --- a/timely/src/dataflow/operators/inspect.rs +++ b/timely/src/dataflow/operators/inspect.rs @@ -124,9 +124,9 @@ impl InspectCore for StreamCore { let mut frontier = crate::progress::Antichain::from_elem(G::Timestamp::minimum()); let mut vector = Default::default(); self.unary_frontier(Pipeline, "InspectBatch", move |_,_| move |input, output| { - if input.frontier.frontier() != frontier.borrow() { + if input.frontier != &frontier { frontier.clear(); - frontier.extend(input.frontier.frontier().iter().cloned()); + frontier.extend(input.frontier.iter().cloned()); func(Err(frontier.elements())); } input.for_each(|time, data| { diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index 7c5a8567e..ba1e85663 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -166,8 +166,8 @@ impl Handle { /// let frontier = handle.with_frontier(|frontier| frontier.to_vec()); /// ``` #[inline] - pub fn with_frontier)->R>(&self, mut function: F) -> R { - function(self.frontier.borrow().frontier()) + pub fn with_frontier)->R>(&self, mut function: F) -> R { + function(&self.frontier.borrow()) } } diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index ea52ba63e..c1e7cbb24 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -67,53 +67,6 @@ impl Antichain { } added } - - /// Returns true if any item in the antichain is strictly less than the argument. - /// - /// # Examples - /// - ///``` - /// use timely::progress::frontier::Antichain; - /// - /// let mut frontier = Antichain::from_elem(2); - /// assert!(frontier.less_than(&3)); - /// assert!(!frontier.less_than(&2)); - /// assert!(!frontier.less_than(&1)); - /// - /// frontier.clear(); - /// assert!(!frontier.less_than(&3)); - ///``` - #[inline] - pub fn less_than(&self, time: &T) -> bool { - self.elements.iter().any(|x| x.less_than(time)) - } - - /// Returns true if any item in the antichain is less than or equal to the argument. - /// - /// # Examples - /// - ///``` - /// use timely::progress::frontier::Antichain; - /// - /// let mut frontier = Antichain::from_elem(2); - /// assert!(frontier.less_equal(&3)); - /// assert!(frontier.less_equal(&2)); - /// assert!(!frontier.less_equal(&1)); - /// - /// frontier.clear(); - /// assert!(!frontier.less_equal(&3)); - ///``` - #[inline] - pub fn less_equal(&self, time: &T) -> bool { - self.elements.iter().any(|x| x.less_equal(time)) - } - - /// Returns true if every element of `other` is greater or equal to some element of `self`. - #[deprecated(since="0.12.0", note="please use `PartialOrder::less_equal` instead")] - #[inline] - pub fn dominates(&self, other: &Antichain) -> bool { - ::less_equal(self, other) - } } impl std::iter::FromIterator for Antichain { @@ -181,43 +134,23 @@ impl Antichain { /// Sorts the elements so that comparisons between antichains can be made. pub fn sort(&mut self) where T: Ord { self.elements.sort() } - - /// Reveals the elements in the antichain. - /// - /// This method is redundant with ` as Deref>`, but the method - /// is in such broad use that we probably don't want to deprecate it without - /// some time to fix all things. - /// - /// # Examples - /// - ///``` - /// use timely::progress::frontier::Antichain; - /// - /// let mut frontier = Antichain::from_elem(2); - /// assert_eq!(frontier.elements(), &[2]); - ///``` - #[inline] pub fn elements(&self) -> &[T] { &self[..] } - - /// Reveals the elements in the antichain. - /// - /// # Examples - /// - ///``` - /// use timely::progress::frontier::Antichain; - /// - /// let mut frontier = Antichain::from_elem(2); - /// assert_eq!(&*frontier.borrow(), &[2]); - ///``` - #[inline] pub fn borrow(&self) -> AntichainRef { AntichainRef::new(&self.elements) }} +} impl PartialEq for Antichain { fn eq(&self, other: &Self) -> bool { - // Lengths should be the same, with the option for fast acceptance if identical. - self.elements().len() == other.elements().len() && - ( - self.elements().iter().zip(other.elements().iter()).all(|(t1,t2)| t1 == t2) || - self.elements().iter().all(|t1| other.elements().iter().any(|t2| t1.eq(t2))) - ) + PartialEq::eq(&**self, &**other) + } +} + +impl PartialEq> for Antichain { + fn eq(&self, other: &MutableAntichain) -> bool { + PartialEq::eq(&**self, &**other) + } +} + +impl PartialEq<&AntichainRef> for Antichain { + fn eq(&self, other: &&AntichainRef) -> bool { + PartialEq::eq(&**self, *other) } } @@ -225,7 +158,7 @@ impl Eq for Antichain { } impl PartialOrder for Antichain { fn less_equal(&self, other: &Self) -> bool { - other.elements().iter().all(|t2| self.elements().iter().any(|t1| t1.less_equal(t2))) + PartialOrder::less_equal(&**self, &**other) } } @@ -246,16 +179,17 @@ impl Antichain { debug_assert!(self.len() <= 1); self.elements.pop() } - /// Return a reference to the at most one element the antichain contains. - pub fn as_option(&self) -> Option<&T> { - debug_assert!(self.len() <= 1); - self.elements.last() - } } impl std::hash::Hash for Antichain { fn hash(&self, state: &mut H) { - let mut temp = self.elements.iter().collect::>(); + std::hash::Hash::hash(&**self, state) + } +} + +impl std::hash::Hash for AntichainRef { + fn hash(&self, state: &mut H) { + let mut temp = self.iter().collect::>(); temp.sort(); for element in temp { element.hash(state); @@ -279,9 +213,9 @@ impl Into> for Antichain { } impl ::std::ops::Deref for Antichain { - type Target = [T]; + type Target = AntichainRef; fn deref(&self) -> &Self::Target { - &self.elements + AntichainRef::new(&self.elements) } } @@ -369,22 +303,6 @@ impl MutableAntichain { self.dirty = self.updates.len(); } - /// Reveals the minimal elements with positive count. - /// - /// # Examples - /// - ///``` - /// use timely::progress::frontier::MutableAntichain; - /// - /// let mut frontier = MutableAntichain::::new(); - /// assert!(frontier.frontier().len() == 0); - ///``` - #[inline] - pub fn frontier(&self) -> AntichainRef<'_, T> { - debug_assert_eq!(self.dirty, 0); - AntichainRef::new(&self.frontier) - } - /// Creates a new singleton `MutableAntichain`. /// /// # Examples @@ -393,7 +311,7 @@ impl MutableAntichain { /// use timely::progress::frontier::{AntichainRef, MutableAntichain}; /// /// let mut frontier = MutableAntichain::new_bottom(0u64); - /// assert!(frontier.frontier() == AntichainRef::new(&[0u64])); + /// assert!(frontier == AntichainRef::new(&[0u64])); ///``` #[inline] pub fn new_bottom(bottom: T) -> MutableAntichain @@ -408,64 +326,6 @@ impl MutableAntichain { } } - /// Returns true if there are no elements in the `MutableAntichain`. - /// - /// # Examples - /// - ///``` - /// use timely::progress::frontier::MutableAntichain; - /// - /// let mut frontier = MutableAntichain::::new(); - /// assert!(frontier.is_empty()); - ///``` - #[inline] - pub fn is_empty(&self) -> bool { - debug_assert_eq!(self.dirty, 0); - self.frontier.is_empty() - } - - /// Returns true if any item in the `MutableAntichain` is strictly less than the argument. - /// - /// # Examples - /// - ///``` - /// use timely::progress::frontier::MutableAntichain; - /// - /// let mut frontier = MutableAntichain::new_bottom(1u64); - /// assert!(!frontier.less_than(&0)); - /// assert!(!frontier.less_than(&1)); - /// assert!(frontier.less_than(&2)); - ///``` - #[inline] - pub fn less_than(&self, time: &T) -> bool - where - T: PartialOrder, - { - debug_assert_eq!(self.dirty, 0); - self.frontier().less_than(time) - } - - /// Returns true if any item in the `MutableAntichain` is less than or equal to the argument. - /// - /// # Examples - /// - ///``` - /// use timely::progress::frontier::MutableAntichain; - /// - /// let mut frontier = MutableAntichain::new_bottom(1u64); - /// assert!(!frontier.less_equal(&0)); - /// assert!(frontier.less_equal(&1)); - /// assert!(frontier.less_equal(&2)); - ///``` - #[inline] - pub fn less_equal(&self, time: &T) -> bool - where - T: PartialOrder, - { - debug_assert_eq!(self.dirty, 0); - self.frontier().less_equal(time) - } - /// Allows a single-element push, but dirties the antichain and prevents inspection until cleaned. /// /// At the moment inspection is prevented via panic, so best be careful (this should probably be fixed). @@ -491,7 +351,7 @@ impl MutableAntichain { /// .update_iter(vec![(1, -1), (2, 7)]) /// .collect::>(); /// - /// assert!(frontier.frontier() == AntichainRef::new(&[2])); + /// assert!(frontier == AntichainRef::new(&[2])); /// assert!(changes == vec![(1, -1), (2, 1)]); ///``` #[inline] @@ -587,6 +447,26 @@ impl MutableAntichain { } } +impl std::ops::Deref for MutableAntichain { + type Target = AntichainRef; + fn deref(&self) -> &Self::Target { + debug_assert_eq!(self.dirty, 0); + AntichainRef::new(&self.frontier) + } +} + +impl PartialEq> for MutableAntichain { + fn eq(&self, other: &Antichain) -> bool { + PartialEq::eq(&**self, &**other) + } +} + +impl PartialEq<&AntichainRef> for MutableAntichain { + fn eq(&self, other: &&AntichainRef) -> bool { + PartialEq::eq(&**self, *other) + } +} + impl Default for MutableAntichain { fn default() -> Self { Self::new() @@ -626,8 +506,8 @@ impl From> for MutableAntichain { result } } -impl<'a, T: PartialOrder+Ord+Clone> From> for MutableAntichain { - fn from(antichain: AntichainRef<'a, T>) -> Self { +impl From<&AntichainRef> for MutableAntichain { + fn from(antichain: &AntichainRef) -> Self { let mut result = MutableAntichain::new(); result.update_iter(antichain.into_iter().map(|time| (time.clone(), 1))); result @@ -650,50 +530,41 @@ where /// A wrapper for elements of an antichain. #[derive(Debug)] -pub struct AntichainRef<'a, T: 'a> { +#[repr(transparent)] +pub struct AntichainRef { /// Elements contained in the antichain. - frontier: &'a [T], -} - -impl<'a, T: 'a> Clone for AntichainRef<'a, T> { - fn clone(&self) -> Self { - Self { - frontier: self.frontier, - } - } + frontier: [T], } -impl<'a, T: 'a> Copy for AntichainRef<'a, T> { } - -impl<'a, T: 'a> AntichainRef<'a, T> { +impl AntichainRef { /// Create a new `AntichainRef` from a reference to a slice of elements forming the frontier. /// /// This method does not check that this antichain has any particular properties, for example /// that there are no elements strictly less than other elements. - pub fn new(frontier: &'a [T]) -> Self { - Self { - frontier, - } + pub fn new(frontier: &[T]) -> &Self { + // SAFETY: AntichainRef just wraps [T], and frontier is &[T], + // therefore transmuting &[T] to &AntichainRef is safe. + unsafe { std::mem::transmute(frontier) } } - /// Constructs an owned antichain from the antichain reference. + /// Reveals the elements in the antichain. + /// + /// This method is redundant with ` as Deref>`, but the method + /// is in such broad use that we probably don't want to deprecate it without + /// some time to fix all things. /// /// # Examples /// ///``` - /// use timely::progress::{Antichain, frontier::AntichainRef}; + /// use timely::progress::frontier::Antichain; /// - /// let frontier = AntichainRef::new(&[1u64]); - /// assert_eq!(frontier.to_owned(), Antichain::from_elem(1u64)); + /// let mut frontier = Antichain::from_elem(2); + /// assert_eq!(frontier.elements(), &[2]); ///``` - pub fn to_owned(&self) -> Antichain where T: Clone { - Antichain { - elements: self.frontier.to_vec() - } - } + #[inline] pub fn elements(&self) -> &[T] { &self[..] } } -impl<'a, T: 'a+PartialOrder> AntichainRef<'a, T> { +impl AntichainRef { /// Returns true if any item in the `AntichainRef` is strictly less than the argument. /// @@ -728,9 +599,43 @@ impl<'a, T: 'a+PartialOrder> AntichainRef<'a, T> { pub fn less_equal(&self, time: &T) -> bool { self.iter().any(|x| x.less_equal(time)) } + + /// Returns true if every element of `other` is greater or equal to some element of `self`. + #[deprecated(since="0.12.0", note="please use `PartialOrder::less_equal` instead")] + #[inline] + pub fn dominates(&self, other: &AntichainRef) -> bool { + ::less_equal(self, other) + } +} + +impl std::borrow::Borrow> for Antichain { + fn borrow(&self) -> &AntichainRef { + self + } +} + +impl ToOwned for AntichainRef { + type Owned = Antichain; + fn to_owned(&self) -> Self::Owned { + Antichain { + elements: self.frontier.to_owned() + } + } +} + +impl PartialEq> for AntichainRef { + fn eq(&self, other: &MutableAntichain) -> bool { + PartialEq::eq(self, &**other) + } +} + +impl PartialEq> for AntichainRef { + fn eq(&self, other: &Antichain) -> bool { + PartialEq::eq(self, &**other) + } } -impl<'a, T: PartialEq> PartialEq for AntichainRef<'a, T> { +impl PartialEq for AntichainRef { fn eq(&self, other: &Self) -> bool { // Lengths should be the same, with the option for fast acceptance if identical. self.len() == other.len() && @@ -741,17 +646,17 @@ impl<'a, T: PartialEq> PartialEq for AntichainRef<'a, T> { } } -impl<'a, T: Eq> Eq for AntichainRef<'a, T> { } +impl Eq for AntichainRef { } -impl<'a, T: PartialOrder> PartialOrder for AntichainRef<'a, T> { +impl PartialOrder for AntichainRef { fn less_equal(&self, other: &Self) -> bool { other.iter().all(|t2| self.iter().any(|t1| t1.less_equal(t2))) } } -impl<'a, T: TotalOrder> TotalOrder for AntichainRef<'a, T> { } +impl TotalOrder for AntichainRef { } -impl<'a, T: TotalOrder> AntichainRef<'a, T> { +impl AntichainRef { /// Return a reference to the at most one element the antichain contains. pub fn as_option(&self) -> Option<&T> { debug_assert!(self.len() <= 1); @@ -759,14 +664,14 @@ impl<'a, T: TotalOrder> AntichainRef<'a, T> { } } -impl<'a, T> ::std::ops::Deref for AntichainRef<'a, T> { +impl ::std::ops::Deref for AntichainRef { type Target = [T]; fn deref(&self) -> &Self::Target { - self.frontier + &self.frontier } } -impl<'a, T: 'a> ::std::iter::IntoIterator for &'a AntichainRef<'a, T> { +impl<'a, T> ::std::iter::IntoIterator for &'a AntichainRef { type Item = &'a T; type IntoIter = ::std::slice::Iter<'a, T>; fn into_iter(self) -> Self::IntoIter { diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 8e51c839f..b4fa69cfc 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -456,7 +456,7 @@ impl PortInformation { /// this pointstamp. #[inline] pub fn is_global(&self, time: &T) -> bool { - let dominated = self.implications.frontier().iter().any(|t| t.less_than(time)); + let dominated = self.implications.iter().any(|t| t.less_than(time)); let redundant = self.implications.count_for(time) > 1; !dominated && !redundant } @@ -897,4 +897,4 @@ pub mod logging { impl From for TrackerEvent { fn from(v: TargetUpdate) -> TrackerEvent { TrackerEvent::TargetUpdate(v) } } -} \ No newline at end of file +}