Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ where G::Timestamp: Lattice+Ord {

use differential_dataflow::operators::iterate::SemigroupVariable;
use differential_dataflow::operators::reduce::ReduceCore;
use differential_dataflow::trace::implementations::KeySpine;
use differential_dataflow::trace::implementations::{KeySpine, KeyBuilder};


use timely::order::Product;
Expand All @@ -146,7 +146,7 @@ where G::Timestamp: Lattice+Ord {
.join_map(&edges, |_k,&(),d| *d)
.concat(&roots)
.map(|x| (x,()))
.reduce_core::<_,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| {
.reduce_core::<_,KeyBuilder<_,_,_>,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| {
if output.is_empty() || input[0].1 < output[0].1 {
updates.push(((), input[0].1));
}
Expand Down
34 changes: 17 additions & 17 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,46 +28,46 @@ fn main() {

match mode.as_str() {
"new" => {
use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeySpine};
let data = data.arrange::<ColKeyBatcher<_,_,_>, ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeyBatcher<_,_,_>, ColKeySpine<_,_,_>>();
use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine};
let data = data.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
let keys = keys.arrange::<ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"old" => {
use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, OrdKeySpine};
let data = data.arrange::<OrdKeyBatcher<_,_,_>, OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeyBatcher<_,_,_>, OrdKeySpine<_,_,_>>();
use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine};
let data = data.arrange::<OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
let keys = keys.arrange::<OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"rhh" => {
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecSpine<_,(),_,_>>();
use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine};
let data = data.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
let keys = keys.map(|x| HashWrapper { inner: x }).arrange::<VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>();
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredSpine};
use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredBuilder, PreferredSpine};

let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredBatcher<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredBatcher<[u8],[u8],_,_>, PreferredBuilder<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredBatcher<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.arrange::<PreferredBatcher<[u8],u8,_,_>, PreferredBuilder<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>,PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));

keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"flat" => {
use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeySpineDefault};
let data = data.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeySpineDefault<String,usize,isize>>();
let keys = keys.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeySpineDefault<String,usize,isize>>();
use differential_dataflow::trace::implementations::ord_neu::{FlatKeyBatcherDefault, FlatKeyBuilderDefault, FlatKeySpineDefault};
let data = data.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeyBuilderDefault<String,usize,isize>, FlatKeySpineDefault<String,usize,isize>>();
let keys = keys.arrange::<FlatKeyBatcherDefault<String,usize,isize,_>, FlatKeyBuilderDefault<String,usize,isize>, FlatKeySpineDefault<String,usize,isize>>();
keys.join_core(&data, |_k, (), ()| Option::<()>::None)
.probe_with(&mut probe);
}
Expand Down
4 changes: 2 additions & 2 deletions src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ where

use crate::operators::reduce::ReduceCore;
use crate::operators::iterate::SemigroupVariable;
use crate::trace::implementations::ValSpine;
use crate::trace::implementations::{ValBuilder, ValSpine};

use timely::order::Product;

Expand All @@ -96,7 +96,7 @@ where
let labels =
proposals
.concat(&nodes)
.reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));
.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));

let propagate: Collection<_, (N, L), R> =
labels
Expand Down
50 changes: 26 additions & 24 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{Data, ExchangeData, Collection, AsCollection, Hashable};
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Builder, Cursor};
use crate::trace::implementations::{KeyBatcher, KeySpine, ValBatcher, ValSpine};
use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine};

use trace::wrappers::enter::{TraceEnter, BatchEnter,};
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
Expand Down Expand Up @@ -289,7 +289,7 @@ where
T1: TraceReader + Clone + 'static,
{
/// A direct implementation of `ReduceCore::reduce_abelian`.
pub fn reduce_abelian<L, K, V, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
pub fn reduce_abelian<L, K, V, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
T2: for<'a> Trace<Key<'a>= T1::Key<'a>, Time=T1::Time>+'static,
Expand All @@ -298,10 +298,11 @@ where
for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>,
T2::Diff: Abelian,
T2::Batch: Batch,
<T2::Builder as Builder>::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch>,
Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static,
{
self.reduce_core::<_,K,V,T2>(name, move |key, input, output, change| {
self.reduce_core::<_,K,V,Bu,T2>(name, move |key, input, output, change| {
if !input.is_empty() {
logic(key, input, change);
}
Expand All @@ -311,19 +312,20 @@ where
}

/// A direct implementation of `ReduceCore::reduce_core`.
pub fn reduce_core<L, K, V, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
pub fn reduce_core<L, K, V, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=T1::Time>+'static,
K: Ord + 'static,
V: Data,
for<'a> T2::Val<'a> : IntoOwned<'a, Owned = V>,
T2::Batch: Batch,
<T2::Builder as Builder>::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch>,
Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
reduce_trace::<_,_,_,_,V,_>(self, name, logic)
reduce_trace::<_,_,Bu,_,_,V,_>(self, name, logic)
}
}

Expand Down Expand Up @@ -353,23 +355,23 @@ where
G::Timestamp: Lattice,
{
/// Arranges updates into a shared trace.
fn arrange<Ba, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
fn arrange<Ba, Bu, Tr>(&self) -> Arranged<G, TraceAgent<Tr>>
where
Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace<Time=G::Timestamp> + 'static,
Tr::Batch: Batch,
Tr::Builder: Builder<Input=Ba::Output>,
{
self.arrange_named::<Ba, Tr>("Arrange")
self.arrange_named::<Ba, Bu, Tr>("Arrange")
}

/// Arranges updates into a shared trace, with a supplied name.
fn arrange_named<Ba, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Ba: Batcher<Input=C, Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace<Time=G::Timestamp> + 'static,
Tr::Batch: Batch,
Tr::Builder: Builder<Input=Ba::Output>,
;
}

Expand All @@ -381,15 +383,15 @@ where
V: ExchangeData,
R: ExchangeData + Semigroup,
{
fn arrange_named<Ba, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Ba: Batcher<Input=Vec<((K, V), G::Timestamp, R)>, Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace<Time=G::Timestamp> + 'static,
Tr::Batch: Batch,
Tr::Builder: Builder<Input=Ba::Output>,
{
let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
arrange_core::<_, _, Ba, _>(&self.inner, exchange, name)
arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name)
}
}

Expand All @@ -398,16 +400,16 @@ where
/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
/// It uses the supplied parallelization contract to distribute the data, which does not need to
/// be consistently by key (though this is the most common).
pub fn arrange_core<G, P, Ba, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
G: Scope,
G::Timestamp: Lattice,
P: ParallelizationContract<G::Timestamp, Ba::Input>,
Ba: Batcher<Time=G::Timestamp> + 'static,
Ba::Input: Container,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace<Time=G::Timestamp>+'static,
Tr::Batch: Batch,
Tr::Builder: Builder<Input=Ba::Output>,
{
// The `Arrange` operator is tasked with reacting to an advancing input
// frontier by producing the sequence of batches whose lower and upper
Expand Down Expand Up @@ -515,7 +517,7 @@ where
}

// Extract updates not in advance of `upper`.
let batch = batcher.seal::<Tr::Builder>(upper.clone());
let batch = batcher.seal::<Bu>(upper.clone());

writer.insert(batch.clone(), Some(capability.time().clone()));

Expand Down Expand Up @@ -543,7 +545,7 @@ where
}
else {
// Announce progress updates, even without data.
let _batch = batcher.seal::<Tr::Builder>(input.frontier().frontier().to_owned());
let _batch = batcher.seal::<Bu>(input.frontier().frontier().to_owned());
writer.seal(input.frontier().frontier().to_owned());
}

Expand All @@ -562,15 +564,15 @@ impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Arrange<G, V
where
G::Timestamp: Lattice+Ord,
{
fn arrange_named<Ba, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
fn arrange_named<Ba, Bu, Tr>(&self, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Ba: Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace<Time=G::Timestamp> + 'static,
Tr::Batch: Batch,
Tr::Builder: Builder<Input=Ba::Output>,
{
let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
arrange_core::<_,_,Ba,_>(&self.map(|k| (k, ())).inner, exchange, name)
arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name)
}
}

Expand Down Expand Up @@ -601,7 +603,7 @@ where
}

fn arrange_by_key_named(&self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
self.arrange_named::<ValBatcher<_,_,_,_>, _>(name)
self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
}
}

Expand Down Expand Up @@ -636,6 +638,6 @@ where

