From 0600f1676fd084f48242875d633a134dc21dec7a Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 20 Feb 2025 09:29:10 +0100 Subject: [PATCH] Separate containers crate Signed-off-by: Moritz Hoffmann --- Cargo.toml | 8 +- containers/Cargo.toml | 14 ++ .../examples/columnar.rs | 21 +-- .../examples/spines.rs | 6 +- .../src/columnation.rs | 79 +++++++++- containers/src/columnation/chunker.rs | 120 +++++++++++++++ containers/src/columnation/merge_batcher.rs | 138 +++++++++++++++++ .../src}/huffman_container.rs | 23 +-- containers/src/lib.rs | 60 ++++++++ .../implementations => containers/src}/rhh.rs | 43 ++---- differential-dataflow/Cargo.toml | 3 +- differential-dataflow/src/lib.rs | 1 - .../src/trace/implementations/chunker.rs | 113 -------------- .../trace/implementations/merge_batcher.rs | 89 +---------- .../src/trace/implementations/mod.rs | 140 ------------------ .../src/trace/implementations/ord_neu.rs | 38 +---- 16 files changed, 452 insertions(+), 444 deletions(-) create mode 100644 containers/Cargo.toml rename {differential-dataflow => containers}/examples/columnar.rs (97%) rename {differential-dataflow => containers}/examples/spines.rs (92%) rename differential-dataflow/src/containers.rs => containers/src/columnation.rs (77%) create mode 100644 containers/src/columnation/chunker.rs create mode 100644 containers/src/columnation/merge_batcher.rs rename {differential-dataflow/src/trace/implementations => containers/src}/huffman_container.rs (98%) create mode 100644 containers/src/lib.rs rename {differential-dataflow/src/trace/implementations => containers/src}/rhh.rs (97%) diff --git a/Cargo.toml b/Cargo.toml index f455929c6..ee5c1a035 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,10 @@ [workspace] members = [ - "differential-dataflow", # "advent_of_code_2017", + "containers", + "differential-dataflow", "dogsdogsdogs", + "doop", "experiments", "interactive", "server", @@ -10,12 +12,12 @@ members = [ "server/dataflows/neighborhood", "server/dataflows/random_graph", "server/dataflows/reachability", - #"tpchlike", - "doop" + # "tpchlike", ] resolver = "2" [workspace.dependencies] +columnar = { version = "0.4.1", default-features = false } differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.15.2" } timely = { version = "0.21", default-features = false } #timely = { path = "../timely-dataflow/timely/", default-features = false } diff --git a/containers/Cargo.toml b/containers/Cargo.toml new file mode 100644 index 000000000..afcfc27d3 --- /dev/null +++ b/containers/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "differential-containers" +version = "0.1.0" +edition = "2021" + +[dependencies] +columnation = "0.1.0" +differential-dataflow = { workspace = true } +serde = { version = "1.0", features = ["derive"] } +timely = { workspace = true } + +[dev-dependencies] +bytemuck = { default-features = false, version = "1.21.0" } +columnar = { workspace = true } diff --git a/differential-dataflow/examples/columnar.rs b/containers/examples/columnar.rs similarity index 97% rename from differential-dataflow/examples/columnar.rs rename to containers/examples/columnar.rs index 3ab037666..2cc7d15de 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/containers/examples/columnar.rs @@ -1,17 +1,12 @@ //! Wordcount based on `columnar`. -use { - timely::container::{Container, CapacityContainerBuilder}, - timely::dataflow::channels::pact::ExchangeCore, - timely::dataflow::InputHandleCore, - timely::dataflow::ProbeHandle, -}; - - -use differential_dataflow::trace::implementations::ord_neu::ColKeyBuilder; -use differential_dataflow::trace::implementations::ord_neu::ColKeySpine; - +use differential_containers::columnation::{ColKeyBuilder, ColKeySpine, ColMerger, TimelyStack}; use differential_dataflow::operators::arrange::arrangement::arrange_core; +use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; +use timely::container::{Container, CapacityContainerBuilder}; +use timely::dataflow::InputHandleCore; +use timely::dataflow::ProbeHandle; +use timely::dataflow::channels::pact::ExchangeCore; fn main() { @@ -346,10 +341,6 @@ mod builder { } -use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; -use differential_dataflow::trace::implementations::merge_batcher::ColMerger; -use differential_dataflow::containers::TimelyStack; - /// A batcher for columnar storage. pub type Col2ValBatcher = MergeBatcher, batcher::Chunker>, ColMerger<(K,V),T,R>>; pub type Col2KeyBatcher = Col2ValBatcher; diff --git a/differential-dataflow/examples/spines.rs b/containers/examples/spines.rs similarity index 92% rename from differential-dataflow/examples/spines.rs rename to containers/examples/spines.rs index 9d5a82019..a1dee0851 100644 --- a/differential-dataflow/examples/spines.rs +++ b/containers/examples/spines.rs @@ -28,7 +28,7 @@ fn main() { match mode.as_str() { "new" => { - use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine}; + use differential_containers::columnation::{ColKeyBatcher, ColKeyBuilder, ColKeySpine}; let data = data.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); let keys = keys.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) @@ -42,7 +42,7 @@ fn main() { .probe_with(&mut probe); }, "rhh" => { - use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine}; + use differential_containers::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine}; let data = data.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); let keys = keys.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) @@ -50,7 +50,7 @@ fn main() { }, "slc" => { - use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredBuilder, PreferredSpine}; + use differential_containers::{PreferredBatcher, PreferredBuilder, PreferredSpine}; let data = data.map(|x| (x.clone().into_bytes(), x.into_bytes())) diff --git a/differential-dataflow/src/containers.rs b/containers/src/columnation.rs similarity index 77% rename from differential-dataflow/src/containers.rs rename to containers/src/columnation.rs index 84d10017f..e810f1f57 100644 --- a/differential-dataflow/src/containers.rs +++ b/containers/src/columnation.rs @@ -1,10 +1,83 @@ //! A columnar container based on the columnation library. use std::iter::FromIterator; - -pub use columnation::*; +use std::rc::Rc; + +use columnation::{Columnation, Region}; +use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; +use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder, OrdValBatch, OrdValBuilder}; +use differential_dataflow::trace::implementations::spine_fueled::Spine; +use differential_dataflow::trace::implementations::{BatchContainer, Layout, OffsetList, Update}; +use differential_dataflow::trace::rc_blanket_impls::RcBuilder; use timely::container::PushInto; +mod merge_batcher; +mod chunker; + +pub use merge_batcher::ColMerger; +pub use chunker::ColumnationChunker; + +/// A layout based on timely stacks +pub struct TStack { + phantom: std::marker::PhantomData, +} + +impl Layout for TStack +where + U::Key: Columnation, + U::Val: Columnation, + U::Time: Columnation, + U::Diff: Columnation + Ord, +{ + type Target = U; + type KeyContainer = TimelyStack; + type ValContainer = TimelyStack; + type TimeContainer = TimelyStack; + type DiffContainer = TimelyStack; + type OffsetContainer = OffsetList; +} + + +/// A trace implementation backed by columnar storage. +pub type ColValSpine = Spine>>>; +/// A batcher for columnar storage. +pub type ColValBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; +/// A builder for columnar storage. +pub type ColValBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; + + +/// A trace implementation backed by columnar storage. +pub type ColKeySpine = Spine>>>; +/// A batcher for columnar storage +pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>; +/// A builder for columnar storage +pub type ColKeyBuilder = RcBuilder, TimelyStack<((K,()),T,R)>>>; + +// The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now +// be presented with the actual contained type, rather than a type that borrows into it. +impl BatchContainer for TimelyStack { + type Owned = T; + type ReadItem<'a> = &'a T; + + fn with_capacity(size: usize) -> Self { + Self::with_capacity(size) + } + + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + let mut new = Self::default(); + new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2))); + new + } + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } + fn index(&self, index: usize) -> Self::ReadItem<'_> { + &self[index] + } + fn len(&self) -> usize { + self[..].len() + } +} + + /// An append-only vector that store records as columns. /// /// This container maintains elements that might conventionally own @@ -274,7 +347,7 @@ mod container { use timely::Container; use timely::container::SizableContainer; - use crate::containers::TimelyStack; + use crate::columnation::TimelyStack; impl Container for TimelyStack { type ItemRef<'a> = &'a T where Self: 'a; diff --git a/containers/src/columnation/chunker.rs b/containers/src/columnation/chunker.rs new file mode 100644 index 000000000..97a45b42e --- /dev/null +++ b/containers/src/columnation/chunker.rs @@ -0,0 +1,120 @@ +use std::collections::VecDeque; +use columnation::Columnation; +use timely::container::{ContainerBuilder, PushInto}; +use differential_dataflow::consolidation::consolidate_updates; +use differential_dataflow::difference::Semigroup; + +use crate::columnation::TimelyStack; + +/// Chunk a stream of vectors into chains of vectors. +pub struct ColumnationChunker { + pending: Vec, + ready: VecDeque>, + empty: Option>, +} + + +impl Default for ColumnationChunker { + fn default() -> Self { + Self { + pending: Vec::default(), + ready: VecDeque::default(), + empty: None, + } + } +} + +impl ColumnationChunker<(D, T, R)> +where + D: Columnation + Ord, + T: Columnation + Ord, + R: Columnation + Semigroup, +{ + const BUFFER_SIZE_BYTES: usize = 64 << 10; + fn chunk_capacity() -> usize { + let size = ::std::mem::size_of::<(D, T, R)>(); + if size == 0 { + Self::BUFFER_SIZE_BYTES + } else if size <= Self::BUFFER_SIZE_BYTES { + Self::BUFFER_SIZE_BYTES / size + } else { + 1 + } + } + + /// Form chunks out of pending data, if needed. This function is meant to be applied to + /// potentially full buffers, and ensures that if the buffer was full when called it is at most + /// half full when the function returns. + /// + /// `form_chunk` does the following: + /// * If pending is full, consolidate. + /// * If after consolidation it's more than half full, peel off chunks, + /// leaving behind any partial chunk in pending. + fn form_chunk(&mut self) { + consolidate_updates(&mut self.pending); + if self.pending.len() >= Self::chunk_capacity() { + while self.pending.len() > Self::chunk_capacity() { + let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity()); + for item in self.pending.drain(..chunk.capacity()) { + chunk.copy(&item); + } + self.ready.push_back(chunk); + } + } + } +} + +impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)> +where + D: Columnation + Ord + Clone, + T: Columnation + Ord + Clone, + R: Columnation + Semigroup + Clone, +{ + fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) { + // Ensure `self.pending` has the desired capacity. We should never have a larger capacity + // because we don't write more than capacity elements into the buffer. + if self.pending.capacity() < Self::chunk_capacity() * 2 { + self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len()); + } + + let mut drain = container.drain(..).peekable(); + while drain.peek().is_some() { + self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len())); + if self.pending.len() == self.pending.capacity() { + self.form_chunk(); + } + } + } +} + +impl ContainerBuilder for ColumnationChunker<(D, T, R)> +where + D: Columnation + Ord + Clone + 'static, + T: Columnation + Ord + Clone + 'static, + R: Columnation + Semigroup + Clone + 'static, +{ + type Container = TimelyStack<(D,T,R)>; + + fn extract(&mut self) -> Option<&mut Self::Container> { + if let Some(ready) = self.ready.pop_front() { + self.empty = Some(ready); + self.empty.as_mut() + } else { + None + } + } + + fn finish(&mut self) -> Option<&mut Self::Container> { + consolidate_updates(&mut self.pending); + while !self.pending.is_empty() { + let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity()); + for item in self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())) { + chunk.copy(&item); + } + self.ready.push_back(chunk); + } + self.empty = self.ready.pop_front(); + self.empty.as_mut() + } +} + diff --git a/containers/src/columnation/merge_batcher.rs b/containers/src/columnation/merge_batcher.rs new file mode 100644 index 000000000..5f4716109 --- /dev/null +++ b/containers/src/columnation/merge_batcher.rs @@ -0,0 +1,138 @@ +//! Implementations of `ContainerQueue` and `MergerChunk` for `TimelyStack` containers (columnation). + +use timely::progress::{Antichain, frontier::AntichainRef, Timestamp}; +use columnation::Columnation; +use differential_dataflow::difference::Semigroup; +use differential_dataflow::lattice::Lattice; +use differential_dataflow::trace::implementations::{BatchContainer, BuilderInput}; +use differential_dataflow::trace::implementations::merge_batcher::container::{ContainerMerger, ContainerQueue, MergerChunk}; + +use crate::columnation::TimelyStack; + +/// A `Merger` implementation backed by `TimelyStack` containers (columnation). +pub type ColMerger = ContainerMerger,TimelyStackQueue<(D, T, R)>>; + +/// TODO +pub struct TimelyStackQueue { + list: TimelyStack, + head: usize, +} + +impl ContainerQueue> for TimelyStackQueue<(D, T, R)> { + fn next_or_alloc(&mut self) -> Result<&(D, T, R), TimelyStack<(D, T, R)>> { + if self.is_empty() { + Err(std::mem::take(&mut self.list)) + } + else { + Ok(self.pop()) + } + } + fn is_empty(&self) -> bool { + self.head == self.list[..].len() + } + fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering { + let (data1, time1, _) = self.peek(); + let (data2, time2, _) = other.peek(); + (data1, time1).cmp(&(data2, time2)) + } + fn from(list: TimelyStack<(D, T, R)>) -> Self { + TimelyStackQueue { list, head: 0 } + } +} + +impl TimelyStackQueue { + fn pop(&mut self) -> &T { + self.head += 1; + &self.list[self.head - 1] + } + + fn peek(&self) -> &T { + &self.list[self.head] + } +} + +impl MergerChunk for TimelyStack<(D, T, R)> { + type TimeOwned = T; + type DiffOwned = R; + + fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool { + if upper.less_equal(time) { + frontier.insert_with(&time, |time| time.clone()); + true + } + else { false } + } + fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) { + let (data, time, diff1) = item1; + let (_data, _time, diff2) = item2; + stash.clone_from(diff1); + stash.plus_equals(&diff2); + if !stash.is_zero() { + self.copy_destructured(data, time, stash); + } + } + fn account(&self) -> (usize, usize, usize, usize) { + let (mut size, mut capacity, mut allocations) = (0, 0, 0); + let cb = |siz, cap| { + size += siz; + capacity += cap; + allocations += 1; + }; + self.heap_size(cb); + (self.len(), size, capacity, allocations) + } +} + +impl BuilderInput for TimelyStack<((K::Owned, V::Owned), T, R)> +where + K: BatchContainer, + for<'a> K::ReadItem<'a>: PartialEq<&'a K::Owned>, + K::Owned: Ord + Columnation + Clone + 'static, + V: BatchContainer, + for<'a> V::ReadItem<'a>: PartialEq<&'a V::Owned>, + V::Owned: Ord + Columnation + Clone + 'static, + T: Timestamp + Lattice + Columnation + Clone + 'static, + R: Ord + Clone + Semigroup + Columnation + 'static, +{ + type Key<'a> = &'a K::Owned; + type Val<'a> = &'a V::Owned; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time.clone(), diff.clone()) + } + + fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool { + K::reborrow(other) == *this + } + + fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool { + V::reborrow(other) == *this + } + + fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) { + let mut keys = 0; + let mut vals = 0; + let mut upds = 0; + let mut prev_keyval = None; + for link in chain.iter() { + for ((key, val), _, _) in link.iter() { + if let Some((p_key, p_val)) = prev_keyval { + if p_key != key { + keys += 1; + vals += 1; + } else if p_val != val { + vals += 1; + } + } else { + keys += 1; + vals += 1; + } + upds += 1; + prev_keyval = Some((key, val)); + } + } + (keys, vals, upds) + } +} diff --git a/differential-dataflow/src/trace/implementations/huffman_container.rs b/containers/src/huffman_container.rs similarity index 98% rename from differential-dataflow/src/trace/implementations/huffman_container.rs rename to containers/src/huffman_container.rs index cd8bbbb09..dffe8a18b 100644 --- a/differential-dataflow/src/trace/implementations/huffman_container.rs +++ b/containers/src/huffman_container.rs @@ -3,7 +3,7 @@ use std::collections::BTreeMap; use timely::container::PushInto; -use crate::trace::implementations::{BatchContainer, OffsetList}; +use differential_dataflow::trace::implementations::{BatchContainer, OffsetList}; use self::wrapper::Wrapped; use self::encoded::Encoded; @@ -41,8 +41,8 @@ impl PushInto> for HuffmanContainer { bytes.extend(huffman.encode(item.iter())); self.offsets.push(bytes.len()); }, - Err(raw) => { - raw.extend(item); + Err(raw) => { + raw.extend(item); self.offsets.push(raw.len()); } } @@ -150,7 +150,8 @@ impl Default for HuffmanContainer { mod wrapper { - use crate::IntoOwned; + use differential_dataflow::IntoOwned; + use super::Encoded; pub struct Wrapped<'a, B: Ord> { @@ -257,7 +258,7 @@ mod huffman { use std::collections::BTreeMap; use std::convert::TryInto; - + use self::decoder::Decoder; use self::encoder::Encoder; @@ -284,7 +285,7 @@ mod huffman { } /// Decodes the provided bytes as a sequence of symbols. - pub fn decode(&self, bytes: I) -> Decoder<'_, T, I::IntoIter> + pub fn decode(&self, bytes: I) -> Decoder<'_, T, I::IntoIter> where I: IntoIterator { @@ -320,7 +321,7 @@ mod huffman { while let Some((node, level)) = todo.pop() { match node { Node::Leaf(sym) => { levels.push((level, sym)); }, - Node::Fork(l,r) => { + Node::Fork(l,r) => { todo.push((&tree[*l], level + 1)); todo.push((&tree[*r], level + 1)); }, @@ -348,13 +349,13 @@ mod huffman { } } - Huffman { + Huffman { encode, decode, } } - /// Inserts a symbol, and + /// Inserts a symbol, and fn insert_decode(map: &mut [Decode; 256], symbol: &T, bits: usize, code: u64) where T: Clone { let byte: u8 = (code >> 56).try_into().unwrap(); if bits <= 8 { @@ -379,7 +380,7 @@ mod huffman { Fork(usize, usize), } - /// Decoder + /// Decoder #[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Default)] pub enum Decode { /// An as-yet unfilled slot. @@ -395,7 +396,7 @@ mod huffman { /// Tests to see if the map contains any invalid values. /// /// A correctly initialized map will have no invalid values. - /// A map with invalid values will be unable to decode some + /// A map with invalid values will be unable to decode some /// input byte sequences. fn any_void(&self) -> bool { match self { diff --git a/containers/src/lib.rs b/containers/src/lib.rs new file mode 100644 index 000000000..e5b661a6c --- /dev/null +++ b/containers/src/lib.rs @@ -0,0 +1,60 @@ +//! Additional containers supported in Differential Dataflow. + +use std::rc::Rc; +use differential_dataflow::difference::Semigroup; +use differential_dataflow::lattice::Lattice; +use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder}; +use differential_dataflow::trace::implementations::spine_fueled::Spine; +use differential_dataflow::trace::implementations::{Layout, OffsetList, PreferredContainer, Update}; +use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; +use differential_dataflow::trace::rc_blanket_impls::RcBuilder; + +use crate::columnation::{ColMerger, ColumnationChunker, TimelyStack}; + +pub mod columnation; +pub mod huffman_container; +pub mod rhh; + +/// A trace implementation backed by columnar storage. +pub type PreferredSpine = Spine>>>; +/// A batcher for columnar storage. +pub type PreferredBatcher = MergeBatcher::Owned,::Owned),T,R)>, ColumnationChunker<((::Owned,::Owned),T,R)>, ColMerger<(::Owned,::Owned),T,R>>; +/// A builder for columnar storage. +pub type PreferredBuilder = RcBuilder, TimelyStack<((::Owned,::Owned),T,R)>>>; + +/// An update and layout description based on preferred containers. +pub struct Preferred { + phantom: std::marker::PhantomData<(Box, Box, T, D)>, +} + +impl Update for Preferred +where + K: ToOwned + ?Sized, + K::Owned: Ord+Clone+'static, + V: ToOwned + ?Sized, + V::Owned: Ord+Clone+'static, + T: Ord+Clone+Lattice+timely::progress::Timestamp, + R: Ord+Clone+Semigroup+'static, +{ + type Key = K::Owned; + type Val = V::Owned; + type Time = T; + type Diff = R; +} + +impl Layout for Preferred +where + K: Ord+ToOwned+PreferredContainer + ?Sized, + K::Owned: Ord+Clone+'static, + V: Ord+ToOwned+PreferredContainer + ?Sized, + V::Owned: Ord+Clone+'static, + T: Ord+Clone+Lattice+timely::progress::Timestamp, + D: Ord+Clone+Semigroup+'static, +{ + type Target = Preferred; + type KeyContainer = K::Container; + type ValContainer = V::Container; + type TimeContainer = Vec; + type DiffContainer = Vec; + type OffsetContainer = OffsetList; +} diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/containers/src/rhh.rs similarity index 97% rename from differential-dataflow/src/trace/implementations/rhh.rs rename to containers/src/rhh.rs index 48ce98899..aa03f10ac 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/containers/src/rhh.rs @@ -7,19 +7,14 @@ use std::rc::Rc; use std::cmp::Ordering; - use serde::{Deserialize, Serialize}; - -use crate::Hashable; -use crate::containers::TimelyStack; -use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; -use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; -use crate::trace::implementations::spine_fueled::Spine; -use crate::trace::rc_blanket_impls::RcBuilder; - -use super::{Update, Layout, Vector, TStack}; - -use self::val_batch::{RhhValBatch, RhhValBuilder}; +use differential_dataflow::Hashable; +use differential_dataflow::trace::implementations::chunker::VecChunker; +use differential_dataflow::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; +use differential_dataflow::trace::implementations::spine_fueled::Spine; +use differential_dataflow::trace::implementations::Vector; +use differential_dataflow::trace::rc_blanket_impls::RcBuilder; +use crate::rhh::val_batch::{RhhValBatch, RhhValBuilder}; /// A trace implementation using a spine of ordered lists. pub type VecSpine = Spine>>>; @@ -31,13 +26,6 @@ pub type VecBuilder = RcBuilder, Vec< // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; -/// A trace implementation backed by columnar storage. -pub type ColSpine = Spine>>>; -/// A batcher for columnar storage. -pub type ColBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; -/// A builder for columnar storage. -pub type ColBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; - // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; @@ -90,13 +78,11 @@ mod val_batch { use serde::{Deserialize, Serialize}; use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; - - use crate::hashable::Hashable; - use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::IntoOwned; - - use super::{Layout, Update, HashOrdered}; + use differential_dataflow::{Hashable, IntoOwned}; + use differential_dataflow::lattice::Lattice; + use differential_dataflow::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; + use differential_dataflow::trace::implementations::{BatchContainer, BuilderInput, Layout, Update}; + use crate::rhh::HashOrdered; /// Update tuples organized as a Robin Hood Hash map, ordered by `(hash(Key), Key, Val, Time)`. /// @@ -368,7 +354,6 @@ mod val_batch { fn new(batch1: &RhhValBatch, batch2: &RhhValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { assert!(batch1.upper() == batch2.lower()); - use crate::lattice::Lattice; let mut since = batch1.description().since().join(batch2.description().since()); since = since.join(&compaction_frontier.to_owned()); @@ -594,7 +579,6 @@ mod val_batch { let time = source.times.index(i); let diff = source.diffs.index(i); let mut new_time = time.into_owned(); - use crate::lattice::Lattice; new_time.advance_by(self.description.since().borrow()); self.update_stash.push((new_time, diff.into_owned())); } @@ -602,8 +586,7 @@ mod val_batch { /// Consolidates `self.updates_stash` and produces the offset to record, if any. fn consolidate_updates(&mut self) -> Option { - use crate::consolidation; - consolidation::consolidate(&mut self.update_stash); + differential_dataflow::consolidation::consolidate(&mut self.update_stash); if !self.update_stash.is_empty() { // If there is a single element, equal to a just-prior recorded update, // we push nothing and report an unincremented offset to encode this case. diff --git a/differential-dataflow/Cargo.toml b/differential-dataflow/Cargo.toml index 42ab57d51..7b1e68cf0 100644 --- a/differential-dataflow/Cargo.toml +++ b/differential-dataflow/Cargo.toml @@ -20,10 +20,9 @@ indexmap = "2.1" rand="0.4" itertools="^0.13" graph_map = "0.1" -bytemuck = "1.18.0" [dependencies] -columnar = "0.5" +columnar = { workspace = true } columnation = "0.1.0" fnv="1.0.2" paste = "1.0" diff --git a/differential-dataflow/src/lib.rs b/differential-dataflow/src/lib.rs index c67b356cf..d4eb7f9e5 100644 --- a/differential-dataflow/src/lib.rs +++ b/differential-dataflow/src/lib.rs @@ -105,7 +105,6 @@ pub mod collection; pub mod logging; pub mod consolidation; pub mod capture; -pub mod containers; mod into_owned; /// Configuration options for differential dataflow. diff --git a/differential-dataflow/src/trace/implementations/chunker.rs b/differential-dataflow/src/trace/implementations/chunker.rs index 0eef4d779..87bb9b5f3 100644 --- a/differential-dataflow/src/trace/implementations/chunker.rs +++ b/differential-dataflow/src/trace/implementations/chunker.rs @@ -2,11 +2,9 @@ use std::collections::VecDeque; -use columnation::Columnation; use timely::Container; use timely::container::{ContainerBuilder, PushInto, SizableContainer}; -use crate::containers::TimelyStack; use crate::consolidation::{consolidate_updates, ConsolidateLayout}; use crate::difference::Semigroup; @@ -124,117 +122,6 @@ where } } -/// Chunk a stream of vectors into chains of vectors. -pub struct ColumnationChunker { - pending: Vec, - ready: VecDeque>, - empty: Option>, -} - -impl Default for ColumnationChunker { - fn default() -> Self { - Self { - pending: Vec::default(), - ready: VecDeque::default(), - empty: None, - } - } -} - -impl ColumnationChunker<(D, T, R)> -where - D: Columnation + Ord, - T: Columnation + Ord, - R: Columnation + Semigroup, -{ - const BUFFER_SIZE_BYTES: usize = 64 << 10; - fn chunk_capacity() -> usize { - let size = ::std::mem::size_of::<(D, T, R)>(); - if size == 0 { - Self::BUFFER_SIZE_BYTES - } else if size <= Self::BUFFER_SIZE_BYTES { - Self::BUFFER_SIZE_BYTES / size - } else { - 1 - } - } - - /// Form chunks out of pending data, if needed. This function is meant to be applied to - /// potentially full buffers, and ensures that if the buffer was full when called it is at most - /// half full when the function returns. - /// - /// `form_chunk` does the following: - /// * If pending is full, consolidate. - /// * If after consolidation it's more than half full, peel off chunks, - /// leaving behind any partial chunk in pending. - fn form_chunk(&mut self) { - consolidate_updates(&mut self.pending); - if self.pending.len() >= Self::chunk_capacity() { - while self.pending.len() > Self::chunk_capacity() { - let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity()); - for item in self.pending.drain(..chunk.capacity()) { - chunk.copy(&item); - } - self.ready.push_back(chunk); - } - } - } -} - -impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)> -where - D: Columnation + Ord + Clone, - T: Columnation + Ord + Clone, - R: Columnation + Semigroup + Clone, -{ - fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) { - // Ensure `self.pending` has the desired capacity. We should never have a larger capacity - // because we don't write more than capacity elements into the buffer. - if self.pending.capacity() < Self::chunk_capacity() * 2 { - self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len()); - } - - let mut drain = container.drain(..).peekable(); - while drain.peek().is_some() { - self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len())); - if self.pending.len() == self.pending.capacity() { - self.form_chunk(); - } - } - } -} - -impl ContainerBuilder for ColumnationChunker<(D, T, R)> -where - D: Columnation + Ord + Clone + 'static, - T: Columnation + Ord + Clone + 'static, - R: Columnation + Semigroup + Clone + 'static, -{ - type Container = TimelyStack<(D,T,R)>; - - fn extract(&mut self) -> Option<&mut Self::Container> { - if let Some(ready) = self.ready.pop_front() { - self.empty = Some(ready); - self.empty.as_mut() - } else { - None - } - } - - fn finish(&mut self) -> Option<&mut Self::Container> { - consolidate_updates(&mut self.pending); - while !self.pending.is_empty() { - let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity()); - for item in self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())) { - chunk.copy(&item); - } - self.ready.push_back(chunk); - } - self.empty = self.ready.pop_front(); - self.empty.as_mut() - } -} - /// Chunk a stream of containers into chains of vectors. pub struct ContainerChunker { pending: Output, diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 4de943594..924392aa7 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -227,7 +227,7 @@ pub trait Merger: Default { fn account(chunk: &Self::Chunk) -> (usize, usize, usize, usize); } -pub use container::{VecMerger, ColMerger}; +pub use container::VecMerger; pub mod container { @@ -520,91 +520,4 @@ pub mod container { } } } - - pub use columnation::ColMerger; - /// Implementations of `ContainerQueue` and `MergerChunk` for `TimelyStack` containers (columnation). - pub mod columnation { - - use timely::progress::{Antichain, frontier::AntichainRef}; - use columnation::Columnation; - - use crate::containers::TimelyStack; - use crate::difference::Semigroup; - - use super::{ContainerQueue, MergerChunk}; - - /// A `Merger` implementation backed by `TimelyStack` containers (columnation). - pub type ColMerger = super::ContainerMerger,TimelyStackQueue<(D, T, R)>>; - - /// TODO - pub struct TimelyStackQueue { - list: TimelyStack, - head: usize, - } - - impl ContainerQueue> for TimelyStackQueue<(D, T, R)> { - fn next_or_alloc(&mut self) -> Result<&(D, T, R), TimelyStack<(D, T, R)>> { - if self.is_empty() { - Err(std::mem::take(&mut self.list)) - } - else { - Ok(self.pop()) - } - } - fn is_empty(&self) -> bool { - self.head == self.list[..].len() - } - fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering { - let (data1, time1, _) = self.peek(); - let (data2, time2, _) = other.peek(); - (data1, time1).cmp(&(data2, time2)) - } - fn from(list: TimelyStack<(D, T, R)>) -> Self { - TimelyStackQueue { list, head: 0 } - } - } - - impl TimelyStackQueue { - fn pop(&mut self) -> &T { - self.head += 1; - &self.list[self.head - 1] - } - - fn peek(&self) -> &T { - &self.list[self.head] - } - } - - impl MergerChunk for TimelyStack<(D, T, R)> { - type TimeOwned = T; - type DiffOwned = R; - - fn time_kept((_, time, _): &Self::Item<'_>, upper: &AntichainRef, frontier: &mut Antichain) -> bool { - if upper.less_equal(time) { - frontier.insert_with(&time, |time| time.clone()); - true - } - else { false } - } - fn push_and_add<'a>(&mut self, item1: Self::Item<'a>, item2: Self::Item<'a>, stash: &mut Self::DiffOwned) { - let (data, time, diff1) = item1; - let (_data, _time, diff2) = item2; - stash.clone_from(diff1); - stash.plus_equals(&diff2); - if !stash.is_zero() { - self.copy_destructured(data, time, stash); - } - } - fn account(&self) -> (usize, usize, usize, usize) { - let (mut size, mut capacity, mut allocations) = (0, 0, 0); - let cb = |siz, cap| { - size += siz; - capacity += cap; - allocations += 1; - }; - self.heap_size(cb); - (self.len(), size, capacity, allocations) - } - } - } } diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index 717c9df68..65b78245e 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -42,8 +42,6 @@ pub mod spine_fueled; pub mod merge_batcher; pub mod ord_neu; -pub mod rhh; -pub mod huffman_container; pub mod chunker; // Opinionated takes on default spines. @@ -57,13 +55,11 @@ pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder; use std::borrow::{ToOwned}; use std::convert::TryInto; -use columnation::Columnation; use serde::{Deserialize, Serialize}; use timely::Container; use timely::container::PushInto; use timely::progress::Timestamp; -use crate::containers::TimelyStack; use crate::lattice::Lattice; use crate::difference::Semigroup; @@ -126,26 +122,6 @@ where type OffsetContainer = OffsetList; } -/// A layout based on timely stacks -pub struct TStack { - phantom: std::marker::PhantomData, -} - -impl Layout for TStack -where - U::Key: Columnation, - U::Val: Columnation, - U::Time: Columnation, - U::Diff: Columnation + Ord, -{ - type Target = U; - type KeyContainer = TimelyStack; - type ValContainer = TimelyStack; - type TimeContainer = TimelyStack; - type DiffContainer = TimelyStack; - type OffsetContainer = OffsetList; -} - /// A type with a preferred container. /// /// Examples include types that implement `Clone` who prefer @@ -162,43 +138,6 @@ impl PreferredContainer for [T] { type Container = SliceContainer; } -/// An update and layout description based on preferred containers. -pub struct Preferred { - phantom: std::marker::PhantomData<(Box, Box, T, D)>, -} - -impl Update for Preferred -where - K: ToOwned + ?Sized, - K::Owned: Ord+Clone+'static, - V: ToOwned + ?Sized, - V::Owned: Ord+Clone+'static, - T: Ord+Clone+Lattice+timely::progress::Timestamp, - R: Ord+Clone+Semigroup+'static, -{ - type Key = K::Owned; - type Val = V::Owned; - type Time = T; - type Diff = R; -} - -impl Layout for Preferred -where - K: Ord+ToOwned+PreferredContainer + ?Sized, - K::Owned: Ord+Clone+'static, - V: Ord+ToOwned+PreferredContainer + ?Sized, - V::Owned: Ord+Clone+'static, - T: Ord+Clone+Lattice+timely::progress::Timestamp, - D: Ord+Clone+Semigroup+'static, -{ - type Target = Preferred; - type KeyContainer = K::Container; - type ValContainer = V::Container; - type TimeContainer = Vec; - type DiffContainer = Vec; - type OffsetContainer = OffsetList; -} - /// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)] pub struct OffsetList { @@ -396,69 +335,14 @@ where } } -impl BuilderInput for TimelyStack<((K::Owned, V::Owned), T, R)> -where - K: BatchContainer, - for<'a> K::ReadItem<'a>: PartialEq<&'a K::Owned>, - K::Owned: Ord + Columnation + Clone + 'static, - V: BatchContainer, - for<'a> V::ReadItem<'a>: PartialEq<&'a V::Owned>, - V::Owned: Ord + Columnation + Clone + 'static, - T: Timestamp + Lattice + Columnation + Clone + 'static, - R: Ord + Clone + Semigroup + Columnation + 'static, -{ - type Key<'a> = &'a K::Owned; - type Val<'a> = &'a V::Owned; - type Time = T; - type Diff = R; - - fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { - (key, val, time.clone(), diff.clone()) - } - - fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool { - K::reborrow(other) == *this - } - - fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool { - V::reborrow(other) == *this - } - - fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) { - let mut keys = 0; - let mut vals = 0; - let mut upds = 0; - let mut prev_keyval = None; - for link in chain.iter() { - for ((key, val), _, _) in link.iter() { - if let Some((p_key, p_val)) = prev_keyval { - if p_key != key { - keys += 1; - vals += 1; - } else if p_val != val { - vals += 1; - } - } else { - keys += 1; - vals += 1; - } - upds += 1; - prev_keyval = Some((key, val)); - } - } - (keys, vals, upds) - } -} pub use self::containers::{BatchContainer, SliceContainer}; /// Containers for data that resemble `Vec`, with leaner implementations. pub mod containers { - use columnation::Columnation; use timely::container::PushInto; - use crate::containers::TimelyStack; use crate::IntoOwned; /// A general-purpose container resembling `Vec`. @@ -576,30 +460,6 @@ pub mod containers { } } - // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now - // be presented with the actual contained type, rather than a type that borrows into it. - impl BatchContainer for TimelyStack { - type Owned = T; - type ReadItem<'a> = &'a T; - - fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } - - fn with_capacity(size: usize) -> Self { - Self::with_capacity(size) - } - fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { - let mut new = Self::default(); - new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2))); - new - } - fn index(&self, index: usize) -> Self::ReadItem<'_> { - &self[index] - } - fn len(&self) -> usize { - self[..].len() - } - } - /// A container that accepts slices `[B::Item]`. pub struct SliceContainer { /// Offsets that bound each contained slice. diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 5035a229c..3798ff7e9 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -10,13 +10,12 @@ use std::rc::Rc; -use crate::containers::TimelyStack; -use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; +use crate::trace::implementations::chunker::VecChunker; use crate::trace::implementations::spine_fueled::Spine; -use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; +use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; use crate::trace::rc_blanket_impls::RcBuilder; -use super::{Update, Layout, Vector, TStack, Preferred}; +use super::{Update, Layout, Vector}; pub use self::val_batch::{OrdValBatch, OrdValBuilder}; pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; @@ -28,16 +27,6 @@ pub type OrdValBatcher = MergeBatcher, VecChunker<( /// A builder using ordered lists. pub type RcOrdValBuilder = RcBuilder, Vec<((K,V),T,R)>>>; -// /// A trace implementation for empty values using a spine of ordered lists. -// pub type OrdKeySpine = Spine>>>; - -/// A trace implementation backed by columnar storage. -pub type ColValSpine = Spine>>>; -/// A batcher for columnar storage. -pub type ColValBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; -/// A builder for columnar storage. -pub type ColValBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; - /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine>>>; /// A batcher for ordered lists. @@ -45,27 +34,6 @@ pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K /// A builder for ordered lists. pub type RcOrdKeyBuilder = RcBuilder, Vec<((K,()),T,R)>>>; -// /// A trace implementation for empty values using a spine of ordered lists. -// pub type OrdKeySpine = Spine>>>; - -/// A trace implementation backed by columnar storage. -pub type ColKeySpine = Spine>>>; -/// A batcher for columnar storage -pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>; -/// A builder for columnar storage -pub type ColKeyBuilder = RcBuilder, TimelyStack<((K,()),T,R)>>>; - -/// A trace implementation backed by columnar storage. -pub type PreferredSpine = Spine>>>; -/// A batcher for columnar storage. -pub type PreferredBatcher = MergeBatcher::Owned,::Owned),T,R)>, ColumnationChunker<((::Owned,::Owned),T,R)>, ColMerger<(::Owned,::Owned),T,R>>; -/// A builder for columnar storage. -pub type PreferredBuilder = RcBuilder, TimelyStack<((::Owned,::Owned),T,R)>>>; - -// /// A trace implementation backed by columnar storage. -// pub type ColKeySpine = Spine>>>; - - /// Types related to forming batches with values. pub mod val_batch {