Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions differential-dataflow/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ fn main() {
let data_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::<u64>() as u64);
let keys_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::<u64>() as u64);

let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data");
let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys");
use crate::batcher::Col2ValChunker;
let data = arrange_core::<_,_,_,Col2ValChunker<WordCount>,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data");
let keys = arrange_core::<_,_,_,Col2ValChunker<WordCount>,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys");

keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
Expand Down Expand Up @@ -373,7 +374,8 @@ pub mod batcher {
use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;

/// A batcher for columnar storage.
pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<Column<((K,V),T,R)>, Chunker<Column<((K,V),T,R)>>, merger::ColumnMerger<(K,V),T,R>>;
pub type Col2ValChunker<T> = Chunker<Column<T>>;
pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<merger::ColumnMerger<(K,V),T,R>>;
pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;

// First draft: build a "chunker" and a "merger".
Expand Down Expand Up @@ -408,11 +410,11 @@ pub mod batcher {

impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker<C2>
where
D: for<'b> Columnar,
D: Columnar,
for<'b> columnar::Ref<'b, D>: Ord,
T: for<'b> Columnar,
T: Columnar,
for<'b> columnar::Ref<'b, T>: Ord,
R: for<'b> Columnar + for<'b> Semigroup<columnar::Ref<'b, R>>,
R: Columnar + for<'b> Semigroup<columnar::Ref<'b, R>>,
for<'b> columnar::Ref<'b, R>: Ord,
C2: Container + for<'b, 'c> PushInto<(columnar::Ref<'b, D>, columnar::Ref<'b, T>, &'c R)>,
{
Expand Down
15 changes: 9 additions & 6 deletions differential-dataflow/examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,26 @@ fn main() {

match mode.as_str() {
"new" => {
use differential_dataflow::trace::implementations::ColumnationChunker;
use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine};
let data = data.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
let data = data.arrange::<ColumnationChunker<_>,ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColumnationChunker<_>,ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"old" => {
use differential_dataflow::trace::implementations::VecChunker;
use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine};
let data = data.arrange::<OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
let data = data.arrange::<VecChunker<_>,OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<VecChunker<_>,OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"rhh" => {
use differential_dataflow::trace::implementations::VecChunker;
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecChunker<_>,VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecChunker<_>,VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
Expand Down
65 changes: 41 additions & 24 deletions differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine};
use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine, VecChunker};

use trace::wrappers::enter::{TraceEnter, BatchEnter,};
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
Expand Down Expand Up @@ -76,7 +76,7 @@ where
use ::timely::dataflow::scopes::Child;
use ::timely::progress::timestamp::Refines;
use timely::Container;
use timely::container::PushInto;
use timely::container::{ContainerBuilder, PushInto};

impl<G, Tr> Arranged<G, Tr>
where
Expand Down Expand Up @@ -348,20 +348,22 @@ where
G: Scope<Timestamp: Lattice>,
{
/// Arranges updates into a shared trace.
fn arrange<Ba, Bu, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
fn arrange<Chu, Ba, Bu, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
where
Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Chu: ContainerBuilder<Container=Ba::Container> + for<'a> PushInto<&'a mut C>,
Ba: Batcher<Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Container, Output = Tr::Batch>,
Tr: Trace<Time=G::Timestamp> + 'static,
{
self.arrange_named::<Ba, Bu, Tr>("Arrange")
self.arrange_named::<Chu, Ba, Bu, Tr>("Arrange")
}

/// Arranges updates into a shared trace, with a supplied name.
fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
fn arrange_named<Chu, Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Chu: ContainerBuilder<Container=Ba::Container> + for<'a> PushInto<&'a mut C>,
Ba: Batcher<Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Container, Output = Tr::Batch>,
Tr: Trace<Time=G::Timestamp> + 'static,
;
}
Expand All @@ -373,14 +375,15 @@ where
V: ExchangeData,
R: ExchangeData + Semigroup,
{
fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
fn arrange_named<Chu, Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Ba: Batcher<Input=Vec<((K, V), G::Timestamp, R)>, Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Chu: ContainerBuilder<Container=Ba::Container> + for<'a> PushInto<&'a mut Vec<((K, V), G::Timestamp, R)>>,
Ba: Batcher<Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Container, Output = Tr::Batch>,
Tr: Trace<Time=G::Timestamp> + 'static,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name)
arrange_core::<_, _, _, Chu, Ba, Bu, _>(&self.inner, exchange, name)
}
}

Expand All @@ -389,12 +392,14 @@ where
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
pub fn arrange_core<G, P, C, Chu, Ba, Bu, Tr>(stream: &StreamCore<G, C>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope<Timestamp: Lattice>,
P: ParallelizationContract<G::Timestamp, Ba::Input>,
Ba: Batcher<Time=G::Timestamp,Input: Container + Clone + 'static> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
P: ParallelizationContract<G::Timestamp, C>,
C: Container + Clone + 'static,
Chu: ContainerBuilder<Container=Ba::Container> + for<'a> PushInto<&'a mut C>,
Ba: Batcher<Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Container, Output = Tr::Batch>,
Tr: Trace<Time=G::Timestamp>+'static,
{
// The `Arrange` operator is tasked with reacting to an advancing input
Expand Down Expand Up @@ -443,6 +448,8 @@ where
// Initialize to the minimal input frontier.
let mut prev_frontier = Antichain::from_elem(<G::Timestamp as Timestamp>::minimum());

let mut chunker = Chu::default();

move |input, output| {

// As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
Expand All @@ -451,7 +458,11 @@ where

input.for_each(|cap, data| {
capabilities.insert(cap.retain());
batcher.push_container(data);
chunker.push_into(data);
while let Some(chunk) = chunker.extract() {
let chunk = std::mem::take(chunk);
batcher.push_into(chunk);
}
});

// The frontier may have advanced by multiple elements, which is an issue because
Expand Down Expand Up @@ -481,6 +492,11 @@ where
// If there is at least one capability not in advance of the input frontier ...
if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {

while let Some(chunk) = chunker.finish() {
let chunk = std::mem::take(chunk);
batcher.push_into(chunk);
}

let mut upper = Antichain::new(); // re-used allocation for sealing batches.

// For each capability not in advance of the input frontier ...
Expand Down Expand Up @@ -547,14 +563,15 @@ impl<G, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, Vec<((K,
where
G: Scope<Timestamp: Lattice+Ord>,
{
fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
fn arrange_named<Chu, Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Ba: Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Chu: ContainerBuilder<Container=Ba::Container> + for<'a> PushInto<&'a mut Vec<((K, ()), G::Timestamp, R)>>,
Ba: Batcher<Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Container, Output = Tr::Batch>,
Tr: Trace<Time=G::Timestamp> + 'static,
{
let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name)
arrange_core::<_,_,_,Chu,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name)
}
}

Expand Down Expand Up @@ -587,7 +604,7 @@ where
}

fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
self.arrange_named::<VecChunker<_>, ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
}
}

Expand Down Expand Up @@ -622,6 +639,6 @@ where

fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
self.map(|k| (k, ()))
.arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
.arrange_named::<VecChunker<_>, KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
}
}
14 changes: 8 additions & 6 deletions differential-dataflow/src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
//! underlying system can more clearly see that no work must be done in the later case, and we can
//! drop out of, e.g. iterative computations.

use timely::container::{ContainerBuilder, PushInto};
use timely::dataflow::Scope;

use crate::{Collection, ExchangeData, Hashable};
Expand Down Expand Up @@ -44,22 +45,23 @@ where
/// });
/// ```
pub fn consolidate(&self) -> Self {
use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine};
self.consolidate_named::<KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone())
use crate::trace::implementations::{VecChunker, KeyBatcher, KeyBuilder, KeySpine};
self.consolidate_named::<VecChunker<_>,KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone())
}

/// As `consolidate` but with the ability to name the operator, specify the trace type,
/// and provide the function `reify` to produce owned keys and values..
pub fn consolidate_named<Ba, Bu, Tr, F>(&self, name: &str, reify: F) -> Self
pub fn consolidate_named<Chu, Ba, Bu, Tr, F>(&self, name: &str, reify: F) -> Self
where
Ba: Batcher<Input=Vec<((D,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
Chu: ContainerBuilder<Container=Ba::Container> + for<'a> PushInto<&'a mut Vec<((D, ()), G::Timestamp, R)>>,
Ba: Batcher<Time=G::Timestamp> + 'static,
Tr: for<'a> crate::trace::Trace<Time=G::Timestamp,Diff=R>+'static,
Bu: Builder<Time=Tr::Time, Input=Ba::Output, Output=Tr::Batch>,
Bu: Builder<Time=Tr::Time, Input=Ba::Container, Output=Tr::Batch>,
F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
{
use crate::operators::arrange::arrangement::Arrange;
self.map(|k| (k, ()))
.arrange_named::<Ba, Bu, Tr>(name)
.arrange_named::<Chu, Ba, Bu, Tr>(name)
.as_collection(reify)
}

Expand Down
51 changes: 15 additions & 36 deletions differential-dataflow/src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,16 @@
//! Implementations of `MergeBatcher` can be instantiated through the choice of both
//! the chunker and the merger, provided their respective output and input types align.

use std::marker::PhantomData;

use timely::progress::frontier::AntichainRef;
use timely::progress::{frontier::Antichain, Timestamp};
use timely::Container;
use timely::container::{ContainerBuilder, PushInto};
use timely::container::PushInto;

use crate::logging::{BatcherEvent, Logger};
use crate::trace::{Batcher, Builder, Description};

/// Creates batches from containers of unordered tuples.
///
/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs,
/// and must produce outputs of type `M::Chunk`.
pub struct MergeBatcher<Input, C, M: Merger> {
/// Transforms input streams to chunks of sorted, consolidated data.
chunker: C,
pub struct MergeBatcher<M: Merger> {
/// A sequence of power-of-two length lists of sorted, consolidated containers.
///
/// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]).
Expand All @@ -43,54 +36,32 @@ pub struct MergeBatcher<Input, C, M: Merger> {
logger: Option<Logger>,
/// Timely operator ID.
operator_id: usize,
/// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present.
_marker: PhantomData<Input>,
}

impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
impl<M> Batcher for MergeBatcher<M>
where
C: ContainerBuilder<Container=M::Chunk> + Default + for<'a> PushInto<&'a mut Input>,
M: Merger<Time: Timestamp>,
{
type Input = Input;
type Time = M::Time;
type Output = M::Chunk;
type Container = M::Chunk;

fn new(logger: Option<Logger>, operator_id: usize) -> Self {
Self {
logger,
operator_id,
chunker: C::default(),
merger: M::default(),
chains: Vec::new(),
stash: Vec::new(),
frontier: Antichain::new(),
lower: Antichain::from_elem(M::Time::minimum()),
_marker: PhantomData,
}
}

/// Push a container of data into this merge batcher. Updates the internal chain structure if
/// needed.
fn push_container(&mut self, container: &mut Input) {
self.chunker.push_into(container);
while let Some(chunk) = self.chunker.extract() {
let chunk = std::mem::take(chunk);
self.insert_chain(vec![chunk]);
}
}

// Sealing a batch means finding those updates with times not greater or equal to any time
// in `upper`. All updates must have time greater or equal to the previously used `upper`,
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
fn seal<B: Builder<Input = Self::Output, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
// Finish
while let Some(chunk) = self.chunker.finish() {
let chunk = std::mem::take(chunk);
self.insert_chain(vec![chunk]);
}

fn seal<B: Builder<Input = Self::Container, Time = Self::Time>>(&mut self, upper: Antichain<M::Time>) -> B::Output {
// Merge all remaining chains into a single chain.
while self.chains.len() > 1 {
let list1 = self.chain_pop().unwrap();
Expand Down Expand Up @@ -125,8 +96,16 @@ where
self.frontier.borrow()
}
}
impl<M> PushInto<M::Chunk> for MergeBatcher<M>
where
M: Merger,
{
fn push_into(&mut self, item: M::Chunk) {
self.insert_chain(vec![item]);
}
}

impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
impl<M: Merger> MergeBatcher<M> {
/// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered
/// by decreasing length.
fn insert_chain(&mut self, chain: Vec<M::Chunk>) {
Expand Down Expand Up @@ -190,7 +169,7 @@ impl<Input, C, M: Merger> MergeBatcher<Input, C, M> {
}
}

impl<Input, C, M: Merger> Drop for MergeBatcher<Input, C, M> {
impl<M: Merger> Drop for MergeBatcher<M> {
fn drop(&mut self) {
// Cleanup chain to retract accounting information.
while self.chain_pop().is_some() {}
Expand Down
1 change: 1 addition & 0 deletions differential-dataflow/src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub mod huffman_container;
pub mod chunker;

// Opinionated takes on default spines.
pub use self::chunker::{ColumnationChunker, VecChunker};
pub use self::ord_neu::OrdValSpine as ValSpine;
pub use self::ord_neu::OrdValBatcher as ValBatcher;
pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
Expand Down
Loading
Loading