fn arrange_by_self_named(&self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
self.map(|k| (k, ()))
.arrange_named::<KeyBatcher<_,_,_>, _>(name)
.arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
}
}
10 changes: 5 additions & 5 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@
//! worker.dataflow(|scope| {
//!
//! use timely::dataflow::operators::Input;
//! use differential_dataflow::trace::implementations::ValSpine;
//! use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
//! use differential_dataflow::operators::arrange::upsert;
//!
//! let stream = scope.input_from(&mut input);
//! let arranged = upsert::arrange_from_upsert::<_, _, _, ValSpine<Key, Val, _, _>>(&stream, &"test");
//! let arranged = upsert::arrange_from_upsert::<_, _, _, ValBuilder<Key, Val, _, _>, ValSpine<Key, Val, _, _>>(&stream, &"test");
//!
//! arranged
//! .as_collection(|k,v| (k.clone(), v.clone()))
Expand Down Expand Up @@ -126,7 +126,7 @@ use super::TraceAgent;
/// This method is only implemented for totally ordered times, as we do not yet
/// understand what a "sequence" of upserts would mean for partially ordered
/// timestamps.
pub fn arrange_from_upsert<G, K, V, Tr>(
pub fn arrange_from_upsert<G, K, V, Bu, Tr>(
stream: &Stream<G, (K, Option<V>, G::Timestamp)>,
name: &str,
) -> Arranged<G, TraceAgent<Tr>>
Expand All @@ -139,7 +139,7 @@ where
for<'a> Tr::Val<'a> : IntoOwned<'a, Owned = V>,
Tr::Time: TotalOrder+ExchangeData,
Tr::Batch: Batch,
Tr::Builder: Builder<Input = Vec<((K, V), Tr::Time, Tr::Diff)>>,
Bu: Builder<Time=G::Timestamp, Input = Vec<((K, V), Tr::Time, Tr::Diff)>, Output = Tr::Batch>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down Expand Up @@ -240,7 +240,7 @@ where
// Prepare a cursor to the existing arrangement, and a batch builder for
// new stuff that we add.
let (mut trace_cursor, trace_storage) = reader_local.cursor();
let mut builder = Tr::Builder::new();
let mut builder = Bu::new();
for (key, mut list) in to_process.drain(..) {

// The prior value associated with the key.
Expand Down
7 changes: 2 additions & 5 deletions src/operators/arrange/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use std::rc::{Rc, Weak};
use std::cell::RefCell;

use timely::progress::{Antichain, Timestamp};
use timely::progress::Antichain;

use crate::trace::{Trace, Batch, BatchReader};
use crate::trace::wrappers::rc::TraceBox;
Expand Down Expand Up @@ -93,10 +93,7 @@ where
/// Inserts an empty batch up to `upper`.
pub fn seal(&mut self, upper: Antichain<Tr::Time>) {
if self.upper != upper {
use crate::trace::Builder;
let builder = Tr::Builder::new();
let batch = builder.done(self.upper.clone(), upper, Antichain::from_elem(Tr::Time::minimum()));
self.insert(batch, None);
self.insert(Tr::Batch::empty(self.upper.clone(), upper), None);
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,22 @@ where
/// });
/// ```
pub fn consolidate(&self) -> Self {
use crate::trace::implementations::{KeyBatcher, KeySpine};
self.consolidate_named::<KeyBatcher<_, _, _>, KeySpine<_,_,_>>("Consolidate")
use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine};
self.consolidate_named::<KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>>("Consolidate")
}

/// As `consolidate` but with the ability to name the operator and specify the trace type.
pub fn consolidate_named<Ba, Tr>(&self, name: &str) -> Self
pub fn consolidate_named<Ba, Bu, Tr>(&self, name: &str) -> Self
where
Ba: Batcher<Input=Vec<((D,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
Tr: crate::trace::Trace<Time=G::Timestamp,Diff=R>+'static,
for<'a> Tr::Key<'a>: IntoOwned<'a, Owned = D>,
Tr::Batch: crate::trace::Batch,
Tr::Builder: Builder<Input=Ba::Output>,
Bu: Builder<Time=Tr::Time, Input=Ba::Output, Output=Tr::Batch>,
{
use crate::operators::arrange::arrangement::Arrange;
self.map(|k| (k, ()))
.arrange_named::<Ba, Tr>(name)
.arrange_named::<Ba, Bu, Tr>(name)
.as_collection(|d, _| d.into_owned())
}

Expand Down
Loading
Loading