diff --git a/Cargo.toml b/Cargo.toml index 2ccf36636..e73ee007a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,10 +42,12 @@ graph_map = "0.1" bytemuck = "1.18.0" [dependencies] -serde = { version = "1.0", features = ["derive"] } +columnar = "0.3" +columnation = "0.1.0" fnv="1.0.2" +paste = "1.0" +serde = { version = "1.0", features = ["derive"] } timely = {workspace = true} -columnar = "0.3" [workspace.dependencies] timely = { version = "0.18", default-features = false } diff --git a/examples/columnar.rs b/examples/columnar.rs index 80e41c521..af864bffb 100644 --- a/examples/columnar.rs +++ b/examples/columnar.rs @@ -348,7 +348,7 @@ mod builder { use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; use differential_dataflow::trace::implementations::merge_batcher::ColMerger; -use timely::container::columnation::TimelyStack; +use differential_dataflow::containers::TimelyStack; /// A batcher for columnar storage. pub type Col2ValBatcher = MergeBatcher, batcher::Chunker>, ColMerger<(K,V),T,R>>; diff --git a/examples/spines.rs b/examples/spines.rs index bc3e4ce32..9d5a82019 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -64,13 +64,6 @@ fn main() { keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, - "flat" => { - use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeyBuilderDefault, FlatKeySpineDefault}; - let data = data.arrange::, FlatKeyBuilderDefault, FlatKeySpineDefault>(); - let keys = keys.arrange::, FlatKeyBuilderDefault, FlatKeySpineDefault>(); - keys.join_core(&data, |_k, (), ()| Option::<()>::None) - .probe_with(&mut probe); - } _ => { println!("unrecognized mode: {:?}", mode) } diff --git a/src/consolidation.rs b/src/consolidation.rs index 812207552..9c1ed008e 100644 --- a/src/consolidation.rs +++ b/src/consolidation.rs @@ -14,11 +14,8 @@ use std::cmp::Ordering; use std::collections::VecDeque; use timely::Container; use timely::container::{ContainerBuilder, PushInto}; -use timely::container::flatcontainer::{FlatStack, Push, Region}; -use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; -use crate::Data; +use crate::{IntoOwned, Data}; use crate::difference::{IsZero, Semigroup}; -use crate::trace::cursor::IntoOwned; /// Sorts and consolidates `vec`. /// @@ -321,36 +318,6 @@ where } } -impl ConsolidateLayout for FlatStack, T, R>> -where - for<'a> K: Region + Push<::ReadItem<'a>> + Clone + 'static, - for<'a> K::ReadItem<'a>: Ord + Copy, - for<'a> V: Region + Push<::ReadItem<'a>> + Clone + 'static, - for<'a> V::ReadItem<'a>: Ord + Copy, - for<'a> T: Region + Push<::ReadItem<'a>> + Clone + 'static, - for<'a> T::ReadItem<'a>: Ord + Copy, - R: Region + Push<::Owned> + Clone + 'static, - for<'a> R::Owned: Semigroup>, -{ - type Key<'a> = (K::ReadItem<'a>, V::ReadItem<'a>, T::ReadItem<'a>) where Self: 'a; - type Diff<'a> = R::ReadItem<'a> where Self: 'a; - type DiffOwned = R::Owned; - - fn into_parts(((key, val), time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) { - ((key, val, time), diff) - } - - fn cmp<'a>(((key1, val1), time1, _diff1): &Self::Item<'_>, ((key2, val2), time2, _diff2): &Self::Item<'_>) -> Ordering { - (K::reborrow(*key1), V::reborrow(*val1), T::reborrow(*time1)).cmp(&(K::reborrow(*key2), V::reborrow(*val2), T::reborrow(*time2))) - } - - fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) { - self.copy(((key, value), time, diff)); - } -} - - - #[cfg(test)] mod tests { use super::*; diff --git a/src/containers.rs b/src/containers.rs new file mode 100644 index 000000000..84d10017f --- /dev/null +++ b/src/containers.rs @@ -0,0 +1,323 @@ +//! A columnar container based on the columnation library. + +use std::iter::FromIterator; + +pub use columnation::*; +use timely::container::PushInto; + +/// An append-only vector that store records as columns. +/// +/// This container maintains elements that might conventionally own +/// memory allocations, but instead the pointers to those allocations +/// reference larger regions of memory shared with multiple instances +/// of the type. Elements can be retrieved as references, and care is +/// taken when this type is dropped to ensure that the correct memory +/// is returned (rather than the incorrect memory, from running the +/// elements `Drop` implementations). +pub struct TimelyStack { + local: Vec, + inner: T::InnerRegion, +} + +impl TimelyStack { + /// Construct a [TimelyStack], reserving space for `capacity` elements + /// + /// Note that the associated region is not initialized to a specific capacity + /// because we can't generally know how much space would be required. + pub fn with_capacity(capacity: usize) -> Self { + Self { + local: Vec::with_capacity(capacity), + inner: T::InnerRegion::default(), + } + } + + /// Ensures `Self` can absorb `items` without further allocations. + /// + /// The argument `items` may be cloned and iterated multiple times. + /// Please be careful if it contains side effects. + #[inline(always)] + pub fn reserve_items<'a, I>(&mut self, items: I) + where + I: Iterator+Clone, + T: 'a, + { + self.local.reserve(items.clone().count()); + self.inner.reserve_items(items); + } + + /// Ensures `Self` can absorb `items` without further allocations. + /// + /// The argument `items` may be cloned and iterated multiple times. + /// Please be careful if it contains side effects. + #[inline(always)] + pub fn reserve_regions<'a, I>(&mut self, regions: I) + where + Self: 'a, + I: Iterator+Clone, + { + self.local.reserve(regions.clone().map(|cs| cs.local.len()).sum()); + self.inner.reserve_regions(regions.map(|cs| &cs.inner)); + } + + + + /// Copies an element in to the region. + /// + /// The element can be read by indexing + pub fn copy(&mut self, item: &T) { + // TODO: Some types `T` should just be cloned. + // E.g. types that are `Copy` or vecs of ZSTs. + unsafe { + self.local.push(self.inner.copy(item)); + } + } + /// Empties the collection. + pub fn clear(&mut self) { + unsafe { + // Unsafety justified in that setting the length to zero exposes + // no invalid data. + self.local.set_len(0); + self.inner.clear(); + } + } + /// Retain elements that pass a predicate, from a specified offset. + /// + /// This method may or may not reclaim memory in the inner region. + pub fn retain_from bool>(&mut self, index: usize, mut predicate: P) { + let mut write_position = index; + for position in index..self.local.len() { + if predicate(&self[position]) { + // TODO: compact the inner region and update pointers. + self.local.swap(position, write_position); + write_position += 1; + } + } + unsafe { + // Unsafety justified in that `write_position` is no greater than + // `self.local.len()` and so this exposes no invalid data. + self.local.set_len(write_position); + } + } + + /// Unsafe access to `local` data. The slices stor data that is backed by a region + /// allocation. Therefore, it is undefined behavior to mutate elements of the `local` slice. + /// + /// # Safety + /// Elements within `local` can be reordered, but not mutated, removed and/or dropped. + pub unsafe fn local(&mut self) -> &mut [T] { + &mut self.local[..] + } + + /// Estimate the memory capacity in bytes. + #[inline] + pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) { + let size_of = std::mem::size_of::(); + callback(self.local.len() * size_of, self.local.capacity() * size_of); + self.inner.heap_size(callback); + } + + /// Estimate the consumed memory capacity in bytes, summing both used and total capacity. + #[inline] + pub fn summed_heap_size(&self) -> (usize, usize) { + let (mut length, mut capacity) = (0, 0); + self.heap_size(|len, cap| { + length += len; + capacity += cap + }); + (length, capacity) + } + + /// The length in items. + #[inline] + pub fn len(&self) -> usize { + self.local.len() + } + + /// Returns `true` if the stack is empty. + pub fn is_empty(&self) -> bool { + self.local.is_empty() + } + + /// The capacity of the local vector. + #[inline] + pub fn capacity(&self) -> usize { + self.local.capacity() + } + + /// Reserve space for `additional` elements. + #[inline] + pub fn reserve(&mut self, additional: usize) { + self.local.reserve(additional) + } +} + +impl TimelyStack<(A, B)> { + /// Copies a destructured tuple `(A, B)` into this column stack. + /// + /// This serves situations where a tuple should be constructed from its constituents but + /// not all elements are available as owned data. + /// + /// The element can be read by indexing + pub fn copy_destructured(&mut self, t1: &A, t2: &B) { + unsafe { + self.local.push(self.inner.copy_destructured(t1, t2)); + } + } +} + +impl TimelyStack<(A, B, C)> { + /// Copies a destructured tuple `(A, B, C)` into this column stack. + /// + /// This serves situations where a tuple should be constructed from its constituents but + /// not all elements are available as owned data. + /// + /// The element can be read by indexing + pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) { + unsafe { + self.local.push(self.inner.copy_destructured(r0, r1, r2)); + } + } +} + +impl std::ops::Deref for TimelyStack { + type Target = [T]; + #[inline(always)] + fn deref(&self) -> &Self::Target { + &self.local[..] + } +} + +impl Drop for TimelyStack { + fn drop(&mut self) { + self.clear(); + } +} + +impl Default for TimelyStack { + fn default() -> Self { + Self { + local: Vec::new(), + inner: T::InnerRegion::default(), + } + } +} + +impl<'a, A: 'a + Columnation> FromIterator<&'a A> for TimelyStack { + fn from_iter>(iter: T) -> Self { + let iter = iter.into_iter(); + let mut c = TimelyStack::::with_capacity(iter.size_hint().0); + for element in iter { + c.copy(element); + } + + c + } +} + +impl PartialEq for TimelyStack { + fn eq(&self, other: &Self) -> bool { + PartialEq::eq(&self[..], &other[..]) + } +} + +impl Eq for TimelyStack {} + +impl std::fmt::Debug for TimelyStack { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self[..].fmt(f) + } +} + +impl Clone for TimelyStack { + fn clone(&self) -> Self { + let mut new: Self = Default::default(); + for item in &self[..] { + new.copy(item); + } + new + } + + fn clone_from(&mut self, source: &Self) { + self.clear(); + for item in &source[..] { + self.copy(item); + } + } +} + +impl PushInto for TimelyStack { + #[inline] + fn push_into(&mut self, item: T) { + self.copy(&item); + } +} + +impl PushInto<&T> for TimelyStack { + #[inline] + fn push_into(&mut self, item: &T) { + self.copy(item); + } +} + + +impl PushInto<&&T> for TimelyStack { + #[inline] + fn push_into(&mut self, item: &&T) { + self.copy(*item); + } +} + +mod container { + use std::ops::Deref; + + use columnation::Columnation; + use timely::Container; + use timely::container::SizableContainer; + + use crate::containers::TimelyStack; + + impl Container for TimelyStack { + type ItemRef<'a> = &'a T where Self: 'a; + type Item<'a> = &'a T where Self: 'a; + + fn len(&self) -> usize { + self.local.len() + } + + fn is_empty(&self) -> bool { + self.local.is_empty() + } + + fn clear(&mut self) { + TimelyStack::clear(self) + } + + type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a; + + fn iter(&self) -> Self::Iter<'_> { + self.deref().iter() + } + + type DrainIter<'a> = std::slice::Iter<'a, T> where Self: 'a; + + fn drain(&mut self) -> Self::DrainIter<'_> { + (*self).iter() + } + } + + impl SizableContainer for TimelyStack { + fn at_capacity(&self) -> bool { + self.len() == self.capacity() + } + fn ensure_capacity(&mut self, stash: &mut Option) { + if self.capacity() == 0 { + *self = stash.take().unwrap_or_default(); + self.clear(); + } + let preferred = timely::container::buffer::default_capacity::(); + if self.capacity() < preferred { + self.reserve(preferred - self.capacity()); + } + } + } +} diff --git a/src/dynamic/pointstamp.rs b/src/dynamic/pointstamp.rs index f28fb348e..5928da185 100644 --- a/src/dynamic/pointstamp.rs +++ b/src/dynamic/pointstamp.rs @@ -69,7 +69,7 @@ impl PointStamp { } /// Returns the wrapped vector. /// - /// This method is the support way to mutate the contents of `self`, by extracting + /// This method is the support way to mutate the contents of `self`, by extracting /// the vector and then re-introducing it with `PointStamp::new` to re-establish /// the invariant that the vector not end with `T::minimum`. pub fn into_vec(self) -> Vec { @@ -253,50 +253,55 @@ impl Lattice for PointStamp { } } -use timely::container::columnation::{Columnation, Region}; -impl Columnation for PointStamp { - type InnerRegion = PointStampStack; -} +mod columnation { + use columnation::{Columnation, Region}; + + use crate::dynamic::pointstamp::PointStamp; + + impl Columnation for PointStamp { + type InnerRegion = PointStampStack; + } -/// Stack for PointStamp. Part of Columnation implementation. -pub struct PointStampStack( as Columnation>::InnerRegion) -where - ::Item: Columnation; + /// Stack for PointStamp. Part of Columnation implementation. + pub struct PointStampStack( as Columnation>::InnerRegion) + where + ::Item: Columnation; -impl Default for PointStampStack + impl Default for PointStampStack where ::Item: Columnation -{ - #[inline] - fn default() -> Self { - Self(Default::default()) + { + #[inline] + fn default() -> Self { + Self(Default::default()) + } } -} -impl Region for PointStampStack + impl Region for PointStampStack where ::Item: Columnation -{ - type Item = PointStamp; + { + type Item = PointStamp; - #[inline] - unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item { - Self::Item { vector: self.0.copy(&item.vector) } - } + #[inline] + unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item { + Self::Item { vector: self.0.copy(&item.vector) } + } - fn clear(&mut self) { - self.0.clear(); - } + fn clear(&mut self) { + self.0.clear(); + } - fn reserve_items<'a, I>(&mut self, items: I) where Self: 'a, I: Iterator + Clone { - self.0.reserve_items(items.map(|x| &x.vector)); - } + fn reserve_items<'a, I>(&mut self, items: I) where Self: 'a, I: Iterator + Clone { + self.0.reserve_items(items.map(|x| &x.vector)); + } - fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, I: Iterator + Clone { - self.0.reserve_regions(regions.map(|r| &r.0)); - } + fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, I: Iterator + Clone { + self.0.reserve_regions(regions.map(|r| &r.0)); + } - fn heap_size(&self, callback: impl FnMut(usize, usize)) { - self.0.heap_size(callback); + fn heap_size(&self, callback: impl FnMut(usize, usize)) { + self.0.heap_size(callback); + } } } diff --git a/src/into_owned.rs b/src/into_owned.rs new file mode 100644 index 000000000..a4a91f49c --- /dev/null +++ b/src/into_owned.rs @@ -0,0 +1,204 @@ +//! Traits for converting between owned and borrowed types. + +/// A reference type corresponding to an owned type, supporting conversion in each direction. +/// +/// This trait can be implemented by a GAT, and enables owned types to be borrowed as a GAT. +/// This trait is analogous to `ToOwned`, but not as prescriptive. Specifically, it avoids the +/// requirement that the other trait implement `Borrow`, for which a borrow must result in a +/// `&'self Borrowed`, which cannot move the lifetime into a GAT borrowed type. +pub trait IntoOwned<'a> { + /// Owned type into which this type can be converted. + type Owned; + /// Conversion from an instance of this type to the owned type. + #[must_use] + fn into_owned(self) -> Self::Owned; + /// Clones `self` onto an existing instance of the owned type. + fn clone_onto(self, other: &mut Self::Owned); + /// Borrows an owned instance as oneself. + #[must_use] + fn borrow_as(owned: &'a Self::Owned) -> Self; +} + +impl<'a, T: ToOwned + ?Sized> IntoOwned<'a> for &'a T { + type Owned = T::Owned; + #[inline] + fn into_owned(self) -> Self::Owned { + self.to_owned() + } + #[inline] + fn clone_onto(self, other: &mut Self::Owned) { + ::clone_into(self, other) + } + #[inline] + fn borrow_as(owned: &'a Self::Owned) -> Self { + std::borrow::Borrow::borrow(owned) + } +} + +impl<'a, T, E> IntoOwned<'a> for Result +where + T: IntoOwned<'a>, + E: IntoOwned<'a>, +{ + type Owned = Result; + + #[inline] + fn into_owned(self) -> Self::Owned { + self.map(T::into_owned).map_err(E::into_owned) + } + + #[inline] + fn clone_onto(self, other: &mut Self::Owned) { + match (self, other) { + (Ok(item), Ok(target)) => T::clone_onto(item, target), + (Err(item), Err(target)) => E::clone_onto(item, target), + (Ok(item), target) => *target = Ok(T::into_owned(item)), + (Err(item), target) => *target = Err(E::into_owned(item)), + } + } + + #[inline] + fn borrow_as(owned: &'a Self::Owned) -> Self { + owned.as_ref().map(T::borrow_as).map_err(E::borrow_as) + } +} + +impl<'a, T> IntoOwned<'a> for Option +where + T: IntoOwned<'a>, +{ + type Owned = Option; + + #[inline] + fn into_owned(self) -> Self::Owned { + self.map(IntoOwned::into_owned) + } + + #[inline] + fn clone_onto(self, other: &mut Self::Owned) { + match (self, other) { + (Some(item), Some(target)) => T::clone_onto(item, target), + (Some(item), target) => *target = Some(T::into_owned(item)), + (None, target) => *target = None, + } + } + + #[inline] + fn borrow_as(owned: &'a Self::Owned) -> Self { + owned.as_ref().map(T::borrow_as) + } +} + +mod tuple { + use paste::paste; + + macro_rules! tuple { + ($($name:ident)+) => (paste! { + + #[allow(non_camel_case_types)] + #[allow(non_snake_case)] + impl<'a, $($name),*> crate::IntoOwned<'a> for ($($name,)*) + where + $($name: crate::IntoOwned<'a>),* + { + type Owned = ($($name::Owned,)*); + + #[inline] + fn into_owned(self) -> Self::Owned { + let ($($name,)*) = self; + ( + $($name.into_owned(),)* + ) + } + + #[inline] + fn clone_onto(self, other: &mut Self::Owned) { + let ($($name,)*) = self; + let ($([<$name _other>],)*) = other; + $($name.clone_onto([<$name _other>]);)* + } + + #[inline] + fn borrow_as(owned: &'a Self::Owned) -> Self { + let ($($name,)*) = owned; + ( + $($name::borrow_as($name),)* + ) + } + } + }) + } + + tuple!(A); + tuple!(A B); + tuple!(A B C); + tuple!(A B C D); + tuple!(A B C D E); + tuple!(A B C D E F); + tuple!(A B C D E F G); + tuple!(A B C D E F G H); + tuple!(A B C D E F G H I); + tuple!(A B C D E F G H I J); + tuple!(A B C D E F G H I J K); + tuple!(A B C D E F G H I J K L); + tuple!(A B C D E F G H I J K L M); + tuple!(A B C D E F G H I J K L M N); + tuple!(A B C D E F G H I J K L M N O); + tuple!(A B C D E F G H I J K L M N O P); +} + +mod primitive { + macro_rules! implement_for { + ($index_type:ty) => { + impl<'a> crate::IntoOwned<'a> for $index_type { + type Owned = $index_type; + + #[inline] + fn into_owned(self) -> Self::Owned { + self + } + + #[inline] + fn clone_onto(self, other: &mut Self::Owned) { + *other = self; + } + + #[inline] + fn borrow_as(owned: &'a Self::Owned) -> Self { + *owned + } + } + }; +} + + implement_for!(()); + implement_for!(bool); + implement_for!(char); + + implement_for!(u8); + implement_for!(u16); + implement_for!(u32); + implement_for!(u64); + implement_for!(u128); + implement_for!(usize); + + implement_for!(i8); + implement_for!(i16); + implement_for!(i32); + implement_for!(i64); + implement_for!(i128); + implement_for!(isize); + + implement_for!(f32); + implement_for!(f64); + + implement_for!(std::num::Wrapping); + implement_for!(std::num::Wrapping); + implement_for!(std::num::Wrapping); + implement_for!(std::num::Wrapping); + implement_for!(std::num::Wrapping); + implement_for!(std::num::Wrapping); + + implement_for!(std::time::Duration); + +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index dfd08f284..c67b356cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,6 +79,7 @@ use std::fmt::Debug; pub use collection::{Collection, AsCollection}; pub use hashable::Hashable; pub use difference::Abelian as Diff; +pub use into_owned::IntoOwned; /// Data type usable in differential dataflow. /// @@ -104,6 +105,8 @@ pub mod collection; pub mod logging; pub mod consolidation; pub mod capture; +pub mod containers; +mod into_owned; /// Configuration options for differential dataflow. #[derive(Default)] diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 93df7bc77..c18193882 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -26,7 +26,7 @@ use timely::progress::Timestamp; use timely::progress::Antichain; use timely::dataflow::operators::Capability; -use crate::{Data, ExchangeData, Collection, AsCollection, Hashable}; +use crate::{Data, ExchangeData, Collection, AsCollection, Hashable, IntoOwned}; use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor}; @@ -279,8 +279,6 @@ where } } -use crate::trace::cursor::IntoOwned; - // Direct reduce implementations. use crate::difference::Abelian; impl Arranged diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 62f9fe844..57b215907 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -111,8 +111,7 @@ use timely::dataflow::operators::Capability; use crate::operators::arrange::arrangement::Arranged; use crate::trace::{Builder, Description}; use crate::trace::{self, Trace, TraceReader, Batch, Cursor}; -use crate::trace::cursor::IntoOwned; -use crate::{ExchangeData, Hashable}; +use crate::{ExchangeData, Hashable, IntoOwned}; use super::TraceAgent; diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index e4b718123..602f58f87 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -8,9 +8,7 @@ use timely::dataflow::Scope; -use crate::trace::cursor::IntoOwned; - -use crate::{Collection, ExchangeData, Hashable}; +use crate::{IntoOwned, Collection, ExchangeData, Hashable}; use crate::consolidation::ConsolidatingContainerBuilder; use crate::difference::Semigroup; diff --git a/src/operators/count.rs b/src/operators/count.rs index af310b664..15d06b42b 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -5,10 +5,8 @@ use timely::dataflow::*; use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; -use crate::trace::cursor::IntoOwned; - use crate::lattice::Lattice; -use crate::{ExchangeData, Collection}; +use crate::{IntoOwned, ExchangeData, Collection}; use crate::difference::{IsZero, Semigroup}; use crate::hashable::Hashable; use crate::collection::AsCollection; diff --git a/src/operators/join.rs b/src/operators/join.rs index f029a4c3d..560c1d913 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -668,7 +668,7 @@ where Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)), Ordering::Equal => { - use crate::trace::cursor::IntoOwned; + use crate::IntoOwned; thinker.history1.edits.load(trace, trace_storage, |time| { let mut time = time.into_owned(); diff --git a/src/operators/mod.rs b/src/operators/mod.rs index f069df475..615cfa399 100644 --- a/src/operators/mod.rs +++ b/src/operators/mod.rs @@ -22,7 +22,7 @@ pub mod threshold; use crate::lattice::Lattice; use crate::trace::Cursor; -use crate::trace::cursor::IntoOwned; +use crate::IntoOwned; /// An accumulation of (value, time, diff) updates. struct EditList<'a, C: Cursor> { diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index b7a61e89b..7bb0495c3 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -8,7 +8,7 @@ use timely::Container; use timely::container::PushInto; use crate::hashable::Hashable; -use crate::{Data, ExchangeData, Collection}; +use crate::{Data, ExchangeData, Collection, IntoOwned}; use crate::difference::{Semigroup, Abelian}; use timely::order::PartialOrder; @@ -19,8 +19,6 @@ use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; use timely::dataflow::operators::Capability; -use crate::trace::cursor::IntoOwned; - use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent}; use crate::lattice::Lattice; use crate::trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic, Description}; @@ -482,7 +480,6 @@ where while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() { use std::borrow::Borrow; - use crate::trace::cursor::IntoOwned; // Determine the next key we will work on; could be synthetic, could be from a batch. let key1 = exposed.get(exposed_position).map(|x| <_ as IntoOwned>::borrow_as(&x.0)); @@ -669,13 +666,13 @@ where /// Implementation based on replaying historical and new updates together. mod history_replay { + use timely::progress::Antichain; + use timely::PartialOrder; + use crate::lattice::Lattice; use crate::trace::Cursor; - use crate::trace::cursor::IntoOwned; use crate::operators::ValueHistory; - use timely::progress::Antichain; - - use timely::PartialOrder; + use crate::IntoOwned; use super::{PerKeyCompute, sort_dedup}; diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs index d2add618f..187a94704 100644 --- a/src/operators/threshold.rs +++ b/src/operators/threshold.rs @@ -140,7 +140,7 @@ where } }); - use crate::trace::cursor::IntoOwned; + use crate::IntoOwned; if let Some(capability) = cap { let mut session = output.session(&capability); diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index 9233c4962..e53c0425a 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -6,15 +6,16 @@ //! supports efficient seeking (via the `seek_key` and `seek_val` methods). use timely::progress::Timestamp; + use crate::difference::Semigroup; +// `pub use` for legacy reasons. +pub use crate::IntoOwned; use crate::lattice::Lattice; pub mod cursor_list; pub use self::cursor_list::CursorList; -pub use timely::container::flatcontainer::IntoOwned; - /// A cursor for navigating ordered `(key, val, time, diff)` updates. pub trait Cursor { diff --git a/src/trace/implementations/chunker.rs b/src/trace/implementations/chunker.rs index 111d7c1e8..0eef4d779 100644 --- a/src/trace/implementations/chunker.rs +++ b/src/trace/implementations/chunker.rs @@ -1,9 +1,12 @@ //! Organize streams of data into sorted chunks. use std::collections::VecDeque; + +use columnation::Columnation; use timely::Container; -use timely::container::columnation::{Columnation, TimelyStack}; use timely::container::{ContainerBuilder, PushInto, SizableContainer}; + +use crate::containers::TimelyStack; use crate::consolidation::{consolidate_updates, ConsolidateLayout}; use crate::difference::Semigroup; diff --git a/src/trace/implementations/huffman_container.rs b/src/trace/implementations/huffman_container.rs index 288ba8d4c..cd8bbbb09 100644 --- a/src/trace/implementations/huffman_container.rs +++ b/src/trace/implementations/huffman_container.rs @@ -150,7 +150,7 @@ impl Default for HuffmanContainer { mod wrapper { - use crate::trace::IntoOwned; + use crate::IntoOwned; use super::Encoded; pub struct Wrapped<'a, B: Ord> { diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 5ffab9c64..4de943594 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -294,7 +294,7 @@ pub mod container { (self.len(), size, capacity, allocations) } } - + /// A merger for arbitrary containers. /// /// `MC` is a [`Container`] that implements [`MergerChunk`]. @@ -494,16 +494,16 @@ pub mod container { >::from(list) } } - + impl MergerChunk for Vec<(D, T, R)> { type TimeOwned = T; type DiffOwned = (); - + fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool { if upper.less_equal(time) { frontier.insert_with(&time, |time| time.clone()); true - } + } else { false } } fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, _stash: &mut Self::DiffOwned) { @@ -526,9 +526,11 @@ pub mod container { pub mod columnation { use timely::progress::{Antichain, frontier::AntichainRef}; - use timely::container::columnation::TimelyStack; - use timely::container::columnation::Columnation; + use columnation::Columnation; + + use crate::containers::TimelyStack; use crate::difference::Semigroup; + use super::{ContainerQueue, MergerChunk}; /// A `Merger` implementation backed by `TimelyStack` containers (columnation). @@ -567,21 +569,21 @@ pub mod container { self.head += 1; &self.list[self.head - 1] } - + fn peek(&self) -> &T { &self.list[self.head] } } - + impl MergerChunk for TimelyStack<(D, T, R)> { type TimeOwned = T; type DiffOwned = R; - + fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool { if upper.less_equal(time) { frontier.insert_with(&time, |time| time.clone()); true - } + } else { false } } fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) { @@ -605,95 +607,4 @@ pub mod container { } } } - - pub use flat_container::FlatMerger; - /// Implementations of `ContainerQueue` and `MergerChunk` for `FlatStack` containers (flat_container). - /// - /// This is currently non-functional, while we try and sort out some missing constraints that seem to - /// allow the direct implementation to work, but the corresponding implementation here to not compile. - pub mod flat_container { - - use timely::progress::{Antichain, frontier::AntichainRef}; - use timely::container::flatcontainer::{FlatStack, Region}; - use timely::container::flatcontainer::impls::tuple::TupleABCRegion; - use timely::container::flatcontainer::Push; - use crate::difference::{IsZero, Semigroup}; - use super::{ContainerQueue, MergerChunk}; - - /// A `Merger` implementation backed by `FlatStack` containers (flat_container). - pub type FlatMerger = super::ContainerMerger,FlatStackQueue<((K,V), T, R)>>; - - /// A queue implementation over a flat stack. - pub struct FlatStackQueue { - list: FlatStack, - head: usize, - } - - impl ContainerQueue> for FlatStackQueue - where - for<'a> R::ReadItem<'a>: Ord, - { - fn next_or_alloc(&mut self) -> Result, FlatStack> { - if self.is_empty() { - Err(std::mem::take(&mut self.list)) - } - else { - Ok(self.pop()) - } - } - fn is_empty(&self) -> bool { - self.head >= self.list.len() - } - fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering { - self.peek().cmp(&other.peek()) - } - fn from(list: FlatStack) -> Self { - FlatStackQueue { list, head: 0 } - } - } - - impl FlatStackQueue { - - fn pop(&mut self) -> R::ReadItem<'_> { - self.head += 1; - self.list.get(self.head - 1) - } - - fn peek(&self) -> R::ReadItem<'_> { - self.list.get(self.head) - } - } - - impl MergerChunk for FlatStack> - where - D: Region, - for<'a> D::ReadItem<'a>: Ord, - T: Region, - for<'a> T::ReadItem<'a>: Ord, - R: Region, - R::Owned: Default + IsZero + for<'a> Semigroup>, - TupleABCRegion: for<'a,'b> Push<(D::ReadItem<'a>, T::ReadItem<'a>, &'b R::Owned)>, - { - type TimeOwned = T::Owned; - type DiffOwned = R::Owned; - - fn time_kept(_time: &Self::Item<'_>, _upper: &AntichainRef, _frontier: &mut Antichain) -> bool { - unimplemented!() - } - fn push_and_add<'a>(&mut self, _item1: as Region>::ReadItem<'a>, _item2: Self::Item<'a>, _stash: &mut Self::DiffOwned) { - // let (_, _, _) = _item1; - unimplemented!() - } - fn account(&self) -> (usize, usize, usize, usize) { - let (mut size, mut capacity, mut allocations) = (0, 0, 0); - let cb = |siz, cap| { - size += siz; - capacity += cap; - allocations += 1; - }; - self.heap_size(cb); - (self.len(), size, capacity, allocations) - } - } - } } diff --git a/src/trace/implementations/merge_batcher_flat.rs b/src/trace/implementations/merge_batcher_flat.rs deleted file mode 100644 index 505e0d7dc..000000000 --- a/src/trace/implementations/merge_batcher_flat.rs +++ /dev/null @@ -1,291 +0,0 @@ -//! A general purpose `Batcher` implementation for FlatStack. - -use std::cmp::Ordering; -use std::marker::PhantomData; -use timely::progress::frontier::{Antichain, AntichainRef}; -use timely::{Data, PartialOrder}; -use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems}; -use timely::container::flatcontainer::impls::tuple::TupleABCRegion; - -use crate::difference::{IsZero, Semigroup}; -use crate::trace::implementations::merge_batcher::Merger; -use crate::trace::cursor::IntoOwned; - -/// A merger for flat stacks. -/// -/// `MC` is a [`Region`] that implements [`MergerChunk`]. -pub struct FlatcontainerMerger { - _marker: PhantomData, -} - -impl Default for FlatcontainerMerger { - fn default() -> Self { - Self { _marker: PhantomData, } - } -} - -impl FlatcontainerMerger { - const BUFFER_SIZE_BYTES: usize = 8 << 10; - fn chunk_capacity(&self) -> usize { - let size = ::std::mem::size_of::(); - if size == 0 { - Self::BUFFER_SIZE_BYTES - } else if size <= Self::BUFFER_SIZE_BYTES { - Self::BUFFER_SIZE_BYTES / size - } else { - 1 - } - } - - /// Helper to get pre-sized vector from the stash. - #[inline] - fn empty(&self, stash: &mut Vec>) -> FlatStack { - stash.pop().unwrap_or_else(|| FlatStack::with_capacity(self.chunk_capacity())) - } - - /// Helper to return a chunk to the stash. - #[inline] - fn recycle(&self, mut chunk: FlatStack, stash: &mut Vec>) { - // TODO: Should we limit the size of `stash`? - if chunk.capacity() == self.chunk_capacity() { - chunk.clear(); - stash.push(chunk); - } - } -} - -/// Behavior to dissect items of chunks in the merge batcher -pub trait MergerChunk: Region { - /// The data portion of the update - type Data<'a>: Ord where Self: 'a; - /// The time of the update - type Time<'a>: Ord where Self: 'a; - /// The owned time type. - type TimeOwned; - /// The diff of the update - type Diff<'a> where Self: 'a; - /// The owned diff type. - type DiffOwned; - - /// Split a read item into its constituents. Must be cheap. - fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Data<'a>, Self::Time<'a>, Self::Diff<'a>); -} - -impl MergerChunk for TupleABCRegion -where - D: Region, - for<'a> D::ReadItem<'a>: Ord, - T: Region, - for<'a> T::ReadItem<'a>: Ord, - R: Region, -{ - type Data<'a> = D::ReadItem<'a> where Self: 'a; - type Time<'a> = T::ReadItem<'a> where Self: 'a; - type TimeOwned = T::Owned; - type Diff<'a> = R::ReadItem<'a> where Self: 'a; - type DiffOwned = R::Owned; - - fn into_parts<'a>((data, time, diff): Self::ReadItem<'a>) -> (Self::Data<'a>, Self::Time<'a>, Self::Diff<'a>) { - (data, time, diff) - } -} - -impl Merger for FlatcontainerMerger -where - for<'a> MC: MergerChunk + Clone + 'static - + ReserveItems<::ReadItem<'a>> - + Push<::ReadItem<'a>> - + Push<(MC::Data<'a>, MC::Time<'a>, &'a MC::DiffOwned)> - + Push<(MC::Data<'a>, MC::Time<'a>, MC::Diff<'a>)>, - for<'a> MC::Time<'a>: PartialOrder + Copy + IntoOwned<'a, Owned=MC::TimeOwned>, - for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>, - for<'a> MC::TimeOwned: Ord + PartialOrder + PartialOrder> + Data, - for<'a> MC::DiffOwned: Default + Semigroup + Semigroup> + Data, -{ - type Time = MC::TimeOwned; - type Chunk = FlatStack; - - fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { - let mut list1 = list1.into_iter(); - let mut list2 = list2.into_iter(); - - let mut head1 = >::from(list1.next().unwrap_or_default()); - let mut head2 = >::from(list2.next().unwrap_or_default()); - - let mut result = self.empty(stash); - - let mut diff = MC::DiffOwned::default(); - - // while we have valid data in each input, merge. - while !head1.is_empty() && !head2.is_empty() { - while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { - let cmp = { - let (data1, time1, _diff) = MC::into_parts(head1.peek()); - let (data2, time2, _diff) = MC::into_parts(head2.peek()); - (data1, time1).cmp(&(data2, time2)) - }; - // TODO: The following less/greater branches could plausibly be a good moment for - // `copy_range`, on account of runs of records that might benefit more from a - // `memcpy`. - match cmp { - Ordering::Less => { - result.copy(head1.pop()); - } - Ordering::Greater => { - result.copy(head2.pop()); - } - Ordering::Equal => { - let (data, time1, diff1) = MC::into_parts(head1.pop()); - let (_data, _time2, diff2) = MC::into_parts(head2.pop()); - diff1.clone_onto(&mut diff); - diff.plus_equals(&diff2); - if !diff.is_zero() { - result.copy((data, time1, &diff)); - } - } - } - } - - if result.capacity() == result.len() { - output.push(result); - result = self.empty(stash); - } - - if head1.is_empty() { - self.recycle(head1.done(), stash); - head1 = FlatStackQueue::from(list1.next().unwrap_or_default()); - } - if head2.is_empty() { - self.recycle(head2.done(), stash); - head2 = FlatStackQueue::from(list2.next().unwrap_or_default()); - } - } - - while !head1.is_empty() { - let advance = result.capacity() - result.len(); - let iter = head1.iter().take(advance); - result.reserve_items(iter.clone()); - for item in iter { - result.copy(item); - } - output.push(result); - head1.advance(advance); - result = self.empty(stash); - } - if !result.is_empty() { - output.push(result); - result = self.empty(stash); - } - output.extend(list1); - self.recycle(head1.done(), stash); - - while !head2.is_empty() { - let advance = result.capacity() - result.len(); - let iter = head2.iter().take(advance); - result.reserve_items(iter.clone()); - for item in iter { - result.copy(item); - } - output.push(result); - head2.advance(advance); - result = self.empty(stash); - } - output.extend(list2); - self.recycle(head2.done(), stash); - } - - fn extract( - &mut self, - merged: Vec, - upper: AntichainRef, - frontier: &mut Antichain, - readied: &mut Vec, - kept: &mut Vec, - stash: &mut Vec, - ) { - let mut keep = self.empty(stash); - let mut ready = self.empty(stash); - - for buffer in merged { - for (data, time, diff) in buffer.iter().map(MC::into_parts) { - if upper.less_equal(&time) { - frontier.insert_with(&time, |time| (*time).into_owned()); - if keep.len() == keep.capacity() && !keep.is_empty() { - kept.push(keep); - keep = self.empty(stash); - } - keep.copy((data, time, diff)); - } else { - if ready.len() == ready.capacity() && !ready.is_empty() { - readied.push(ready); - ready = self.empty(stash); - } - ready.copy((data, time, diff)); - } - } - // Recycling buffer. - self.recycle(buffer, stash); - } - // Finish the kept data. - if !keep.is_empty() { - kept.push(keep); - } - if !ready.is_empty() { - readied.push(ready); - } - } - - fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize) { - let (mut size, mut capacity, mut allocations) = (0, 0, 0); - let cb = |siz, cap| { - size += siz; - capacity += cap; - allocations += 1; - }; - chunk.heap_size(cb); - (chunk.len(), size, capacity, allocations) - } -} - -struct FlatStackQueue { - list: FlatStack, - head: usize, -} - -impl Default for FlatStackQueue { - fn default() -> Self { - Self::from(Default::default()) - } -} - -impl FlatStackQueue { - fn pop(&mut self) -> R::ReadItem<'_> { - self.head += 1; - self.list.get(self.head - 1) - } - - fn peek(&self) -> R::ReadItem<'_> { - self.list.get(self.head) - } - - fn from(list: FlatStack) -> Self { - FlatStackQueue { list, head: 0 } - } - - fn done(self) -> FlatStack { - self.list - } - - fn is_empty(&self) -> bool { - self.head >= self.list.len() - } - - /// Return an iterator over the remaining elements. - fn iter(&self) -> impl Iterator> + Clone { - self.list.iter().skip(self.head) - } - - fn advance(&mut self, consumed: usize) { - self.head += consumed; - } -} diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 472293447..f62dcf765 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -41,7 +41,6 @@ pub mod spine_fueled; pub mod merge_batcher; -pub mod merge_batcher_flat; pub mod ord_neu; pub mod rhh; pub mod huffman_container; @@ -58,12 +57,13 @@ pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder; use std::borrow::{ToOwned}; use std::convert::TryInto; +use columnation::Columnation; use serde::{Deserialize, Serialize}; - use timely::Container; -use timely::container::columnation::{Columnation, TimelyStack}; use timely::container::PushInto; use timely::progress::Timestamp; + +use crate::containers::TimelyStack; use crate::lattice::Lattice; use crate::difference::Semigroup; @@ -146,11 +146,6 @@ where type OffsetContainer = OffsetList; } -/// A layout based on flat containers. -pub struct FlatLayout { - phantom: std::marker::PhantomData<(K, V, T, R)>, -} - /// A type with a preferred container. /// /// Examples include types that implement `Clone` who prefer @@ -455,122 +450,16 @@ where } } -mod flatcontainer { - use timely::container::flatcontainer::{FlatStack, IntoOwned, Push, Region}; - use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; - use timely::progress::Timestamp; - - use crate::difference::Semigroup; - use crate::lattice::Lattice; - use crate::trace::implementations::{BatchContainer, BuilderInput, FlatLayout, Layout, OffsetList, Update}; - - impl Update for FlatLayout - where - K: Region, - V: Region, - T: Region, - R: Region, - K::Owned: Ord + Clone + 'static, - V::Owned: Ord + Clone + 'static, - T::Owned: Ord + Clone + Lattice + Timestamp + 'static, - R::Owned: Ord + Semigroup + 'static, - { - type Key = K::Owned; - type Val = V::Owned; - type Time = T::Owned; - type Diff = R::Owned; - } - - impl Layout for FlatLayout - where - K: Region + Push<::Owned> + for<'a> Push<::ReadItem<'a>> + 'static, - V: Region + Push<::Owned> + for<'a> Push<::ReadItem<'a>> + 'static, - T: Region + Push<::Owned> + for<'a> Push<::ReadItem<'a>> + 'static, - R: Region + Push<::Owned> + for<'a> Push<::ReadItem<'a>> + 'static, - K::Owned: Ord + Clone + 'static, - V::Owned: Ord + Clone + 'static, - T::Owned: Ord + Clone + Lattice + Timestamp + 'static, - R::Owned: Ord + Semigroup + 'static, - for<'a> K::ReadItem<'a>: Copy + Ord, - for<'a> V::ReadItem<'a>: Copy + Ord, - for<'a> T::ReadItem<'a>: Copy + Ord, - for<'a> R::ReadItem<'a>: Copy + Ord, - { - type Target = Self; - type KeyContainer = FlatStack; - type ValContainer = FlatStack; - type TimeContainer = FlatStack; - type DiffContainer = FlatStack; - type OffsetContainer = OffsetList; - } - - impl BuilderInput for FlatStack,T,R>> - where - K: Region + Clone + 'static, - V: Region + Clone + 'static, - T: Region + Clone + 'static, - R: Region + Clone + 'static, - for<'a> K::ReadItem<'a>: Copy + Ord, - for<'a> V::ReadItem<'a>: Copy + Ord, - for<'a> T::ReadItem<'a>: Copy + Ord, - for<'a> R::ReadItem<'a>: Copy + Ord, - KBC: BatchContainer, - VBC: BatchContainer, - for<'a> KBC::ReadItem<'a>: PartialEq>, - for<'a> VBC::ReadItem<'a>: PartialEq>, - { - type Key<'a> = K::ReadItem<'a>; - type Val<'a> = V::ReadItem<'a>; - type Time = T::Owned; - type Diff = R::Owned; - - fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { - (key, val, time.into_owned(), diff.into_owned()) - } - - fn key_eq(this: &Self::Key<'_>, other: KBC::ReadItem<'_>) -> bool { - KBC::reborrow(other) == K::reborrow(*this) - } - - fn val_eq(this: &Self::Val<'_>, other: VBC::ReadItem<'_>) -> bool { - VBC::reborrow(other) == V::reborrow(*this) - } - - fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) { - let mut keys = 0; - let mut vals = 0; - let mut upds = 0; - let mut prev_keyval = None; - for link in chain.iter() { - for ((key, val), _, _) in link.iter() { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } else if p_val != val { - vals += 1; - } - } else { - keys += 1; - vals += 1; - } - upds += 1; - prev_keyval = Some((key, val)); - } - } - (keys, vals, upds) - } - } -} - pub use self::containers::{BatchContainer, SliceContainer}; /// Containers for data that resemble `Vec`, with leaner implementations. pub mod containers { - use timely::container::columnation::{Columnation, TimelyStack}; + use columnation::Columnation; use timely::container::PushInto; - use crate::trace::IntoOwned; + + use crate::containers::TimelyStack; + use crate::IntoOwned; /// A general-purpose container resembling `Vec`. pub trait BatchContainer: for<'a> PushInto> + 'static { @@ -699,40 +588,6 @@ pub mod containers { } } - mod flatcontainer { - use timely::container::flatcontainer::{FlatStack, Push, Region}; - use crate::trace::implementations::BatchContainer; - - impl BatchContainer for FlatStack - where - for<'a> R: Region + Push<::ReadItem<'a>> + 'static, - for<'a> R::ReadItem<'a>: Copy + Ord, - { - type Owned = R::Owned; - type ReadItem<'a> = R::ReadItem<'a>; - - fn with_capacity(size: usize) -> Self { - Self::with_capacity(size) - } - - fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { - Self::merge_capacity([cont1, cont2].into_iter()) - } - - fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { - R::reborrow(item) - } - - fn index(&self, index: usize) -> Self::ReadItem<'_> { - self.get(index) - } - - fn len(&self) -> usize { - self.len() - } - } - } - /// A container that accepts slices `[B::Item]`. pub struct SliceContainer { /// Offsets that bound each contained slice. diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index df4e9d68a..09380d27f 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -9,16 +9,14 @@ //! and should consume fewer resources (computation and memory) when it applies. use std::rc::Rc; -use timely::container::columnation::{TimelyStack}; -use timely::container::flatcontainer::{FlatStack, RegionPreference}; -use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; -use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker, VecChunker}; + +use crate::containers::TimelyStack; +use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; -use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger; use crate::trace::rc_blanket_impls::RcBuilder; -use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout}; +use super::{Update, Layout, Vector, TStack, Preferred}; pub use self::val_batch::{OrdValBatch, OrdValBuilder}; pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; @@ -40,24 +38,6 @@ pub type ColValBatcher = MergeBatcher, ColumnationC /// A builder for columnar storage. pub type ColValBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; -/// A trace implementation backed by flatcontainer storage. -pub type FlatValSpine = Spine>>; -/// A batcher for flatcontainer storage. -pub type FlatValBatcher = MergeBatcher>, FlatcontainerMerger>; -/// A builder for flatcontainer storage. -pub type FlatValBuilder = RcBuilder>>; - - -/// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout. -pub type FlatValSpineDefault = FlatValSpine< - FlatLayout<::Region, ::Region, ::Region, ::Region>, ->; -/// A batcher for flatcontainer storage, using [`FlatLayout`] as the layout. -pub type FlatValBatcherDefault = FlatValBatcher::Region, ::Region>, ::Region, ::Region>, C>; -/// A builder for flatcontainer storage, using [`FlatLayout`] as the layout. -pub type FlatValBuilderDefault = FlatValBuilder::Region, ::Region, ::Region, ::Region>, TupleABCRegion::Region, ::Region>, ::Region, ::Region>>; - - /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine>>>; /// A batcher for ordered lists. @@ -75,22 +55,6 @@ pub type ColKeyBatcher = MergeBatcher, ColumnationChu /// A builder for columnar storage pub type ColKeyBuilder = RcBuilder, TimelyStack<((K,()),T,R)>>>; -/// A trace implementation backed by flatcontainer storage. -pub type FlatKeySpine = Spine>>; -/// A batcher for flatcontainer storage. -pub type FlatKeyBatcher = MergeBatcher>, FlatcontainerMerger>; -/// A builder for flatcontainer storage. -pub type FlatKeyBuilder = RcBuilder>>; - -/// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout. -pub type FlatKeySpineDefault = FlatKeySpine< - FlatLayout<::Region, <() as RegionPreference>::Region, ::Region, ::Region>, ->; -/// A batcher for flatcontainer storage, using [`FlatLayout`] as the layout. -pub type FlatKeyBatcherDefault = FlatValBatcher::Region, <() as RegionPreference>::Region>, ::Region, ::Region>, C>; -/// A builder for flatcontainer storage, using [`FlatLayout`] as the layout. -pub type FlatKeyBuilderDefault = FlatKeyBuilder::Region, <() as RegionPreference>::Region, ::Region, ::Region>, TupleABCRegion::Region, <() as RegionPreference>::Region>, ::Region, ::Region>>; - /// A trace implementation backed by columnar storage. pub type PreferredSpine = Spine>>>; /// A batcher for columnar storage. @@ -111,7 +75,7 @@ mod val_batch { use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::trace::cursor::IntoOwned; + use crate::IntoOwned; use super::{Layout, Update}; @@ -714,7 +678,7 @@ mod key_batch { use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::trace::cursor::IntoOwned; + use crate::IntoOwned; use super::{Layout, Update}; diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index d2160b0b6..d1197bf90 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -9,9 +9,9 @@ use std::rc::Rc; use std::cmp::Ordering; use serde::{Deserialize, Serialize}; -use timely::container::columnation::TimelyStack; use crate::Hashable; +use crate::containers::TimelyStack; use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; use crate::trace::implementations::spine_fueled::Spine; @@ -94,7 +94,7 @@ mod val_batch { use crate::hashable::Hashable; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::trace::cursor::IntoOwned; + use crate::IntoOwned; use super::{Layout, Update, HashOrdered}; diff --git a/src/trace/mod.rs b/src/trace/mod.rs index e8c28a891..c736518c4 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -16,10 +16,9 @@ use timely::progress::{Antichain, frontier::AntichainRef}; use timely::progress::Timestamp; use crate::logging::Logger; -use crate::trace::cursor::IntoOwned; use crate::difference::Semigroup; +use crate::IntoOwned; use crate::lattice::Lattice; -// use ::difference::Semigroup; pub use self::cursor::Cursor; pub use self::description::Description; diff --git a/src/trace/wrappers/enter.rs b/src/trace/wrappers/enter.rs index 4f8280c70..a4e0b5ca3 100644 --- a/src/trace/wrappers/enter.rs +++ b/src/trace/wrappers/enter.rs @@ -186,7 +186,7 @@ where #[inline] fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { - use crate::trace::cursor::IntoOwned; + use crate::IntoOwned; self.cursor.map_times(storage, |time, diff| { logic(&TInner::to_inner(time.into_owned()), diff) }) @@ -240,7 +240,7 @@ where #[inline] fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { - use crate::trace::cursor::IntoOwned; + use crate::IntoOwned; self.cursor.map_times(&storage.batch, |time, diff| { logic(&TInner::to_inner(time.into_owned()), diff) }) diff --git a/src/trace/wrappers/freeze.rs b/src/trace/wrappers/freeze.rs index 201d8dc92..6a39fa0b0 100644 --- a/src/trace/wrappers/freeze.rs +++ b/src/trace/wrappers/freeze.rs @@ -26,7 +26,7 @@ use timely::progress::frontier::AntichainRef; use crate::operators::arrange::Arranged; use crate::trace::{TraceReader, BatchReader, Description}; use crate::trace::cursor::Cursor; -use crate::trace::cursor::IntoOwned; +use crate::IntoOwned; /// Freezes updates to an arrangement using a supplied function. /// diff --git a/src/trace/wrappers/frontier.rs b/src/trace/wrappers/frontier.rs index 8379b8bf6..34f4da143 100644 --- a/src/trace/wrappers/frontier.rs +++ b/src/trace/wrappers/frontier.rs @@ -149,7 +149,7 @@ impl Cursor for CursorFrontier { let until = self.until.borrow(); let mut temp: C::Time = ::minimum(); self.cursor.map_times(storage, |time, diff| { - use crate::trace::cursor::IntoOwned; + use crate::IntoOwned; time.clone_onto(&mut temp); temp.advance_by(since); if !until.less_equal(&temp) { @@ -212,7 +212,7 @@ where let until = self.until.borrow(); let mut temp: C::Time = ::minimum(); self.cursor.map_times(&storage.batch, |time, diff| { - use crate::trace::cursor::IntoOwned; + use crate::IntoOwned; time.clone_onto(&mut temp); temp.advance_by(since); if !until.less_equal(&temp) { diff --git a/tests/bfs.rs b/tests/bfs.rs index cc5d8f619..70d47509d 100644 --- a/tests/bfs.rs +++ b/tests/bfs.rs @@ -1,23 +1,18 @@ -use std::fmt::Debug; use rand::{Rng, SeedableRng, StdRng}; use std::sync::{Arc, Mutex}; -use timely::{Config, PartialOrder}; -use timely::container::flatcontainer::{MirrorRegion, Push, Region, RegionPreference, ReserveItems}; +use timely::Config; use timely::dataflow::*; use timely::dataflow::operators::Capture; use timely::dataflow::operators::capture::Extract; -use timely::order::{FlatProductRegion, Product}; use differential_dataflow::input::Input; use differential_dataflow::Collection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; -use differential_dataflow::operators::arrange::Arrange; -use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeyBuilderDefault, FlatKeySpineDefault, FlatValBatcherDefault, FlatValBuilderDefault, FlatValSpineDefault}; type Node = usize; type Edge = (Node, Node); @@ -47,7 +42,6 @@ fn test_sizes(nodes: usize, edges: usize, rounds: usize, threads: usize) { let mut results = [ bfs_sequential(root_list.clone(), edge_list.clone()), bfs_differential(root_list.clone(), edge_list.clone(), Config::process(threads)), - bfs_differential_flat(root_list.clone(), edge_list.clone(), Config::process(threads)), ]; for results in results.iter_mut() { @@ -225,105 +219,3 @@ where G::Timestamp: Lattice+Ord { .reduce(|_, s, t| t.push((*s[0].0, 1))) }) } - -fn bfs_differential_flat( - roots_list: Vec<(usize, usize, isize)>, - edges_list: Vec<((usize, usize), usize, isize)>, - config: Config, -) -> Vec<((usize, usize), usize, isize)> { - let (send, recv) = std::sync::mpsc::channel(); - let send = Arc::new(Mutex::new(send)); - - timely::execute(config, move |worker| { - let mut roots_list = roots_list.clone(); - let mut edges_list = edges_list.clone(); - - // define BFS dataflow; return handles to roots and edges inputs - let (mut roots, mut edges) = worker.dataflow(|scope| { - let send = send.lock().unwrap().clone(); - - let (root_input, roots) = scope.new_collection(); - let (edge_input, edges) = scope.new_collection(); - - let c = bfs_flat(&edges, &roots).map(|(_, dist)| (dist, ())); - let arranged = c.arrange::>, FlatKeyBuilderDefault, FlatKeySpineDefault>(); - type Bu = FlatValBuilderDefault; - type T2 = FlatValSpineDefault; - let reduced = arranged.reduce_abelian::<_, _, _, Bu, T2>("Count", |_k, s, t| { - t.push((s[0].1.clone(), isize::from(1i8))) - }); - reduced - .as_collection(|k, c| (k, c as usize)) - .inner - .capture_into(send); - - (root_input, edge_input) - }); - - // sort by decreasing insertion time. - roots_list.sort_by(|x, y| y.1.cmp(&x.1)); - edges_list.sort_by(|x, y| y.1.cmp(&x.1)); - - let mut round = 0; - while roots_list.len() > 0 || edges_list.len() > 0 { - while roots_list.last().map(|x| x.1) == Some(round) { - let (node, _time, diff) = roots_list.pop().unwrap(); - roots.update(node, diff); - } - while edges_list.last().map(|x| x.1) == Some(round) { - let ((src, dst), _time, diff) = edges_list.pop().unwrap(); - edges.update((src, dst), diff); - } - - round += 1; - roots.advance_to(round); - edges.advance_to(round); - } - }) - .unwrap(); - - recv.extract() - .into_iter() - .flat_map(|(_, list)| { - list.into_iter() - .map(|((dst, cnt), time, diff)| ((dst, cnt), time, diff)) - }) - .collect() -} - -// returns pairs (n, s) indicating node n can be reached from a root in s steps. -fn bfs_flat( - edges: &Collection, - roots: &Collection, -) -> Collection -where - G::Timestamp: Lattice + Ord + RegionPreference, - for<'a> G::Timestamp: PartialOrder<<::Region as Region>::ReadItem<'a>>, - ::Region: Region + Push, - for<'a> as RegionPreference>::Region: Region> + Push<< as RegionPreference>::Region as Region>::ReadItem<'a>>, - ::Region: Clone + Ord, - for<'a> FlatProductRegion<::Region, MirrorRegion>: Push<&'a Product>, - for<'a> ::Region, MirrorRegion> as Region>::ReadItem<'a>: Copy + Ord + Debug, - Product: for<'a> PartialOrder<< as RegionPreference>::Region as Region>::ReadItem<'a>>, - for<'a> < as RegionPreference>::Region as Region>::ReadItem<'a>: PartialOrder>, - for<'a> as RegionPreference>::Region: ReserveItems<< as RegionPreference>::Region as Region>::ReadItem<'a>>, -{ - // initialize roots as reaching themselves at distance 0 - let nodes = roots.map(|x| (x, 0)); - - // repeatedly update minimal distances each node can be reached from each root - nodes.iterate(|inner| { - let edges = edges.enter(&inner.scope()); - let nodes = nodes.enter(&inner.scope()); - - type Batcher = FlatValBatcherDefault>; - type Builder = FlatValBuilderDefault; - type Spine = FlatValSpineDefault; - let arranged1 = inner.arrange::>, Builder>, Spine>>(); - let arranged2 = edges.arrange::>, Builder>, Spine>>(); - arranged1 - .join_core(&arranged2, move |_k, l, d| Some((d, l + 1))) - .concat(&nodes) - .reduce(|_, s, t| t.push((*s[0].0, 1))) - }) -}