From b4770354b10b0f5f23af12f67b694ae4a0ed8e64 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Tue, 15 Jul 2025 14:32:40 +0200 Subject: [PATCH] Separate chunker from batcher The chunker was part of the batcher and responsible for transforming input data into the batcher's chain format. Hence, the batcher needed to be aware of its input types, although it would not otherwise use this information. With this change, the chunker is separate of the batcher. This simplifies the logic within the chunker slightly, but most importantly moves the responsibility to form chunks to whoever holds the batcher. In Differential, this is `arrange_core`. It now learns about an input container type and a chunker and uses the chunker to convert input data to chunks of sorted and consolidated data. Signed-off-by: Moritz Hoffmann --- differential-dataflow/examples/columnar.rs | 14 ++-- differential-dataflow/examples/spines.rs | 15 +++-- .../src/operators/arrange/arrangement.rs | 65 ++++++++++++------- .../src/operators/consolidate.rs | 14 ++-- .../trace/implementations/merge_batcher.rs | 51 +++++---------- .../src/trace/implementations/mod.rs | 1 + .../src/trace/implementations/ord_neu.rs | 9 ++- .../src/trace/implementations/rhh.rs | 5 +- differential-dataflow/src/trace/mod.rs | 14 ++-- differential-dataflow/tests/trace.rs | 3 +- experiments/src/bin/deals.rs | 14 ++-- experiments/src/bin/graspan1.rs | 6 +- experiments/src/bin/graspan2.rs | 40 ++++++------ 13 files changed, 126 insertions(+), 125 deletions(-) diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index b5af06ef2..ef4c56871 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -44,8 +44,9 @@ fn main() { let data_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); let keys_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); - let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); - let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); + use crate::batcher::Col2ValChunker; + let data = arrange_core::<_,_,_,Col2ValChunker,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); + let keys = arrange_core::<_,_,_,Col2ValChunker,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); @@ -373,7 +374,8 @@ pub mod batcher { use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; /// A batcher for columnar storage. - pub type Col2ValBatcher = MergeBatcher, Chunker>, merger::ColumnMerger<(K,V),T,R>>; + pub type Col2ValChunker = Chunker>; + pub type Col2ValBatcher = MergeBatcher>; pub type Col2KeyBatcher = Col2ValBatcher; // First draft: build a "chunker" and a "merger". @@ -408,11 +410,11 @@ pub mod batcher { impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker where - D: for<'b> Columnar, + D: Columnar, for<'b> columnar::Ref<'b, D>: Ord, - T: for<'b> Columnar, + T: Columnar, for<'b> columnar::Ref<'b, T>: Ord, - R: for<'b> Columnar + for<'b> Semigroup>, + R: Columnar + for<'b> Semigroup>, for<'b> columnar::Ref<'b, R>: Ord, C2: Container + for<'b, 'c> PushInto<(columnar::Ref<'b, D>, columnar::Ref<'b, T>, &'c R)>, { diff --git a/differential-dataflow/examples/spines.rs b/differential-dataflow/examples/spines.rs index 3fada3d5f..e36623d51 100644 --- a/differential-dataflow/examples/spines.rs +++ b/differential-dataflow/examples/spines.rs @@ -28,23 +28,26 @@ fn main() { match mode.as_str() { "new" => { + use differential_dataflow::trace::implementations::ColumnationChunker; use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine}; - let data = data.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); - let keys = keys.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); + let data = data.arrange::,ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); + let keys = keys.arrange::,ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, "old" => { + use differential_dataflow::trace::implementations::VecChunker; use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine}; - let data = data.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); - let keys = keys.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); + let data = data.arrange::,OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); + let keys = keys.arrange::,OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, "rhh" => { + use differential_dataflow::trace::implementations::VecChunker; use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine}; - let data = data.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); - let keys = keys.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); + let data = data.map(|x| HashWrapper { inner: x }).arrange::,VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); + let keys = keys.map(|x| HashWrapper { inner: x }).arrange::,VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 05cab5328..00b229e79 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -30,7 +30,7 @@ use crate::{Data, ExchangeData, Collection, AsCollection, Hashable}; use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor}; -use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine}; +use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine, VecChunker}; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; @@ -76,7 +76,7 @@ where use ::timely::dataflow::scopes::Child; use ::timely::progress::timestamp::Refines; use timely::Container; -use timely::container::PushInto; +use timely::container::{ContainerBuilder, PushInto}; impl Arranged where @@ -348,20 +348,22 @@ where G: Scope, { /// Arranges updates into a shared trace. - fn arrange(&self) -> Arranged> + fn arrange(&self) -> Arranged> where - Ba: Batcher + 'static, - Bu: Builder, + Chu: ContainerBuilder + for<'a> PushInto<&'a mut C>, + Ba: Batcher + 'static, + Bu: Builder, Tr: Trace + 'static, { - self.arrange_named::("Arrange") + self.arrange_named::("Arrange") } /// Arranges updates into a shared trace, with a supplied name. - fn arrange_named(&self, name: &str) -> Arranged> + fn arrange_named(&self, name: &str) -> Arranged> where - Ba: Batcher + 'static, - Bu: Builder, + Chu: ContainerBuilder + for<'a> PushInto<&'a mut C>, + Ba: Batcher + 'static, + Bu: Builder, Tr: Trace + 'static, ; } @@ -373,14 +375,15 @@ where V: ExchangeData, R: ExchangeData + Semigroup, { - fn arrange_named(&self, name: &str) -> Arranged> + fn arrange_named(&self, name: &str) -> Arranged> where - Ba: Batcher, Time=G::Timestamp> + 'static, - Bu: Builder, + Chu: ContainerBuilder + for<'a> PushInto<&'a mut Vec<((K, V), G::Timestamp, R)>>, + Ba: Batcher + 'static, + Bu: Builder, Tr: Trace + 'static, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name) + arrange_core::<_, _, _, Chu, Ba, Bu, _>(&self.inner, exchange, name) } } @@ -389,12 +392,14 @@ where /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// It uses the supplied parallelization contract to distribute the data, which does not need to /// be consistently by key (though this is the most common). -pub fn arrange_core(stream: &StreamCore, pact: P, name: &str) -> Arranged> +pub fn arrange_core(stream: &StreamCore, pact: P, name: &str) -> Arranged> where G: Scope, - P: ParallelizationContract, - Ba: Batcher + 'static, - Bu: Builder, + P: ParallelizationContract, + C: Container + Clone + 'static, + Chu: ContainerBuilder + for<'a> PushInto<&'a mut C>, + Ba: Batcher + 'static, + Bu: Builder, Tr: Trace+'static, { // The `Arrange` operator is tasked with reacting to an advancing input @@ -443,6 +448,8 @@ where // Initialize to the minimal input frontier. let mut prev_frontier = Antichain::from_elem(::minimum()); + let mut chunker = Chu::default(); + move |input, output| { // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities. @@ -451,7 +458,11 @@ where input.for_each(|cap, data| { capabilities.insert(cap.retain()); - batcher.push_container(data); + chunker.push_into(data); + while let Some(chunk) = chunker.extract() { + let chunk = std::mem::take(chunk); + batcher.push_into(chunk); + } }); // The frontier may have advanced by multiple elements, which is an issue because @@ -481,6 +492,11 @@ where // If there is at least one capability not in advance of the input frontier ... if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) { + while let Some(chunk) = chunker.finish() { + let chunk = std::mem::take(chunk); + batcher.push_into(chunk); + } + let mut upper = Antichain::new(); // re-used allocation for sealing batches. // For each capability not in advance of the input frontier ... @@ -547,14 +563,15 @@ impl Arrange, { - fn arrange_named(&self, name: &str) -> Arranged> + fn arrange_named(&self, name: &str) -> Arranged> where - Ba: Batcher, Time=G::Timestamp> + 'static, - Bu: Builder, + Chu: ContainerBuilder + for<'a> PushInto<&'a mut Vec<((K, ()), G::Timestamp, R)>>, + Ba: Batcher + 'static, + Bu: Builder, Tr: Trace + 'static, { let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); - arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name) + arrange_core::<_,_,_,Chu,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name) } } @@ -587,7 +604,7 @@ where } fn arrange_by_key_named(&self, name: &str) -> Arranged>> { - self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) + self.arrange_named::, ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name) } } @@ -622,6 +639,6 @@ where fn arrange_by_self_named(&self, name: &str) -> Arranged>> { self.map(|k| (k, ())) - .arrange_named::,KeyBuilder<_,_,_>,_>(name) + .arrange_named::, KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name) } } diff --git a/differential-dataflow/src/operators/consolidate.rs b/differential-dataflow/src/operators/consolidate.rs index 00ad3f4b9..1430ecfa2 100644 --- a/differential-dataflow/src/operators/consolidate.rs +++ b/differential-dataflow/src/operators/consolidate.rs @@ -6,6 +6,7 @@ //! underlying system can more clearly see that no work must be done in the later case, and we can //! drop out of, e.g. iterative computations. +use timely::container::{ContainerBuilder, PushInto}; use timely::dataflow::Scope; use crate::{Collection, ExchangeData, Hashable}; @@ -44,22 +45,23 @@ where /// }); /// ``` pub fn consolidate(&self) -> Self { - use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine}; - self.consolidate_named::,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone()) + use crate::trace::implementations::{VecChunker, KeyBatcher, KeyBuilder, KeySpine}; + self.consolidate_named::,KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone()) } /// As `consolidate` but with the ability to name the operator, specify the trace type, /// and provide the function `reify` to produce owned keys and values.. - pub fn consolidate_named(&self, name: &str, reify: F) -> Self + pub fn consolidate_named(&self, name: &str, reify: F) -> Self where - Ba: Batcher, Time=G::Timestamp> + 'static, + Chu: ContainerBuilder + for<'a> PushInto<&'a mut Vec<((D, ()), G::Timestamp, R)>>, + Ba: Batcher + 'static, Tr: for<'a> crate::trace::Trace+'static, - Bu: Builder, + Bu: Builder, F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static, { use crate::operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) - .arrange_named::(name) + .arrange_named::(name) .as_collection(reify) } diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 90a74f662..daf53e174 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -10,23 +10,16 @@ //! Implementations of `MergeBatcher` can be instantiated through the choice of both //! the chunker and the merger, provided their respective output and input types align. -use std::marker::PhantomData; - use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; use timely::Container; -use timely::container::{ContainerBuilder, PushInto}; +use timely::container::PushInto; use crate::logging::{BatcherEvent, Logger}; use crate::trace::{Batcher, Builder, Description}; /// Creates batches from containers of unordered tuples. -/// -/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs, -/// and must produce outputs of type `M::Chunk`. -pub struct MergeBatcher { - /// Transforms input streams to chunks of sorted, consolidated data. - chunker: C, +pub struct MergeBatcher { /// A sequence of power-of-two length lists of sorted, consolidated containers. /// /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]). @@ -43,40 +36,24 @@ pub struct MergeBatcher { logger: Option, /// Timely operator ID. operator_id: usize, - /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present. - _marker: PhantomData, } -impl Batcher for MergeBatcher +impl Batcher for MergeBatcher where - C: ContainerBuilder + Default + for<'a> PushInto<&'a mut Input>, M: Merger, { - type Input = Input; type Time = M::Time; - type Output = M::Chunk; + type Container = M::Chunk; fn new(logger: Option, operator_id: usize) -> Self { Self { logger, operator_id, - chunker: C::default(), merger: M::default(), chains: Vec::new(), stash: Vec::new(), frontier: Antichain::new(), lower: Antichain::from_elem(M::Time::minimum()), - _marker: PhantomData, - } - } - - /// Push a container of data into this merge batcher. Updates the internal chain structure if - /// needed. - fn push_container(&mut self, container: &mut Input) { - self.chunker.push_into(container); - while let Some(chunk) = self.chunker.extract() { - let chunk = std::mem::take(chunk); - self.insert_chain(vec![chunk]); } } @@ -84,13 +61,7 @@ where // in `upper`. All updates must have time greater or equal to the previously used `upper`, // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. - fn seal>(&mut self, upper: Antichain) -> B::Output { - // Finish - while let Some(chunk) = self.chunker.finish() { - let chunk = std::mem::take(chunk); - self.insert_chain(vec![chunk]); - } - + fn seal>(&mut self, upper: Antichain) -> B::Output { // Merge all remaining chains into a single chain. while self.chains.len() > 1 { let list1 = self.chain_pop().unwrap(); @@ -125,8 +96,16 @@ where self.frontier.borrow() } } +impl PushInto for MergeBatcher +where + M: Merger, +{ + fn push_into(&mut self, item: M::Chunk) { + self.insert_chain(vec![item]); + } +} -impl MergeBatcher { +impl MergeBatcher { /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered /// by decreasing length. fn insert_chain(&mut self, chain: Vec) { @@ -190,7 +169,7 @@ impl MergeBatcher { } } -impl Drop for MergeBatcher { +impl Drop for MergeBatcher { fn drop(&mut self) { // Cleanup chain to retract accounting information. while self.chain_pop().is_some() {} diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index c9c15206a..0ab2370c2 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -47,6 +47,7 @@ pub mod huffman_container; pub mod chunker; // Opinionated takes on default spines. +pub use self::chunker::{ColumnationChunker, VecChunker}; pub use self::ord_neu::OrdValSpine as ValSpine; pub use self::ord_neu::OrdValBatcher as ValBatcher; pub use self::ord_neu::RcOrdValBuilder as ValBuilder; diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 218fc612a..6aab8c168 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -11,7 +11,6 @@ use std::rc::Rc; use crate::containers::TimelyStack; -use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; use crate::trace::rc_blanket_impls::RcBuilder; @@ -24,7 +23,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; /// A trace implementation using a spine of ordered lists. pub type OrdValSpine = Spine>>>; /// A batcher using ordered lists. -pub type OrdValBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>; +pub type OrdValBatcher = MergeBatcher>; /// A builder using ordered lists. pub type RcOrdValBuilder = RcBuilder, Vec<((K,V),T,R)>>>; @@ -34,14 +33,14 @@ pub type RcOrdValBuilder = RcBuilder = Spine>>>; /// A batcher for columnar storage. -pub type ColValBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; +pub type ColValBatcher = MergeBatcher>; /// A builder for columnar storage. pub type ColValBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine>>>; /// A batcher for ordered lists. -pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>; +pub type OrdKeyBatcher = MergeBatcher>; /// A builder for ordered lists. pub type RcOrdKeyBuilder = RcBuilder, Vec<((K,()),T,R)>>>; @@ -51,7 +50,7 @@ pub type RcOrdKeyBuilder = RcBuilder /// A trace implementation backed by columnar storage. pub type ColKeySpine = Spine>>>; /// A batcher for columnar storage -pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>; +pub type ColKeyBatcher = MergeBatcher>; /// A builder for columnar storage pub type ColKeyBuilder = RcBuilder, TimelyStack<((K,()),T,R)>>>; diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index 71d20b69d..ee2a1f0d1 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -12,7 +12,6 @@ use serde::{Deserialize, Serialize}; use crate::Hashable; use crate::containers::TimelyStack; -use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::rc_blanket_impls::RcBuilder; @@ -24,7 +23,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; /// A trace implementation using a spine of ordered lists. pub type VecSpine = Spine>>>; /// A batcher for ordered lists. -pub type VecBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>; +pub type VecBatcher = MergeBatcher>; /// A builder for ordered lists. pub type VecBuilder = RcBuilder, Vec<((K,V),T,R)>>>; @@ -34,7 +33,7 @@ pub type VecBuilder = RcBuilder, Vec< /// A trace implementation backed by columnar storage. pub type ColSpine = Spine>>>; /// A batcher for columnar storage. -pub type ColBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; +pub type ColBatcher = MergeBatcher>; /// A builder for columnar storage. pub type ColBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 0197e09c0..7edfa0e9b 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -12,6 +12,7 @@ pub mod description; pub mod implementations; pub mod wrappers; +use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use timely::progress::Timestamp; @@ -299,20 +300,17 @@ pub trait Batch : BatchReader + Sized { fn empty(lower: Antichain, upper: Antichain) -> Self; } -/// Functionality for collecting and batching updates. -pub trait Batcher { - /// Type pushed into the batcher. - type Input; +/// Functionality for collecting and batching updates. Accepts chunks and transforms them into +/// chains of chunks. The chunks have type `Container` and must be sorted. +pub trait Batcher: PushInto { /// Type produced by the batcher. - type Output; + type Container: Default; /// Times at which batches are formed. type Time: Timestamp; /// Allocates a new empty batcher. fn new(logger: Option, operator_id: usize) -> Self; - /// Adds an unordered container of elements to the batcher. - fn push_container(&mut self, batch: &mut Self::Input); /// Returns all updates not greater or equal to an element of `upper`. - fn seal>(&mut self, upper: Antichain) -> B::Output; + fn seal>(&mut self, upper: Antichain) -> B::Output; /// Returns the lower envelope of contained update times. fn frontier(&mut self) -> AntichainRef<'_, Self::Time>; } diff --git a/differential-dataflow/tests/trace.rs b/differential-dataflow/tests/trace.rs index 54f111a7d..75911e5bc 100644 --- a/differential-dataflow/tests/trace.rs +++ b/differential-dataflow/tests/trace.rs @@ -1,3 +1,4 @@ +use timely::container::PushInto; use timely::dataflow::operators::generic::OperatorInfo; use timely::progress::{Antichain, frontier::AntichainRef}; @@ -14,7 +15,7 @@ fn get_trace() -> ValSpine { { let mut batcher = ValBatcher::::new(None, 0); - batcher.push_container(&mut vec![ + batcher.push_into(vec![ ((1, 2), 0, 1), ((2, 3), 1, 1), ((2, 3), 2, -1), diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index f9776d17a..0132f2512 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -6,7 +6,7 @@ use differential_dataflow::input::Input; use differential_dataflow::Collection; use differential_dataflow::operators::*; -use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, KeyBuilder, ValBatcher, ValBuilder}; +use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, KeyBuilder, ValBatcher, ValBuilder, VecChunker}; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::Arrange; @@ -41,7 +41,7 @@ fn main() { let (input, graph) = scope.new_collection(); // each edge should exist in both directions. - let graph = graph.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let graph = graph.arrange::, ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); match program.as_str() { "tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(), @@ -94,10 +94,10 @@ fn tc>(edges: &EdgeArranged) -> C let result = inner .map(|(x,y)| (y,x)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&edges, |_y,&x,&z| Some((x, z))) .concat(&edges.as_collection(|&k,&v| (k,v))) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -121,12 +121,12 @@ fn sg>(edges: &EdgeArranged) -> C let result = inner - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&edges, |_,&x,&z| Some((x, z))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&edges, |_,&x,&z| Some((x, z))) .concat(&peers) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; diff --git a/experiments/src/bin/graspan1.rs b/experiments/src/bin/graspan1.rs index e93bb5381..eee0c2940 100644 --- a/experiments/src/bin/graspan1.rs +++ b/experiments/src/bin/graspan1.rs @@ -6,7 +6,7 @@ use timely::order::Product; use differential_dataflow::difference::Present; use differential_dataflow::input::Input; -use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine}; +use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine, VecChunker}; use differential_dataflow::operators::*; use differential_dataflow::operators::arrange::Arrange; use differential_dataflow::operators::iterate::SemigroupVariable; @@ -31,7 +31,7 @@ fn main() { let (n_handle, nodes) = scope.new_collection(); let (e_handle, edges) = scope.new_collection(); - let edges = edges.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let edges = edges.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // a N c <- a N b && b E c // N(a,c) <- N(a,b), E(b, c) @@ -46,7 +46,7 @@ fn main() { let next = labels.join_core(&edges, |_b, a, c| Some((*c, *a))) .concat(&nodes) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() // .distinct_total_core::(); .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }); diff --git a/experiments/src/bin/graspan2.rs b/experiments/src/bin/graspan2.rs index 042e9f486..bbc03137d 100644 --- a/experiments/src/bin/graspan2.rs +++ b/experiments/src/bin/graspan2.rs @@ -10,7 +10,7 @@ use differential_dataflow::Collection; use differential_dataflow::input::Input; use differential_dataflow::operators::*; use differential_dataflow::operators::arrange::Arrange; -use differential_dataflow::trace::implementations::{ValSpine, KeySpine, ValBatcher, KeyBatcher, ValBuilder, KeyBuilder}; +use differential_dataflow::trace::implementations::{ValSpine, KeySpine, ValBatcher, KeyBatcher, ValBuilder, KeyBuilder, VecChunker}; use differential_dataflow::difference::Present; type Node = u32; @@ -47,7 +47,7 @@ fn unoptimized() { .flat_map(|(a,b)| vec![a,b]) .concat(&dereference.flat_map(|(a,b)| vec![a,b])); - let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let dereference = dereference.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); let (value_flow, memory_alias, value_alias) = scope @@ -60,14 +60,14 @@ fn unoptimized() { let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); - let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let value_flow_arranged = value_flow.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let memory_alias_arranged = memory_alias.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // VA(a,b) <- VF(x,a),VF(x,b) // VA(a,b) <- VF(x,a),MA(x,y),VF(y,b) let value_alias_next = value_flow_arranged.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))); let value_alias_next = value_flow_arranged.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&value_alias_next); @@ -77,16 +77,16 @@ fn unoptimized() { let value_flow_next = assignment .map(|(a,b)| (b,a)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) .concat(&assignment.map(|(a,b)| (b,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&nodes.map(|n| (n,n))); let value_flow_next = value_flow_next - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -95,12 +95,12 @@ fn unoptimized() { let memory_alias_next: Collection<_,_,Present> = value_alias_next .join_core(&dereference, |_x,&y,&a| Some((y,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&dereference, |_y,&a,&b| Some((a,b))); let memory_alias_next: Collection<_,_,Present> = memory_alias_next - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -172,7 +172,7 @@ fn optimized() { .flat_map(|(a,b)| vec![a,b]) .concat(&dereference.flat_map(|(a,b)| vec![a,b])); - let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let dereference = dereference.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); let (value_flow, memory_alias) = scope @@ -185,8 +185,8 @@ fn optimized() { let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); - let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let value_flow_arranged = value_flow.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let memory_alias_arranged = memory_alias.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // VF(a,a) <- // VF(a,b) <- A(a,x),VF(x,b) @@ -194,13 +194,13 @@ fn optimized() { let value_flow_next = assignment .map(|(a,b)| (b,a)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) .concat(&assignment.map(|(a,b)| (b,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&nodes.map(|n| (n,n))) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -209,9 +209,9 @@ fn optimized() { let value_flow_deref = value_flow .map(|(a,b)| (b,a)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&dereference, |_x,&a,&b| Some((a,b))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // MA(a,b) <- VFD(x,a),VFD(y,b) // MA(a,b) <- VFD(x,a),MA(x,y),VFD(y,b) @@ -222,10 +222,10 @@ fn optimized() { let memory_alias_next = memory_alias_arranged .join_core(&value_flow_deref, |_x,&y,&a| Some((y,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_deref, |_y,&a,&b| Some((a,b))) .concat(&memory_alias_next) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ;