Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 36 additions & 41 deletions differential-dataflow/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,11 +581,10 @@ pub mod dd_builder {

use timely::container::PushInto;

use differential_dataflow::IntoOwned;
use differential_dataflow::trace::Builder;
use differential_dataflow::trace::Description;
use differential_dataflow::trace::implementations::Layout;
use differential_dataflow::trace::implementations::Update;
use differential_dataflow::trace::implementations::layout;
use differential_dataflow::trace::implementations::BatchContainer;
use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, val_batch::OrdValStorage, OrdKeyBatch, Vals, Upds, layers::UpdsBuilder};
use differential_dataflow::trace::implementations::ord_neu::key_batch::OrdKeyStorage;
Expand All @@ -611,18 +610,16 @@ pub mod dd_builder {
impl<L> Builder for OrdValBuilder<L>
where
L: Layout,
<L::KeyContainer as BatchContainer>::Owned: Columnar,
<L::ValContainer as BatchContainer>::Owned: Columnar,
<L::TimeContainer as BatchContainer>::Owned: Columnar,
<L::DiffContainer as BatchContainer>::Owned: Columnar,
layout::Key<L>: Columnar,
layout::Val<L>: Columnar,
layout::Time<L>: Columnar,
layout::Diff<L>: Columnar,
// These two constraints seem .. like we could potentially replace by `Columnar::Ref<'a>`.
for<'a> L::KeyContainer: PushInto<&'a <L::KeyContainer as BatchContainer>::Owned>,
for<'a> L::ValContainer: PushInto<&'a <L::ValContainer as BatchContainer>::Owned>,
for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
for<'a> L::KeyContainer: PushInto<&'a layout::Key<L>>,
for<'a> L::ValContainer: PushInto<&'a layout::Val<L>>,
{
type Input = Column<((<L::KeyContainer as BatchContainer>::Owned,<L::ValContainer as BatchContainer>::Owned),<L::TimeContainer as BatchContainer>::Owned,<L::DiffContainer as BatchContainer>::Owned)>;
type Time = <L::Target as Update>::Time;
type Input = Column<((layout::Key<L>,layout::Val<L>),layout::Time<L>,layout::Diff<L>)>;
type Time = layout::Time<L>;
type Output = OrdValBatch<L>;

fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
Expand All @@ -648,44 +645,44 @@ pub mod dd_builder {

for ((key,val),time,diff) in chunk.drain() {
// It would be great to avoid.
let key = <<L::KeyContainer as BatchContainer>::Owned as Columnar>::into_owned(key);
let val = <<L::ValContainer as BatchContainer>::Owned as Columnar>::into_owned(val);
let key = <layout::Key<L> as Columnar>::into_owned(key);
let val = <layout::Val<L> as Columnar>::into_owned(val);
// These feel fine (wrt the other versions)
let time = <<L::TimeContainer as BatchContainer>::Owned as Columnar>::into_owned(time);
let diff = <<L::DiffContainer as BatchContainer>::Owned as Columnar>::into_owned(diff);
let time = <layout::Time<L> as Columnar>::into_owned(time);
let diff = <layout::Diff<L> as Columnar>::into_owned(diff);

// Pre-load the first update.
if self.result.keys.is_empty() {
self.result.vals.vals.push(&val);
self.result.keys.push(&key);
self.result.vals.vals.push_into(&val);
self.result.keys.push_into(&key);
self.staging.push(time, diff);
}
// Perhaps this is a continuation of an already received key.
else if self.result.keys.last().map(|k| <<L::KeyContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&key).eq(&k)).unwrap_or(false) {
else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) {
// Perhaps this is a continuation of an already received value.
if self.result.vals.vals.last().map(|v| <<L::ValContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&val).eq(&v)).unwrap_or(false) {
if self.result.vals.vals.last().map(|v| L::ValContainer::borrow_as(&val).eq(&v)).unwrap_or(false) {
self.staging.push(time, diff);
} else {
// New value; complete representation of prior value.
self.staging.seal(&mut self.result.upds);
self.staging.push(time, diff);
self.result.vals.vals.push(&val);
self.result.vals.vals.push_into(&val);
}
} else {
// New key; complete representation of prior key.
self.staging.seal(&mut self.result.upds);
self.staging.push(time, diff);
self.result.vals.offs.push(self.result.vals.len());
self.result.vals.vals.push(&val);
self.result.keys.push(&key);
self.result.vals.offs.push_ref(self.result.vals.len());
self.result.vals.vals.push_into(&val);
self.result.keys.push_into(&key);
}
}
}

#[inline(never)]
fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
self.staging.seal(&mut self.result.upds);
self.result.vals.offs.push(self.result.vals.len());
self.result.vals.offs.push_ref(self.result.vals.len());
OrdValBatch {
updates: self.staging.total(),
storage: self.result,
Expand Down Expand Up @@ -718,18 +715,16 @@ pub mod dd_builder {
impl<L> Builder for OrdKeyBuilder<L>
where
L: Layout,
<L::KeyContainer as BatchContainer>::Owned: Columnar,
<L::ValContainer as BatchContainer>::Owned: Columnar,
<L::TimeContainer as BatchContainer>::Owned: Columnar,
<L::DiffContainer as BatchContainer>::Owned: Columnar,
layout::Key<L>: Columnar,
layout::Val<L>: Columnar,
layout::Time<L>: Columnar,
layout::Diff<L>: Columnar,
// These two constraints seem .. like we could potentially replace by `Columnar::Ref<'a>`.
for<'a> L::KeyContainer: PushInto<&'a <L::KeyContainer as BatchContainer>::Owned>,
for<'a> L::ValContainer: PushInto<&'a <L::ValContainer as BatchContainer>::Owned>,
for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
for<'a> L::KeyContainer: PushInto<&'a layout::Key<L>>,
for<'a> L::ValContainer: PushInto<&'a layout::Val<L>>,
{
type Input = Column<((<L::KeyContainer as BatchContainer>::Owned,<L::ValContainer as BatchContainer>::Owned),<L::TimeContainer as BatchContainer>::Owned,<L::DiffContainer as BatchContainer>::Owned)>;
type Time = <L::Target as Update>::Time;
type Input = Column<((layout::Key<L>,layout::Val<L>),layout::Time<L>,layout::Diff<L>)>;
type Time = layout::Time<L>;
type Output = OrdKeyBatch<L>;

fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
Expand All @@ -754,24 +749,24 @@ pub mod dd_builder {

for ((key,_val),time,diff) in chunk.drain() {
// It would be great to avoid.
let key = <<L::KeyContainer as BatchContainer>::Owned as Columnar>::into_owned(key);
let key = <layout::Key<L> as Columnar>::into_owned(key);
// These feel fine (wrt the other versions)
let time = <<L::TimeContainer as BatchContainer>::Owned as Columnar>::into_owned(time);
let diff = <<L::DiffContainer as BatchContainer>::Owned as Columnar>::into_owned(diff);
let time = <layout::Time<L> as Columnar>::into_owned(time);
let diff = <layout::Diff<L> as Columnar>::into_owned(diff);

// Pre-load the first update.
if self.result.keys.is_empty() {
self.result.keys.push(&key);
self.result.keys.push_into(&key);
self.staging.push(time, diff);
}
// Perhaps this is a continuation of an already received key.
else if self.result.keys.last().map(|k| <<L::KeyContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&key).eq(&k)).unwrap_or(false) {
else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) {
self.staging.push(time, diff);
} else {
// New key; complete representation of prior key.
self.staging.seal(&mut self.result.upds);
self.staging.push(time, diff);
self.result.keys.push(&key);
self.result.keys.push_into(&key);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fn main() {

/* Return trace content after the last round. */
let (mut cursor, storage) = graph_trace.cursor();
cursor.to_vec(&storage)
cursor.to_vec(&storage, |k| k.clone(), |v| v.clone())
})
.unwrap().join();

Expand Down
16 changes: 0 additions & 16 deletions differential-dataflow/examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,22 +48,6 @@ fn main() {
keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
"slc" => {

use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredBuilder, PreferredSpine};

let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredBatcher<[u8],[u8],_,_>, PreferredBuilder<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredBatcher<[u8],u8,_,_>, PreferredBuilder<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>,PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));

keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
.probe_with(&mut probe);
},
_ => {
println!("unrecognized mode: {:?}", mode)
}
Expand Down
15 changes: 10 additions & 5 deletions differential-dataflow/src/consolidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::cmp::Ordering;
use std::collections::VecDeque;
use timely::Container;
use timely::container::{ContainerBuilder, PushInto};
use crate::{IntoOwned, Data};
use crate::Data;
use crate::difference::{IsZero, Semigroup};

/// Sorts and consolidates `vec`.
Expand Down Expand Up @@ -231,11 +231,14 @@ pub trait ConsolidateLayout: Container {
type Key<'a>: Eq where Self: 'a;

/// GAT diff type.
type Diff<'a>: IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a;
type Diff<'a>;

/// Owned diff type.
type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;

/// Converts a reference diff into an owned diff.
fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned;

/// Deconstruct an item into key and diff. Must be cheap.
fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);

Expand Down Expand Up @@ -266,7 +269,7 @@ pub trait ConsolidateLayout: Container {

let (k, d) = Self::into_parts(item);
let mut prev_key = k;
let mut prev_diff = d.into_owned();
let mut prev_diff = Self::owned_diff(d);

for item in iter {
let (next_key, next_diff) = Self::into_parts(item);
Expand All @@ -278,7 +281,7 @@ pub trait ConsolidateLayout: Container {
target.push_with_diff(prev_key, prev_diff);
}
prev_key = next_key;
prev_diff = next_diff.into_owned();
prev_diff = Self::owned_diff(next_diff);
}
}

Expand All @@ -293,12 +296,14 @@ impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
where
D: Ord + Clone + 'static,
T: Ord + Clone + 'static,
R: Semigroup + for<'a> IntoOwned<'a, Owned = R> + Clone + 'static,
R: Semigroup + Clone + 'static,
{
type Key<'a> = (D, T) where Self: 'a;
type Diff<'a> = R where Self: 'a;
type DiffOwned = R;

fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned { diff }

fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
((data, time), diff)
}
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ where
while let Some(val) = cursor.get_val(batch) {
for datum in logic(key, val) {
cursor.map_times(batch, |time, diff| {
session.give((datum.clone(), time.into_owned(), diff.into_owned()));
session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff)));
});
}
cursor.step_val(batch);
Expand Down
2 changes: 1 addition & 1 deletion differential-dataflow/src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ where
// Determine the prior value associated with the key.
while let Some(val) = trace_cursor.get_val(&trace_storage) {
let mut count = 0;
trace_cursor.map_times(&trace_storage, |_time, diff| count += diff.into_owned());
trace_cursor.map_times(&trace_storage, |_time, diff| count += Tr::owned_diff(diff));
assert!(count == 0 || count == 1);
if count == 1 {
assert!(prev_value.is_none());
Expand Down
14 changes: 8 additions & 6 deletions differential-dataflow/src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use timely::dataflow::Scope;

use crate::{IntoOwned, Collection, ExchangeData, Hashable};
use crate::{Collection, ExchangeData, Hashable};
use crate::consolidation::ConsolidatingContainerBuilder;
use crate::difference::Semigroup;

Expand Down Expand Up @@ -45,20 +45,22 @@ where
/// ```
pub fn consolidate(&self) -> Self {
use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine};
self.consolidate_named::<KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>>("Consolidate")
self.consolidate_named::<KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone())
}

/// As `consolidate` but with the ability to name the operator and specify the trace type.
pub fn consolidate_named<Ba, Bu, Tr>(&self, name: &str) -> Self
/// As `consolidate` but with the ability to name the operator, specify the trace type,
/// and provide the function `reify` to produce owned keys and values..
pub fn consolidate_named<Ba, Bu, Tr, F>(&self, name: &str, reify: F) -> Self
where
Ba: Batcher<Input=Vec<((D,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
Tr: for<'a> crate::trace::Trace<Key<'a>: IntoOwned<'a, Owned = D>,Time=G::Timestamp,Diff=R>+'static,
Tr: for<'a> crate::trace::Trace<Time=G::Timestamp,Diff=R>+'static,
Bu: Builder<Time=Tr::Time, Input=Ba::Output, Output=Tr::Batch>,
F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
{
use crate::operators::arrange::arrangement::Arrange;
self.map(|k| (k, ()))
.arrange_named::<Ba, Bu, Tr>(name)
.as_collection(|d, _| d.into_owned())
.as_collection(reify)
}

/// Aggregates the weights of equal records.
Expand Down
12 changes: 6 additions & 6 deletions differential-dataflow/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use timely::dataflow::operators::Operator;
use timely::dataflow::channels::pact::Pipeline;

use crate::lattice::Lattice;
use crate::{IntoOwned, ExchangeData, Collection};
use crate::{ExchangeData, Collection};
use crate::difference::{IsZero, Semigroup};
use crate::hashable::Hashable;
use crate::collection::AsCollection;
Expand Down Expand Up @@ -56,7 +56,7 @@ impl<G, K, T1> CountTotal<G, K, T1::Diff> for Arranged<G, T1>
where
G: Scope<Timestamp=T1::Time>,
T1: for<'a> TraceReader<
Key<'a>: IntoOwned<'a, Owned = K>,
Key<'a> = &'a K,
Val<'a>=&'a (),
Time: TotalOrder,
Diff: ExchangeData+Semigroup<T1::DiffGat<'a>>
Expand Down Expand Up @@ -109,22 +109,22 @@ where
if trace_cursor.get_key(&trace_storage) == Some(key) {
trace_cursor.map_times(&trace_storage, |_, diff| {
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
if count.is_none() { count = Some(T1::owned_diff(diff)); }
});
}

batch_cursor.map_times(&batch_storage, |time, diff| {

if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8)));
session.give(((key.clone(), count.clone()), T1::owned_time(time), R2::from(-1i8)));
}
}
count.as_mut().map(|c| c.plus_equals(&diff));
if count.is_none() { count = Some(diff.into_owned()); }
if count.is_none() { count = Some(T1::owned_diff(diff)); }
if let Some(count) = count.as_ref() {
if !count.is_zero() {
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8)));
session.give(((key.clone(), count.clone()), T1::owned_time(time), R2::from(1i8)));
}
}
});
Expand Down
8 changes: 3 additions & 5 deletions differential-dataflow/src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ where
V: Data + 'static,
{
fn join_map<V2: ExchangeData, R2: ExchangeData+Semigroup, D: Data, L>(&self, other: &Collection<G, (K, V2), R2>, mut logic: L) -> Collection<G, D, <Tr::Diff as Multiply<R2>>::Output>
where
where
Tr::Diff: Multiply<R2, Output: Semigroup+'static>,
L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2)->D+'static,
{
Expand Down Expand Up @@ -660,14 +660,12 @@ where
Ordering::Greater => batch.seek_key(batch_storage, trace_key),
Ordering::Equal => {

use crate::IntoOwned;

thinker.history1.edits.load(trace, trace_storage, |time| {
let mut time = time.into_owned();
let mut time = C1::owned_time(time);
time.join_assign(meet);
time
});
thinker.history2.edits.load(batch, batch_storage, |time| time.into_owned());
thinker.history2.edits.load(batch, batch_storage, |time| C2::owned_time(time));

// populate `temp` with the results in the best way we know how.
thinker.think(|v1,v2,t,r1,r2| {
Expand Down
Loading
Loading