diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index 6b74eb945..20c33a41b 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -581,11 +581,10 @@ pub mod dd_builder { use timely::container::PushInto; - use differential_dataflow::IntoOwned; use differential_dataflow::trace::Builder; use differential_dataflow::trace::Description; use differential_dataflow::trace::implementations::Layout; - use differential_dataflow::trace::implementations::Update; + use differential_dataflow::trace::implementations::layout; use differential_dataflow::trace::implementations::BatchContainer; use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, val_batch::OrdValStorage, OrdKeyBatch, Vals, Upds, layers::UpdsBuilder}; use differential_dataflow::trace::implementations::ord_neu::key_batch::OrdKeyStorage; @@ -611,18 +610,16 @@ pub mod dd_builder { impl Builder for OrdValBuilder where L: Layout, - ::Owned: Columnar, - ::Owned: Columnar, - ::Owned: Columnar, - ::Owned: Columnar, + layout::Key: Columnar, + layout::Val: Columnar, + layout::Time: Columnar, + layout::Diff: Columnar, // These two constraints seem .. like we could potentially replace by `Columnar::Ref<'a>`. 
- for<'a> L::KeyContainer: PushInto<&'a ::Owned>, - for<'a> L::ValContainer: PushInto<&'a ::Owned>, - for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Time>, - for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Diff>, + for<'a> L::KeyContainer: PushInto<&'a layout::Key>, + for<'a> L::ValContainer: PushInto<&'a layout::Val>, { - type Input = Column<((::Owned,::Owned),::Owned,::Owned)>; - type Time = ::Time; + type Input = Column<((layout::Key,layout::Val),layout::Time,layout::Diff)>; + type Time = layout::Time; type Output = OrdValBatch; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { @@ -648,36 +645,36 @@ pub mod dd_builder { for ((key,val),time,diff) in chunk.drain() { // It would be great to avoid. - let key = <::Owned as Columnar>::into_owned(key); - let val = <::Owned as Columnar>::into_owned(val); + let key = as Columnar>::into_owned(key); + let val = as Columnar>::into_owned(val); // These feel fine (wrt the other versions) - let time = <::Owned as Columnar>::into_owned(time); - let diff = <::Owned as Columnar>::into_owned(diff); + let time = as Columnar>::into_owned(time); + let diff = as Columnar>::into_owned(diff); // Pre-load the first update. if self.result.keys.is_empty() { - self.result.vals.vals.push(&val); - self.result.keys.push(&key); + self.result.vals.vals.push_into(&val); + self.result.keys.push_into(&key); self.staging.push(time, diff); } // Perhaps this is a continuation of an already received key. - else if self.result.keys.last().map(|k| <::ReadItem<'_> as IntoOwned>::borrow_as(&key).eq(&k)).unwrap_or(false) { + else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) { // Perhaps this is a continuation of an already received value. 
- if self.result.vals.vals.last().map(|v| <::ReadItem<'_> as IntoOwned>::borrow_as(&val).eq(&v)).unwrap_or(false) { + if self.result.vals.vals.last().map(|v| L::ValContainer::borrow_as(&val).eq(&v)).unwrap_or(false) { self.staging.push(time, diff); } else { // New value; complete representation of prior value. self.staging.seal(&mut self.result.upds); self.staging.push(time, diff); - self.result.vals.vals.push(&val); + self.result.vals.vals.push_into(&val); } } else { // New key; complete representation of prior key. self.staging.seal(&mut self.result.upds); self.staging.push(time, diff); - self.result.vals.offs.push(self.result.vals.len()); - self.result.vals.vals.push(&val); - self.result.keys.push(&key); + self.result.vals.offs.push_ref(self.result.vals.len()); + self.result.vals.vals.push_into(&val); + self.result.keys.push_into(&key); } } } @@ -685,7 +682,7 @@ pub mod dd_builder { #[inline(never)] fn done(mut self, description: Description) -> OrdValBatch { self.staging.seal(&mut self.result.upds); - self.result.vals.offs.push(self.result.vals.len()); + self.result.vals.offs.push_ref(self.result.vals.len()); OrdValBatch { updates: self.staging.total(), storage: self.result, @@ -718,18 +715,16 @@ pub mod dd_builder { impl Builder for OrdKeyBuilder where L: Layout, - ::Owned: Columnar, - ::Owned: Columnar, - ::Owned: Columnar, - ::Owned: Columnar, + layout::Key: Columnar, + layout::Val: Columnar, + layout::Time: Columnar, + layout::Diff: Columnar, // These two constraints seem .. like we could potentially replace by `Columnar::Ref<'a>`. 
- for<'a> L::KeyContainer: PushInto<&'a ::Owned>, - for<'a> L::ValContainer: PushInto<&'a ::Owned>, - for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Time>, - for<'a> ::ReadItem<'a> : IntoOwned<'a, Owned = ::Diff>, + for<'a> L::KeyContainer: PushInto<&'a layout::Key>, + for<'a> L::ValContainer: PushInto<&'a layout::Val>, { - type Input = Column<((::Owned,::Owned),::Owned,::Owned)>; - type Time = ::Time; + type Input = Column<((layout::Key,layout::Val),layout::Time,layout::Diff)>; + type Time = layout::Time; type Output = OrdKeyBatch; fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self { @@ -754,24 +749,24 @@ pub mod dd_builder { for ((key,_val),time,diff) in chunk.drain() { // It would be great to avoid. - let key = <::Owned as Columnar>::into_owned(key); + let key = as Columnar>::into_owned(key); // These feel fine (wrt the other versions) - let time = <::Owned as Columnar>::into_owned(time); - let diff = <::Owned as Columnar>::into_owned(diff); + let time = as Columnar>::into_owned(time); + let diff = as Columnar>::into_owned(diff); // Pre-load the first update. if self.result.keys.is_empty() { - self.result.keys.push(&key); + self.result.keys.push_into(&key); self.staging.push(time, diff); } // Perhaps this is a continuation of an already received key. - else if self.result.keys.last().map(|k| <::ReadItem<'_> as IntoOwned>::borrow_as(&key).eq(&k)).unwrap_or(false) { + else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) { self.staging.push(time, diff); } else { // New key; complete representation of prior key. 
self.staging.seal(&mut self.result.upds); self.staging.push(time, diff); - self.result.keys.push(&key); + self.result.keys.push_into(&key); } } } diff --git a/differential-dataflow/examples/cursors.rs b/differential-dataflow/examples/cursors.rs index 1f91f615d..032318acc 100644 --- a/differential-dataflow/examples/cursors.rs +++ b/differential-dataflow/examples/cursors.rs @@ -95,7 +95,7 @@ fn main() { /* Return trace content after the last round. */ let (mut cursor, storage) = graph_trace.cursor(); - cursor.to_vec(&storage) + cursor.to_vec(&storage, |k| k.clone(), |v| v.clone()) }) .unwrap().join(); diff --git a/differential-dataflow/examples/spines.rs b/differential-dataflow/examples/spines.rs index 9d5a82019..3fada3d5f 100644 --- a/differential-dataflow/examples/spines.rs +++ b/differential-dataflow/examples/spines.rs @@ -48,22 +48,6 @@ fn main() { keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, - "slc" => { - - use differential_dataflow::trace::implementations::ord_neu::{PreferredBatcher, PreferredBuilder, PreferredSpine}; - - let data = - data.map(|x| (x.clone().into_bytes(), x.into_bytes())) - .arrange::, PreferredBuilder<[u8],[u8],_,_>, PreferredSpine<[u8],[u8],_,_>>() - .reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); - let keys = - keys.map(|x| (x.clone().into_bytes(), 7)) - .arrange::, PreferredBuilder<[u8],u8,_,_>, PreferredSpine<[u8],u8,_,_>>() - .reduce_abelian::<_, _, _, PreferredBuilder<[u8],(),_,_>,PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1))); - - keys.join_core(&data, |_k, &(), &()| Option::<()>::None) - .probe_with(&mut probe); - }, _ => { println!("unrecognized mode: {:?}", mode) } diff --git a/differential-dataflow/src/consolidation.rs b/differential-dataflow/src/consolidation.rs index 1fbb283b1..292f1a63c 100644 --- a/differential-dataflow/src/consolidation.rs +++ 
b/differential-dataflow/src/consolidation.rs @@ -14,7 +14,7 @@ use std::cmp::Ordering; use std::collections::VecDeque; use timely::Container; use timely::container::{ContainerBuilder, PushInto}; -use crate::{IntoOwned, Data}; +use crate::Data; use crate::difference::{IsZero, Semigroup}; /// Sorts and consolidates `vec`. @@ -231,11 +231,14 @@ pub trait ConsolidateLayout: Container { type Key<'a>: Eq where Self: 'a; /// GAT diff type. - type Diff<'a>: IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a; + type Diff<'a>; /// Owned diff type. type DiffOwned: for<'a> Semigroup>; + /// Converts a reference diff into an owned diff. + fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned; + /// Deconstruct an item into key and diff. Must be cheap. fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>); @@ -266,7 +269,7 @@ pub trait ConsolidateLayout: Container { let (k, d) = Self::into_parts(item); let mut prev_key = k; - let mut prev_diff = d.into_owned(); + let mut prev_diff = Self::owned_diff(d); for item in iter { let (next_key, next_diff) = Self::into_parts(item); @@ -278,7 +281,7 @@ pub trait ConsolidateLayout: Container { target.push_with_diff(prev_key, prev_diff); } prev_key = next_key; - prev_diff = next_diff.into_owned(); + prev_diff = Self::owned_diff(next_diff); } } @@ -293,12 +296,14 @@ impl ConsolidateLayout for Vec<(D, T, R)> where D: Ord + Clone + 'static, T: Ord + Clone + 'static, - R: Semigroup + for<'a> IntoOwned<'a, Owned = R> + Clone + 'static, + R: Semigroup + Clone + 'static, { type Key<'a> = (D, T) where Self: 'a; type Diff<'a> = R where Self: 'a; type DiffOwned = R; + fn owned_diff(diff: Self::Diff<'_>) -> Self::DiffOwned { diff } + fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) { ((data, time), diff) } diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 189cf7dd2..7b461f816 100644 --- 
a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -215,7 +215,7 @@ where while let Some(val) = cursor.get_val(batch) { for datum in logic(key, val) { cursor.map_times(batch, |time, diff| { - session.give((datum.clone(), time.into_owned(), diff.into_owned())); + session.give((datum.clone(), Tr::owned_time(time), Tr::owned_diff(diff))); }); } cursor.step_val(batch); diff --git a/differential-dataflow/src/operators/arrange/upsert.rs b/differential-dataflow/src/operators/arrange/upsert.rs index fa556327d..423f45261 100644 --- a/differential-dataflow/src/operators/arrange/upsert.rs +++ b/differential-dataflow/src/operators/arrange/upsert.rs @@ -244,7 +244,7 @@ where // Determine the prior value associated with the key. while let Some(val) = trace_cursor.get_val(&trace_storage) { let mut count = 0; - trace_cursor.map_times(&trace_storage, |_time, diff| count += diff.into_owned()); + trace_cursor.map_times(&trace_storage, |_time, diff| count += Tr::owned_diff(diff)); assert!(count == 0 || count == 1); if count == 1 { assert!(prev_value.is_none()); diff --git a/differential-dataflow/src/operators/consolidate.rs b/differential-dataflow/src/operators/consolidate.rs index c70498b71..00ad3f4b9 100644 --- a/differential-dataflow/src/operators/consolidate.rs +++ b/differential-dataflow/src/operators/consolidate.rs @@ -8,7 +8,7 @@ use timely::dataflow::Scope; -use crate::{IntoOwned, Collection, ExchangeData, Hashable}; +use crate::{Collection, ExchangeData, Hashable}; use crate::consolidation::ConsolidatingContainerBuilder; use crate::difference::Semigroup; @@ -45,20 +45,22 @@ where /// ``` pub fn consolidate(&self) -> Self { use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine}; - self.consolidate_named::,KeyBuilder<_,_,_>, KeySpine<_,_,_>>("Consolidate") + self.consolidate_named::,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone()) } - /// As `consolidate` but 
with the ability to name the operator and specify the trace type. - pub fn consolidate_named(&self, name: &str) -> Self + /// As `consolidate` but with the ability to name the operator, specify the trace type, + /// and provide the function `reify` to produce owned keys and values. + pub fn consolidate_named(&self, name: &str, reify: F) -> Self where Ba: Batcher, Time=G::Timestamp> + 'static, - Tr: for<'a> crate::trace::Trace: IntoOwned<'a, Owned = D>,Time=G::Timestamp,Diff=R>+'static, + Tr: for<'a> crate::trace::Trace+'static, Bu: Builder, + F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static, { use crate::operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) .arrange_named::(name) - .as_collection(|d, _| d.into_owned()) + .as_collection(reify) } /// Aggregates the weights of equal records. diff --git a/differential-dataflow/src/operators/count.rs b/differential-dataflow/src/operators/count.rs index 53098979c..3380a3028 100644 --- a/differential-dataflow/src/operators/count.rs +++ b/differential-dataflow/src/operators/count.rs @@ -6,7 +6,7 @@ use timely::dataflow::operators::Operator; use timely::dataflow::channels::pact::Pipeline; use crate::lattice::Lattice; -use crate::{IntoOwned, ExchangeData, Collection}; +use crate::{ExchangeData, Collection}; use crate::difference::{IsZero, Semigroup}; use crate::hashable::Hashable; use crate::collection::AsCollection; @@ -56,7 +56,7 @@ impl CountTotal for Arranged where G: Scope, T1: for<'a> TraceReader< - Key<'a>: IntoOwned<'a, Owned = K>, + Key<'a> = &'a K, Val<'a>=&'a (), Time: TotalOrder, Diff: ExchangeData+Semigroup> @@ -109,7 +109,7 @@ where if trace_cursor.get_key(&trace_storage) == Some(key) { trace_cursor.map_times(&trace_storage, |_, diff| { count.as_mut().map(|c| c.plus_equals(&diff)); - if count.is_none() { count = Some(diff.into_owned()); } + if count.is_none() { count = Some(T1::owned_diff(diff)); } }); } @@ -117,14 +117,14 @@ where if let Some(count) = count.as_ref() { if !count.is_zero() { - 
session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8))); + session.give(((key.clone(), count.clone()), T1::owned_time(time), R2::from(-1i8))); } } count.as_mut().map(|c| c.plus_equals(&diff)); - if count.is_none() { count = Some(diff.into_owned()); } + if count.is_none() { count = Some(T1::owned_diff(diff)); } if let Some(count) = count.as_ref() { if !count.is_zero() { - session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8))); + session.give(((key.clone(), count.clone()), T1::owned_time(time), R2::from(1i8))); } } }); diff --git a/differential-dataflow/src/operators/join.rs b/differential-dataflow/src/operators/join.rs index 25be349c6..93be1c13a 100644 --- a/differential-dataflow/src/operators/join.rs +++ b/differential-dataflow/src/operators/join.rs @@ -171,7 +171,7 @@ where V: Data + 'static, { fn join_map(&self, other: &Collection, mut logic: L) -> Collection>::Output> - where + where Tr::Diff: Multiply, L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2)->D+'static, { @@ -660,14 +660,12 @@ where Ordering::Greater => batch.seek_key(batch_storage, trace_key), Ordering::Equal => { - use crate::IntoOwned; - thinker.history1.edits.load(trace, trace_storage, |time| { - let mut time = time.into_owned(); + let mut time = C1::owned_time(time); time.join_assign(meet); time }); - thinker.history2.edits.load(batch, batch_storage, |time| time.into_owned()); + thinker.history2.edits.load(batch, batch_storage, |time| C2::owned_time(time)); // populate `temp` with the results in the best way we know how. 
thinker.think(|v1,v2,t,r1,r2| { diff --git a/differential-dataflow/src/operators/mod.rs b/differential-dataflow/src/operators/mod.rs index ab875738f..6112a840f 100644 --- a/differential-dataflow/src/operators/mod.rs +++ b/differential-dataflow/src/operators/mod.rs @@ -22,7 +22,6 @@ pub mod threshold; use crate::lattice::Lattice; use crate::trace::Cursor; -use crate::IntoOwned; /// An accumulation of (value, time, diff) updates. struct EditList<'a, C: Cursor> { @@ -46,7 +45,7 @@ impl<'a, C: Cursor> EditList<'a, C> { { self.clear(); while let Some(val) = cursor.get_val(storage) { - cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.into_owned())); + cursor.map_times(storage, |time1, diff1| self.push(logic(time1), C::owned_diff(diff1))); self.seal(val); cursor.step_val(storage); } diff --git a/differential-dataflow/src/operators/reduce.rs b/differential-dataflow/src/operators/reduce.rs index 2267268ff..d60c013db 100644 --- a/differential-dataflow/src/operators/reduce.rs +++ b/differential-dataflow/src/operators/reduce.rs @@ -739,7 +739,7 @@ mod history_replay { // loaded times by performing the lattice `join` with this value. // Load the batch contents. - let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| time.into_owned()); + let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| C3::owned_time(time)); // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet // can be used to advance other historical times, which may consolidate their representation. As @@ -776,23 +776,23 @@ mod history_replay { // Load the input and output histories. 
let mut input_replay = if let Some(meet) = meet.as_ref() { self.input_history.replay_key(source_cursor, source_storage, key, |time| { - let mut time = time.into_owned(); + let mut time = C1::owned_time(time); time.join_assign(meet); time }) } else { - self.input_history.replay_key(source_cursor, source_storage, key, |time| time.into_owned()) + self.input_history.replay_key(source_cursor, source_storage, key, |time| C1::owned_time(time)) }; let mut output_replay = if let Some(meet) = meet.as_ref() { self.output_history.replay_key(output_cursor, output_storage, key, |time| { - let mut time = time.into_owned(); + let mut time = C2::owned_time(time); time.join_assign(meet); time }) } else { - self.output_history.replay_key(output_cursor, output_storage, key, |time| time.into_owned()) + self.output_history.replay_key(output_cursor, output_storage, key, |time| C2::owned_time(time)) }; self.synth_times.clear(); diff --git a/differential-dataflow/src/operators/threshold.rs b/differential-dataflow/src/operators/threshold.rs index 6476b19b9..df4f110cc 100644 --- a/differential-dataflow/src/operators/threshold.rs +++ b/differential-dataflow/src/operators/threshold.rs @@ -42,10 +42,10 @@ pub trait ThresholdTotal, K: Exchang fn threshold_totalR2+'static>(&self, mut thresh: F) -> Collection { self.threshold_semigroup(move |key, new, old| { let mut new = thresh(key, new); - if let Some(old) = old { + if let Some(old) = old { let mut add = thresh(key, old); add.negate(); - new.plus_equals(&add); + new.plus_equals(&add); } if !new.is_zero() { Some(new) } else { None } }) @@ -102,7 +102,7 @@ impl ThresholdTotal for Arranged where G: Scope, T1: for<'a> TraceReader< - Key<'a>=&'a K, + Key<'a>=&'a K, Val<'a>=&'a (), Time: TotalOrder, Diff : ExchangeData + Semigroup>, @@ -144,7 +144,6 @@ where } }); - use crate::IntoOwned; if let Some(capability) = cap { let mut session = output.session(&capability); @@ -161,7 +160,7 @@ where if trace_cursor.get_key(&trace_storage) == Some(key) { 
trace_cursor.map_times(&trace_storage, |_, diff| { count.as_mut().map(|c| c.plus_equals(&diff)); - if count.is_none() { count = Some(diff.into_owned()); } + if count.is_none() { count = Some(T1::owned_diff(diff)); } }); } @@ -176,7 +175,7 @@ where temp.plus_equals(&diff); thresh(key, &temp, Some(old)) }, - None => { thresh(key, &diff.into_owned(), None) }, + None => { thresh(key, &T1::owned_diff(diff), None) }, }; // Either add or assign `diff` to `count`. @@ -184,12 +183,12 @@ where count.plus_equals(&diff); } else { - count = Some(diff.into_owned()); + count = Some(T1::owned_diff(diff)); } if let Some(difference) = difference { if !difference.is_zero() { - session.give((key.clone(), time.into_owned(), difference)); + session.give((key.clone(), T1::owned_time(time), difference)); } } }); diff --git a/differential-dataflow/src/trace/cursor/cursor_list.rs b/differential-dataflow/src/trace/cursor/cursor_list.rs index 2a175cef0..6c57bce4c 100644 --- a/differential-dataflow/src/trace/cursor/cursor_list.rs +++ b/differential-dataflow/src/trace/cursor/cursor_list.rs @@ -102,6 +102,10 @@ impl Cursor for CursorList { type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { C::owned_time(time) } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { C::clone_time_onto(time, onto) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } + type Storage = Vec; // validation methods diff --git a/differential-dataflow/src/trace/cursor/mod.rs b/differential-dataflow/src/trace/cursor/mod.rs index dfcab95f3..f31cfb75f 100644 --- a/differential-dataflow/src/trace/cursor/mod.rs +++ b/differential-dataflow/src/trace/cursor/mod.rs @@ -8,8 +8,6 @@ use timely::progress::Timestamp; use crate::difference::Semigroup; -// `pub use` for legacy reasons. 
-pub use crate::IntoOwned; use crate::lattice::Lattice; pub mod cursor_list; @@ -26,11 +24,18 @@ pub trait Cursor { /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Borrowed form of timestamp. - type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>; + type TimeGat<'a>: Copy; /// Owned form of update difference. type Diff: Semigroup + 'static; /// Borrowed form of update difference. - type DiffGat<'a> : Copy + IntoOwned<'a, Owned = Self::Diff>; + type DiffGat<'a> : Copy; + + /// An owned copy of a reference to a time. + fn owned_time(time: Self::TimeGat<'_>) -> Self::Time; + /// Clones a reference time onto an owned time. + fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time); + /// An owned copy of a reference to a diff. + fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff; /// Storage required by the cursor. type Storage; @@ -74,10 +79,10 @@ pub trait Cursor { fn rewind_vals(&mut self, storage: &Self::Storage); /// Rewinds the cursor and outputs its contents to a Vec - fn to_vec(&mut self, storage: &Self::Storage) -> Vec<((K, V), Vec<(Self::Time, Self::Diff)>)> - where - for<'a> Self::Key<'a> : IntoOwned<'a, Owned = K>, - for<'a> Self::Val<'a> : IntoOwned<'a, Owned = V>, + fn to_vec(&mut self, storage: &Self::Storage, into_key: IK, into_val: IV) -> Vec<((K, V), Vec<(Self::Time, Self::Diff)>)> + where + IK: for<'a> Fn(Self::Key<'a>) -> K, + IV: for<'a> Fn(Self::Val<'a>) -> V, { let mut out = Vec::new(); self.rewind_keys(storage); @@ -86,9 +91,9 @@ pub trait Cursor { while let Some(val) = self.get_val(storage) { let mut kv_out = Vec::new(); self.map_times(storage, |ts, r| { - kv_out.push((ts.into_owned(), r.into_owned())); + kv_out.push((Self::owned_time(ts), Self::owned_diff(r))); }); - out.push(((key.into_owned(), val.into_owned()), kv_out)); + out.push(((into_key(key), into_val(val)), kv_out)); self.step_val(storage); } self.step_key(storage); diff --git 
a/differential-dataflow/src/trace/implementations/huffman_container.rs b/differential-dataflow/src/trace/implementations/huffman_container.rs index c5d08e6bb..e09c7634c 100644 --- a/differential-dataflow/src/trace/implementations/huffman_container.rs +++ b/differential-dataflow/src/trace/implementations/huffman_container.rs @@ -38,8 +38,8 @@ impl PushInto> for HuffmanContainer { bytes.extend(huffman.encode(item.iter())); self.offsets.push(bytes.len()); }, - Err(raw) => { - raw.extend(item); + Err(raw) => { + raw.extend(item); self.offsets.push(raw.len()); } } @@ -82,8 +82,26 @@ impl BatchContainer for HuffmanContainer { type Owned = Vec; type ReadItem<'a> = Wrapped<'a, B>; + fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { + match item.decode() { + Ok(decode) => decode.cloned().collect(), + Err(bytes) => bytes.to_vec(), + } + } + fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { + other.clear(); + match item.decode() { + Ok(decode) => other.extend(decode.cloned()), + Err(bytes) => other.extend_from_slice(bytes), + } + } + fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { Self::ReadItem { inner: Err(&owned[..]) } } + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } + fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } + fn push_own(&mut self, item: Self::Owned) { self.push_into(item) } + fn with_capacity(size: usize) -> Self { let mut offsets = OffsetList::with_capacity(size + 1); offsets.push(0); @@ -147,11 +165,10 @@ impl Default for HuffmanContainer { mod wrapper { - use crate::IntoOwned; use super::Encoded; pub struct Wrapped<'a, B: Ord> { - inner: Result, &'a [B]>, + pub(crate) inner: Result, &'a [B]>, } impl<'a, B: Ord> Wrapped<'a, B> { @@ -200,25 +217,6 @@ mod wrapper { self.partial_cmp(other).unwrap() } } - impl<'a, B: Ord+Clone> IntoOwned<'a> for Wrapped<'a, B> { - type Owned = Vec; - fn into_owned(self) -> Self::Owned { - match self.decode() { - Ok(decode) 
=> decode.cloned().collect(), - Err(bytes) => bytes.to_vec(), - } - } - fn clone_onto(self, other: &mut Self::Owned) { - other.clear(); - match self.decode() { - Ok(decode) => other.extend(decode.cloned()), - Err(bytes) => other.extend_from_slice(bytes), - } - } - fn borrow_as(owned: &'a Self::Owned) -> Self { - Self { inner: Err(&owned[..]) } - } - } } /// Wrapper around a Huffman decoder and byte slices, decodeable to a byte sequence. @@ -254,7 +252,7 @@ mod huffman { use std::collections::BTreeMap; use std::convert::TryInto; - + use self::decoder::Decoder; use self::encoder::Encoder; @@ -281,7 +279,7 @@ mod huffman { } /// Decodes the provided bytes as a sequence of symbols. - pub fn decode(&self, bytes: I) -> Decoder<'_, T, I::IntoIter> + pub fn decode(&self, bytes: I) -> Decoder<'_, T, I::IntoIter> where I: IntoIterator { @@ -317,7 +315,7 @@ mod huffman { while let Some((node, level)) = todo.pop() { match node { Node::Leaf(sym) => { levels.push((level, sym)); }, - Node::Fork(l,r) => { + Node::Fork(l,r) => { todo.push((&tree[*l], level + 1)); todo.push((&tree[*r], level + 1)); }, @@ -345,13 +343,13 @@ mod huffman { } } - Huffman { + Huffman { encode, decode, } } - /// Inserts a symbol, and + /// Inserts a symbol, and fn insert_decode(map: &mut [Decode; 256], symbol: &T, bits: usize, code: u64) where T: Clone { let byte: u8 = (code >> 56).try_into().unwrap(); if bits <= 8 { @@ -376,7 +374,7 @@ mod huffman { Fork(usize, usize), } - /// Decoder + /// Decoder #[derive(Eq, PartialEq, Ord, PartialOrd, Debug, Default)] pub enum Decode { /// An as-yet unfilled slot. @@ -392,7 +390,7 @@ mod huffman { /// Tests to see if the map contains any invalid values. /// /// A correctly initialized map will have no invalid values. - /// A map with invalid values will be unable to decode some + /// A map with invalid values will be unable to decode some /// input byte sequences. 
fn any_void(&self) -> bool { match self { diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index 1c9ec7da6..0bc5c2651 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -54,7 +54,6 @@ pub use self::ord_neu::OrdKeySpine as KeySpine; pub use self::ord_neu::OrdKeyBatcher as KeyBatcher; pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder; -use std::borrow::{ToOwned}; use std::convert::TryInto; use columnation::Columnation; @@ -94,28 +93,116 @@ where /// A type with opinions on how updates should be laid out. pub trait Layout { - /// The represented update. - type Target: Update + ?Sized; /// Container for update keys. - // NB: The `PushInto` constraint is only required by `rhh.rs` to push default values. - type KeyContainer: BatchContainer + PushInto<::Key>; + type KeyContainer: BatchContainer; /// Container for update vals. type ValContainer: BatchContainer; /// Container for times. - type TimeContainer: BatchContainer::Time> + PushInto<::Time>; + type TimeContainer: BatchContainer; /// Container for diffs. - type DiffContainer: BatchContainer::Diff> + PushInto<::Diff>; + type DiffContainer: BatchContainer; /// Container for offsets. type OffsetContainer: for<'a> BatchContainer = usize>; } +/// A type bearing a layout. +pub trait LaidOut { + /// The layout. + type Layout: Layout; +} + +/// Automatically implemented trait for types with layouts. +pub trait LayoutExt : LaidOut { + /// Alias for an owned key of a layout. + type Key; + /// Alias for a borrowed key of a layout. + type KeyRef<'a>; + /// Alias for an owned val of a layout. + type Val; + /// Alias for a borrowed val of a layout. + type ValRef<'a>; + /// Alias for an owned time of a layout. + type Time; + /// Alias for a borrowed time of a layout. + type TimeRef<'a>; + /// Alias for an owned diff of a layout. 
+ type Diff; + /// Alias for a borrowed diff of a layout. + type DiffRef<'a>; + + /// Construct an owned key from a reference. + fn owned_key(key: Self::KeyRef<'_>) -> Self::Key; + /// Construct an owned val from a reference. + fn owned_val(val: Self::ValRef<'_>) -> Self::Val; + /// Construct an owned time from a reference. + fn owned_time(time: Self::TimeRef<'_>) -> Self::Time; + /// Construct an owned diff from a reference. + fn owned_diff(diff: Self::DiffRef<'_>) -> Self::Diff; +} + +impl LayoutExt for L { + type Key = <::KeyContainer as BatchContainer>::Owned; + type KeyRef<'a> = <::KeyContainer as BatchContainer>::ReadItem<'a>; + type Val = <::ValContainer as BatchContainer>::Owned; + type ValRef<'a> = <::ValContainer as BatchContainer>::ReadItem<'a>; + type Time = <::TimeContainer as BatchContainer>::Owned; + type TimeRef<'a> = <::TimeContainer as BatchContainer>::ReadItem<'a>; + type Diff = <::DiffContainer as BatchContainer>::Owned; + type DiffRef<'a> = <::DiffContainer as BatchContainer>::ReadItem<'a>; + + #[inline(always)] fn owned_key(key: Self::KeyRef<'_>) -> Self::Key { ::KeyContainer::into_owned(key) } + #[inline(always)] fn owned_val(val: Self::ValRef<'_>) -> Self::Val { ::ValContainer::into_owned(val) } + #[inline(always)] fn owned_time(time: Self::TimeRef<'_>) -> Self::Time { ::TimeContainer::into_owned(time) } + #[inline(always)] fn owned_diff(diff: Self::DiffRef<'_>) -> Self::Diff { ::DiffContainer::into_owned(diff) } +} + +// An easy way to provide an explicit layout: as a 5-tuple. +// Valuable when one wants to perform layout surgery. +impl Layout for (KC, VC, TC, DC, OC) +where + KC: BatchContainer, + VC: BatchContainer, + TC: BatchContainer, + DC: BatchContainer, + OC: for<'a> BatchContainer = usize>, +{ + type KeyContainer = KC; + type ValContainer = VC; + type TimeContainer = TC; + type DiffContainer = DC; + type OffsetContainer = OC; +} + +/// Aliases for types provided by the containers within a `Layout`. 
+/// +/// For clarity, prefer `use layout;` and then `layout::Key` to retain the layout context. +pub mod layout { + use crate::trace::implementations::{BatchContainer, Layout}; + + /// Alias for an owned key of a layout. + pub type Key = <::KeyContainer as BatchContainer>::Owned; + /// Alias for a borrowed key of a layout. + pub type KeyRef<'a, L> = <::KeyContainer as BatchContainer>::ReadItem<'a>; + /// Alias for an owned val of a layout. + pub type Val = <::ValContainer as BatchContainer>::Owned; + /// Alias for a borrowed val of a layout. + pub type ValRef<'a, L> = <::ValContainer as BatchContainer>::ReadItem<'a>; + /// Alias for an owned time of a layout. + pub type Time = <::TimeContainer as BatchContainer>::Owned; + /// Alias for a borrowed time of a layout. + pub type TimeRef<'a, L> = <::TimeContainer as BatchContainer>::ReadItem<'a>; + /// Alias for an owned diff of a layout. + pub type Diff = <::DiffContainer as BatchContainer>::Owned; + /// Alias for a borrowed diff of a layout. + pub type DiffRef<'a, L> = <::DiffContainer as BatchContainer>::ReadItem<'a>; +} + /// A layout that uses vectors pub struct Vector { phantom: std::marker::PhantomData, } impl> Layout for Vector { - type Target = U; type KeyContainer = Vec; type ValContainer = Vec; type TimeContainer = Vec; @@ -137,7 +224,6 @@ where Diff: Columnation + Ord, >, { - type Target = U; type KeyContainer = TimelyStack; type ValContainer = TimelyStack; type TimeContainer = TimelyStack; @@ -145,55 +231,6 @@ where type OffsetContainer = OffsetList; } -/// A type with a preferred container. -/// -/// Examples include types that implement `Clone` who prefer -pub trait PreferredContainer : ToOwned { - /// The preferred container for the type. - type Container: BatchContainer + PushInto; -} - -impl PreferredContainer for T { - type Container = Vec; -} - -impl PreferredContainer for [T] { - type Container = SliceContainer; -} - -/// An update and layout description based on preferred containers. 
-pub struct Preferred { - phantom: std::marker::PhantomData<(Box, Box, T, D)>, -} - -impl Update for Preferred -where - K: ToOwned + ?Sized, - V: ToOwned + ?Sized, - T: Ord+Clone+Lattice+timely::progress::Timestamp, - R: Ord+Clone+Semigroup+'static, -{ - type Key = K::Owned; - type Val = V::Owned; - type Time = T; - type Diff = R; -} - -impl Layout for Preferred -where - K: Ord+ToOwned+PreferredContainer + ?Sized, - V: Ord+ToOwned+PreferredContainer + ?Sized, - T: Ord+Clone+Lattice+timely::progress::Timestamp, - D: Ord+Clone+Semigroup+'static, -{ - type Target = Preferred; - type KeyContainer = K::Container; - type ValContainer = V::Container; - type TimeContainer = Vec; - type DiffContainer = Vec; - type OffsetContainer = OffsetList; -} - /// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)] pub struct OffsetList { @@ -294,8 +331,16 @@ impl BatchContainer for OffsetList { type Owned = usize; type ReadItem<'a> = usize; + #[inline(always)] + fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item } + #[inline(always)] + fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { *owned } + #[inline(always)] fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } + fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } + fn push_own(&mut self, item: Self::Owned) { self.push_into(item) } + fn with_capacity(size: usize) -> Self { Self::with_capacity(size) } @@ -454,20 +499,34 @@ pub mod containers { use timely::container::PushInto; use crate::containers::TimelyStack; - use crate::IntoOwned; /// A general-purpose container resembling `Vec`. - pub trait BatchContainer: for<'a> PushInto> + 'static { + pub trait BatchContainer: 'static { /// An owned instance of `Self::ReadItem<'_>`. type Owned; /// The type that can be read back out of the container. 
- type ReadItem<'a>: Copy + Ord + IntoOwned<'a, Owned = Self::Owned>; + type ReadItem<'a>: Copy + Ord; - /// Push an item into this container - fn push(&mut self, item: D) where Self: PushInto { - self.push_into(item); + + /// Conversion from an instance of this type to the owned type. + #[must_use] + fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned; + /// Clones `self` onto an existing instance of the owned type. + #[inline(always)] + fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { + *other = Self::into_owned(item); } + /// Borrows an owned instance as oneself. + #[must_use] + fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a>; + + + /// Push an item into this container + fn push_ref(&mut self, item: Self::ReadItem<'_>); + /// Push an item into this container + fn push_own(&mut self, item: Self::Owned); + /// Creates a new container with sufficient capacity. fn with_capacity(size: usize) -> Self; /// Creates a new container with sufficient capacity. 
@@ -552,8 +611,15 @@ pub mod containers { type Owned = T; type ReadItem<'a> = &'a T; + #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() } + #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); } + #[inline(always)] fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { owned } + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } + fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } + fn push_own(&mut self, item: Self::Owned) { self.push_into(item) } + fn with_capacity(size: usize) -> Self { Vec::with_capacity(size) } @@ -577,8 +643,15 @@ pub mod containers { type Owned = T; type ReadItem<'a> = &'a T; + #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() } + #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); } + #[inline(always)] fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { owned } + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } + fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } + fn push_own(&mut self, item: Self::Owned) { self.push_into(item) } + fn with_capacity(size: usize) -> Self { Self::with_capacity(size) } @@ -637,8 +710,15 @@ pub mod containers { type Owned = Vec; type ReadItem<'a> = &'a [B]; + #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.to_vec() } + #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from_slice(item); } + #[inline(always)] fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { &owned[..] 
} + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } + fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) } + fn push_own(&mut self, item: Self::Owned) { self.push_into(item) } + fn with_capacity(size: usize) -> Self { let mut offsets = Vec::with_capacity(size + 1); offsets.push(0); diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 4368c9391..40a5b0c28 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -16,7 +16,7 @@ use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; use crate::trace::rc_blanket_impls::RcBuilder; -use super::{Update, Layout, Vector, TStack, Preferred}; +use super::{Layout, Vector, TStack}; pub use self::val_batch::{OrdValBatch, OrdValBuilder}; pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; @@ -55,13 +55,6 @@ pub type ColKeyBatcher = MergeBatcher, ColumnationChu /// A builder for columnar storage pub type ColKeyBuilder = RcBuilder, TimelyStack<((K,()),T,R)>>>; -/// A trace implementation backed by columnar storage. -pub type PreferredSpine = Spine>>>; -/// A batcher for columnar storage. -pub type PreferredBatcher = MergeBatcher::Owned,::Owned),T,R)>, ColumnationChunker<((::Owned,::Owned),T,R)>, ColMerger<(::Owned,::Owned),T,R>>; -/// A builder for columnar storage. -pub type PreferredBuilder = RcBuilder, TimelyStack<((::Owned,::Owned),T,R)>>>; - // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; @@ -118,7 +111,7 @@ pub mod layers { /// Allocates with capacities for a number of lists and values. 
pub fn with_capacity(o_size: usize, v_size: usize) -> Self { let mut offs = ::with_capacity(o_size); - offs.push(0); + offs.push_ref(0); Self { offs, vals: ::with_capacity(v_size), @@ -127,7 +120,7 @@ pub mod layers { /// Allocates with enough capacity to contain two inputs. pub fn merge_capacity(this: &Self, that: &Self) -> Self { let mut offs = ::with_capacity(this.offs.len() + that.offs.len()); - offs.push(0); + offs.push_ref(0); Self { offs, vals: ::merge_capacity(&this.vals, &that.vals), @@ -184,7 +177,7 @@ pub mod layers { /// Allocates with capacities for a number of lists and values. pub fn with_capacity(o_size: usize, u_size: usize) -> Self { let mut offs = ::with_capacity(o_size); - offs.push(0); + offs.push_ref(0); Self { offs, times: ::with_capacity(u_size), @@ -194,7 +187,7 @@ pub mod layers { /// Allocates with enough capacity to contain two inputs. pub fn merge_capacity(this: &Self, that: &Self) -> Self { let mut offs = ::with_capacity(this.offs.len() + that.offs.len()); - offs.push(0); + offs.push_ref(0); Self { offs, times: ::merge_capacity(&this.times, &that.times), @@ -221,11 +214,10 @@ pub mod layers { } - use timely::container::PushInto; impl UpdsBuilder where - T: BatchContainer + PushInto, - D: BatchContainer + PushInto, + T: BatchContainer, + D: BatchContainer, { /// Stages one update, but does not seal the set of updates. pub fn push(&mut self, time: T::Owned, diff: D::Owned) { @@ -243,25 +235,24 @@ pub mod layers { // we push nothing and report an unincremented offset to encode this case. 
let time_diff = upds.times.last().zip(upds.diffs.last()); let last_eq = self.stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| { - use crate::IntoOwned; - let t1 = <::ReadItem<'_> as IntoOwned>::borrow_as(t1); - let d1 = <::ReadItem<'_> as IntoOwned>::borrow_as(d1); + let t1 = T::borrow_as(t1); + let d1 = D::borrow_as(d1); t1.eq(&t2) && d1.eq(&d2) }); if self.stash.len() == 1 && last_eq.unwrap_or(false) { // Just clear out the stash, as we won't drain it here. self.total += 1; self.stash.clear(); - upds.offs.push(upds.times.len()); + upds.offs.push_ref(upds.times.len()); } else { // Conventional; move `stash` into `updates`. self.total += self.stash.len(); for (time, diff) in self.stash.drain(..) { - upds.times.push(time); - upds.diffs.push(diff); + upds.times.push_own(time); + upds.diffs.push_own(diff); } - upds.offs.push(upds.times.len()); + upds.offs.push_ref(upds.times.len()); } true } else { @@ -284,9 +275,9 @@ pub mod val_batch { use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::IntoOwned; + use crate::trace::implementations::layout; - use super::{Layout, Update, Vals, Upds, layers::UpdsBuilder}; + use super::{Layout, Vals, Upds, layers::UpdsBuilder}; /// An immutable collection of update tuples, from a contiguous interval of logical times. #[derive(Debug, Serialize, Deserialize)] @@ -322,7 +313,7 @@ pub mod val_batch { /// The updates themselves. pub storage: OrdValStorage, /// Description of the update times this layer represents. - pub description: Description<::Time>, + pub description: Description>, /// The number of updates reflected in the batch. 
/// /// We track this separately from `storage` because due to the singleton optimization, @@ -332,12 +323,12 @@ pub mod val_batch { } impl BatchReader for OrdValBatch { - type Key<'a> = ::ReadItem<'a>; - type Val<'a> = ::ReadItem<'a>; - type Time = ::Time; - type TimeGat<'a> = ::ReadItem<'a>; - type Diff = ::Diff; - type DiffGat<'a> = ::ReadItem<'a>; + type Key<'a> = layout::KeyRef<'a, L>; + type Val<'a> = layout::ValRef<'a, L>; + type Time = layout::Time; + type TimeGat<'a> = layout::TimeRef<'a, L>; + type Diff = layout::Diff; + type DiffGat<'a> = layout::DiffRef<'a, L>; type Cursor = OrdValCursor; fn cursor(&self) -> Self::Cursor { @@ -352,13 +343,13 @@ pub mod val_batch { // Perhaps we should count such exceptions to the side, to provide a correct accounting. self.updates } - fn description(&self) -> &Description<::Time> { &self.description } + fn description(&self) -> &Description> { &self.description } } impl Batch for OrdValBatch { type Merger = OrdValMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef>) -> Self::Merger { OrdValMerger::new(self, other, compaction_frontier) } @@ -385,16 +376,16 @@ pub mod val_batch { /// result that we are currently assembling. result: OrdValStorage, /// description - description: Description<::Time>, + description: Description>, /// Staging area to consolidate owned times and diffs, before sealing. 
staging: UpdsBuilder, } impl Merger> for OrdValMerger where - OrdValBatch: Batch::Time>, + OrdValBatch: Batch>, { - fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { + fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef>) -> Self { assert!(batch1.upper() == batch2.lower()); use crate::lattice::Lattice; @@ -471,15 +462,15 @@ pub mod val_batch { while lower < upper { self.stash_updates_for_val(source, lower); if self.staging.seal(&mut self.result.upds) { - self.result.vals.vals.push(source.vals.get_abs(lower)); + self.result.vals.vals.push_ref(source.vals.get_abs(lower)); } lower += 1; } // If we have pushed any values, copy the key as well. if self.result.vals.vals.len() > init_vals { - self.result.keys.push(source.keys.index(cursor)); - self.result.vals.offs.push(self.result.vals.vals.len()); + self.result.keys.push_ref(source.keys.index(cursor)); + self.result.vals.offs.push_ref(self.result.vals.vals.len()); } } /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. @@ -498,8 +489,8 @@ pub mod val_batch { let (lower1, upper1) = source1.vals.bounds(self.key_cursor1); let (lower2, upper2) = source2.vals.bounds(self.key_cursor2); if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) { - self.result.keys.push(source1.keys.index(self.key_cursor1)); - self.result.vals.offs.push(off); + self.result.keys.push_ref(source1.keys.index(self.key_cursor1)); + self.result.vals.offs.push_ref(off); } // Increment cursors in either case; the keys are merged. self.key_cursor1 += 1; @@ -532,7 +523,7 @@ pub mod val_batch { // Extend stash by updates, with logical compaction applied. 
self.stash_updates_for_val(source1, lower1); if self.staging.seal(&mut self.result.upds) { - self.result.vals.vals.push(source1.vals.get_abs(lower1)); + self.result.vals.vals.push_ref(source1.vals.get_abs(lower1)); } lower1 += 1; }, @@ -540,7 +531,7 @@ pub mod val_batch { self.stash_updates_for_val(source1, lower1); self.stash_updates_for_val(source2, lower2); if self.staging.seal(&mut self.result.upds) { - self.result.vals.vals.push(source1.vals.get_abs(lower1)); + self.result.vals.vals.push_ref(source1.vals.get_abs(lower1)); } lower1 += 1; lower2 += 1; @@ -549,7 +540,7 @@ pub mod val_batch { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source2, lower2); if self.staging.seal(&mut self.result.upds) { - self.result.vals.vals.push(source2.vals.get_abs(lower2)); + self.result.vals.vals.push_ref(source2.vals.get_abs(lower2)); } lower2 += 1; }, @@ -559,14 +550,14 @@ pub mod val_batch { while lower1 < upper1 { self.stash_updates_for_val(source1, lower1); if self.staging.seal(&mut self.result.upds) { - self.result.vals.vals.push(source1.vals.get_abs(lower1)); + self.result.vals.vals.push_ref(source1.vals.get_abs(lower1)); } lower1 += 1; } while lower2 < upper2 { self.stash_updates_for_val(source2, lower2); if self.staging.seal(&mut self.result.upds) { - self.result.vals.vals.push(source2.vals.get_abs(lower2)); + self.result.vals.vals.push_ref(source2.vals.get_abs(lower2)); } lower2 += 1; } @@ -586,9 +577,9 @@ pub mod val_batch { // NB: Here is where we would need to look back if `lower == upper`. 
let (time, diff) = source.upds.get_abs(i); use crate::lattice::Lattice; - let mut new_time: ::Time = time.into_owned(); + let mut new_time: layout::Time = L::TimeContainer::into_owned(time); new_time.advance_by(self.description.since().borrow()); - self.staging.push(new_time, diff.into_owned()); + self.staging.push(new_time, L::DiffContainer::into_owned(diff)); } } } @@ -605,12 +596,16 @@ pub mod val_batch { impl Cursor for OrdValCursor { - type Key<'a> = ::ReadItem<'a>; - type Val<'a> = ::ReadItem<'a>; - type Time = ::Time; - type TimeGat<'a> = ::ReadItem<'a>; - type Diff = ::Diff; - type DiffGat<'a> = ::ReadItem<'a>; + type Key<'a> = layout::KeyRef<'a, L>; + type Val<'a> = layout::ValRef<'a, L>; + type Time = layout::Time; + type TimeGat<'a> = layout::TimeRef<'a, L>; + type Diff = layout::Diff; + type DiffGat<'a> = layout::DiffRef<'a, L>; + + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { L::TimeContainer::into_owned(time) } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { L::TimeContainer::clone_onto(time, onto) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { L::DiffContainer::into_owned(diff) } type Storage = OrdValBatch; @@ -679,11 +674,11 @@ pub mod val_batch { KeyContainer: PushInto>, ValContainer: PushInto>, >, - CI: for<'a> BuilderInput::Time, Diff=::Diff>, + CI: for<'a> BuilderInput, Diff=layout::Diff>, { type Input = CI; - type Time = ::Time; + type Time = layout::Time; type Output = OrdValBatch; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { @@ -705,8 +700,8 @@ pub mod val_batch { // Pre-load the first update. if self.result.keys.is_empty() { - self.result.vals.vals.push(val); - self.result.keys.push(key); + self.result.vals.vals.push_into(val); + self.result.keys.push_into(key); self.staging.push(time, diff); } // Perhaps this is a continuation of an already received key. 
@@ -718,15 +713,15 @@ pub mod val_batch { // New value; complete representation of prior value. self.staging.seal(&mut self.result.upds); self.staging.push(time, diff); - self.result.vals.vals.push(val); + self.result.vals.vals.push_into(val); } } else { // New key; complete representation of prior key. self.staging.seal(&mut self.result.upds); self.staging.push(time, diff); - self.result.vals.offs.push(self.result.vals.vals.len()); - self.result.vals.vals.push(val); - self.result.keys.push(key); + self.result.vals.offs.push_ref(self.result.vals.vals.len()); + self.result.vals.vals.push_into(val); + self.result.keys.push_into(key); } } } @@ -734,7 +729,7 @@ pub mod val_batch { #[inline(never)] fn done(mut self, description: Description) -> OrdValBatch { self.staging.seal(&mut self.result.upds); - self.result.vals.offs.push(self.result.vals.vals.len()); + self.result.vals.offs.push_ref(self.result.vals.vals.len()); OrdValBatch { updates: self.staging.total(), storage: self.result, @@ -764,9 +759,9 @@ pub mod key_batch { use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::IntoOwned; + use crate::trace::implementations::layout; - use super::{Layout, Update, Upds, layers::UpdsBuilder}; + use super::{Layout, Upds, layers::UpdsBuilder}; /// An immutable collection of update tuples, from a contiguous interval of logical times. #[derive(Debug, Serialize, Deserialize)] @@ -798,7 +793,7 @@ pub mod key_batch { /// The updates themselves. pub storage: OrdKeyStorage, /// Description of the update times this layer represents. - pub description: Description<::Time>, + pub description: Description>, /// The number of updates reflected in the batch. 
/// /// We track this separately from `storage` because due to the singleton optimization, @@ -809,12 +804,12 @@ pub mod key_batch { impl BatchReader for OrdKeyBatch { - type Key<'a> = ::ReadItem<'a>; + type Key<'a> = layout::KeyRef<'a, L>; type Val<'a> = &'a (); - type Time = ::Time; - type TimeGat<'a> = ::ReadItem<'a>; - type Diff = ::Diff; - type DiffGat<'a> = ::ReadItem<'a>; + type Time = layout::Time; + type TimeGat<'a> = layout::TimeRef<'a, L>; + type Diff = layout::Diff; + type DiffGat<'a> = layout::DiffRef<'a, L>; type Cursor = OrdKeyCursor; fn cursor(&self) -> Self::Cursor { @@ -829,13 +824,13 @@ pub mod key_batch { // Perhaps we should count such exceptions to the side, to provide a correct accounting. self.updates } - fn description(&self) -> &Description<::Time> { &self.description } + fn description(&self) -> &Description> { &self.description } } impl Batch for OrdKeyBatch { type Merger = OrdKeyMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef>) -> Self::Merger { OrdKeyMerger::new(self, other, compaction_frontier) } @@ -861,7 +856,7 @@ pub mod key_batch { /// result that we are currently assembling. result: OrdKeyStorage, /// description - description: Description<::Time>, + description: Description>, /// Local stash of updates, to use for consolidation. 
staging: UpdsBuilder, @@ -869,9 +864,9 @@ pub mod key_batch { impl Merger> for OrdKeyMerger where - OrdKeyBatch: Batch::Time>, + OrdKeyBatch: Batch>, { - fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: AntichainRef<::Time>) -> Self { + fn new(batch1: &OrdKeyBatch, batch2: &OrdKeyBatch, compaction_frontier: AntichainRef>) -> Self { assert!(batch1.upper() == batch2.lower()); use crate::lattice::Lattice; @@ -943,7 +938,7 @@ pub mod key_batch { fn copy_key(&mut self, source: &OrdKeyStorage, cursor: usize) { self.stash_updates_for_key(source, cursor); if self.staging.seal(&mut self.result.upds) { - self.result.keys.push(source.keys.index(cursor)); + self.result.keys.push_ref(source.keys.index(cursor)); } } /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. @@ -962,7 +957,7 @@ pub mod key_batch { self.stash_updates_for_key(source1, self.key_cursor1); self.stash_updates_for_key(source2, self.key_cursor2); if self.staging.seal(&mut self.result.upds) { - self.result.keys.push(source1.keys.index(self.key_cursor1)); + self.result.keys.push_ref(source1.keys.index(self.key_cursor1)); } // Increment cursors in either case; the keys are merged. self.key_cursor1 += 1; @@ -982,9 +977,9 @@ pub mod key_batch { // NB: Here is where we would need to look back if `lower == upper`. 
let (time, diff) = source.upds.get_abs(i); use crate::lattice::Lattice; - let mut new_time = time.into_owned(); + let mut new_time = L::TimeContainer::into_owned(time); new_time.advance_by(self.description.since().borrow()); - self.staging.push(new_time, diff.into_owned()); + self.staging.push(new_time, L::DiffContainer::into_owned(diff)); } } } @@ -1000,12 +995,16 @@ pub mod key_batch { } impl Cursor for OrdKeyCursor { - type Key<'a> = ::ReadItem<'a>; + type Key<'a> = layout::KeyRef<'a, L>; type Val<'a> = &'a (); - type Time = ::Time; - type TimeGat<'a> = ::ReadItem<'a>; - type Diff = ::Diff; - type DiffGat<'a> = ::ReadItem<'a>; + type Time = layout::Time; + type TimeGat<'a> = layout::TimeRef<'a, L>; + type Diff = layout::Diff; + type DiffGat<'a> = layout::DiffRef<'a, L>; + + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { L::TimeContainer::into_owned(time) } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { L::TimeContainer::clone_onto(time, onto) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { L::DiffContainer::into_owned(diff) } type Storage = OrdKeyBatch; @@ -1066,11 +1065,11 @@ pub mod key_batch { impl Builder for OrdKeyBuilder where L: for<'a> Layout>>, - CI: BuilderInput::Time, Diff=::Diff>, + CI: BuilderInput, Diff=layout::Diff>, { type Input = CI; - type Time = ::Time; + type Time = layout::Time; type Output = OrdKeyBatch; fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self { @@ -1089,7 +1088,7 @@ pub mod key_batch { for item in chunk.drain() { let (key, _val, time, diff) = CI::into_parts(item); if self.result.keys.is_empty() { - self.result.keys.push(key); + self.result.keys.push_into(key); self.staging.push(time, diff); } // Perhaps this is a continuation of an already received key. 
@@ -1098,7 +1097,7 @@ pub mod key_batch { } else { self.staging.seal(&mut self.result.upds); self.staging.push(time, diff); - self.result.keys.push(key); + self.result.keys.push_into(key); } } } diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index 911697502..e844ebadf 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -1,7 +1,7 @@ //! Batch implementation based on Robin Hood Hashing. -//! +//! //! Items are ordered by `(hash(Key), Key)` rather than `Key`, which means -//! that these implementations should only be used with each other, under +//! that these implementations should only be used with each other, under //! the same `hash` function, or for types that also order by `(hash(X), X)`, //! for example wrapped types that implement `Ord` that way. @@ -17,7 +17,7 @@ use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColM use crate::trace::implementations::spine_fueled::Spine; use crate::trace::rc_blanket_impls::RcBuilder; -use super::{Update, Layout, Vector, TStack}; +use super::{Layout, Vector, TStack}; use self::val_batch::{RhhValBatch, RhhValBuilder}; @@ -92,28 +92,28 @@ mod val_batch { use crate::hashable::Hashable; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::IntoOwned; + use crate::trace::implementations::layout; - use super::{Layout, Update, HashOrdered}; + use super::{Layout, HashOrdered}; /// Update tuples organized as a Robin Hood Hash map, ordered by `(hash(Key), Key, Val, Time)`. /// - /// Specifically, this means that we attempt to place any `Key` at `alloc_len * (hash(Key) / 2^64)`, - /// and spill onward if the slot is occupied. The cleverness of RHH is that you may instead evict - /// someone else, in order to maintain the ordering up above. 
In fact, that is basically the rule: + /// Specifically, this means that we attempt to place any `Key` at `alloc_len * (hash(Key) / 2^64)`, + /// and spill onward if the slot is occupied. The cleverness of RHH is that you may instead evict + /// someone else, in order to maintain the ordering up above. In fact, that is basically the rule: /// when there is a conflict, evict the greater of the two and attempt to place it in the next slot. /// /// This RHH implementation uses a repeated `keys_offs` offset to indicate an absent element, as all - /// keys for valid updates must have some associated values with updates. This is the same type of + /// keys for valid updates must have some associated values with updates. This is the same type of /// optimization made for repeated updates, and it rules out (here) using that trick for repeated values. /// - /// We will use the `Hashable` trait here, but any consistent hash function should work out ok. + /// We will use the `Hashable` trait here, but any consistent hash function should work out ok. /// We specifically want to use the highest bits of the result (we will) because the low bits have /// likely been spent shuffling the data between workers (by key), and are likely low entropy. #[derive(Debug, Serialize, Deserialize)] - pub struct RhhValStorage - where - ::Key: Default + HashOrdered, + pub struct RhhValStorage + where + layout::Key: Default + HashOrdered, { /// The requested capacity for `keys`. We use this when determining where a key with a certain hash @@ -123,7 +123,7 @@ mod val_batch { /// A number large enough that when it divides any `u64` the result is at most `self.key_capacity`. /// When that capacity is zero or one, this is set to zero instead. pub divisor: u64, - /// The number of present keys, distinct from `keys.len()` which contains + /// The number of present keys, distinct from `keys.len()` which contains pub key_count: usize, /// An ordered list of keys, corresponding to entries in `keys_offs`. 
@@ -150,9 +150,9 @@ mod val_batch { } impl RhhValStorage - where - ::Key: Default + HashOrdered, - for<'a> ::ReadItem<'a>: HashOrdered, + where + layout::Key: Default + HashOrdered, + for<'a> layout::KeyRef<'a, L>: HashOrdered, { /// Lower and upper bounds in `self.vals` corresponding to the key at `index`. fn values_for_key(&self, index: usize) -> (usize, usize) { @@ -184,22 +184,22 @@ mod val_batch { /// If `offset` is specified, we will insert it at the appropriate location. If it is not specified, /// we leave `keys_offs` ready to receive it as the next `push`. This is so that builders that may /// not know the final offset at the moment of key insertion can prepare for receiving the offset. - fn insert_key(&mut self, key: ::ReadItem<'_>, offset: Option) { + fn insert_key(&mut self, key: layout::KeyRef<'_, L>, offset: Option) { let desired = self.desired_location(&key); - // Were we to push the key now, it would be at `self.keys.len()`, so while that is wrong, + // Were we to push the key now, it would be at `self.keys.len()`, so while that is wrong, // push additional blank entries in. while self.keys.len() < desired { // We insert a default (dummy) key and repeat the offset to indicate this. let current_offset = self.keys_offs.index(self.keys.len()); - self.keys.push(<::Key as Default>::default()); - self.keys_offs.push(current_offset); + self.keys.push_own( as Default>::default()); + self.keys_offs.push_ref(current_offset); } // Now we insert the key. Even if it is no longer the desired location because of contention. // If an offset has been supplied we insert it, and otherwise leave it for future determination. - self.keys.push(key); + self.keys.push_ref(key); if let Some(offset) = offset { - self.keys_offs.push(offset); + self.keys_offs.push_ref(offset); } self.key_count += 1; } @@ -213,7 +213,7 @@ mod val_batch { } /// Returns true if one should advance one's index in the search for `key`. 
- fn advance_key(&self, index: usize, key: ::ReadItem<'_>) -> bool { + fn advance_key(&self, index: usize, key: layout::KeyRef<'_, L>) -> bool { // Ideally this short-circuits, as `self.keys[index]` is bogus data. !self.live_key(index) || self.keys.index(index).lt(&::reborrow(key)) } @@ -258,35 +258,35 @@ mod val_batch { L::DiffContainer: Serialize + for<'a> Deserialize<'a>, ")] pub struct RhhValBatch - where - ::Key: Default + HashOrdered, + where + layout::Key: Default + HashOrdered, { /// The updates themselves. pub storage: RhhValStorage, /// Description of the update times this layer represents. - pub description: Description<::Time>, + pub description: Description>, /// The number of updates reflected in the batch. /// /// We track this separately from `storage` because due to the singleton optimization, - /// we may have many more updates than `storage.updates.len()`. It should equal that + /// we may have many more updates than `storage.updates.len()`. It should equal that /// length, plus the number of singleton optimizations employed. 
pub updates: usize, } - impl BatchReader for RhhValBatch - where - ::Key: Default + HashOrdered, - for<'a> ::ReadItem<'a>: HashOrdered, + impl BatchReader for RhhValBatch + where + layout::Key: Default + HashOrdered, + for<'a> layout::KeyRef<'a, L>: HashOrdered, { - type Key<'a> = ::ReadItem<'a>; - type Val<'a> = ::ReadItem<'a>; - type Time = ::Time; - type TimeGat<'a> = ::ReadItem<'a>; - type Diff = ::Diff; - type DiffGat<'a> = ::ReadItem<'a>; + type Key<'a> = layout::KeyRef<'a, L>; + type Val<'a> = layout::ValRef<'a, L>; + type Time = layout::Time; + type TimeGat<'a> = layout::TimeRef<'a, L>; + type Diff = layout::Diff; + type DiffGat<'a> = layout::DiffRef<'a, L>; type Cursor = RhhValCursor; - fn cursor(&self) -> Self::Cursor { + fn cursor(&self) -> Self::Cursor { let mut cursor = RhhValCursor { key_cursor: 0, val_cursor: 0, @@ -295,22 +295,22 @@ mod val_batch { cursor.step_key(self); cursor } - fn len(&self) -> usize { + fn len(&self) -> usize { // Normally this would be `self.updates.len()`, but we have a clever compact encoding. // Perhaps we should count such exceptions to the side, to provide a correct accounting. self.updates } - fn description(&self) -> &Description<::Time> { &self.description } + fn description(&self) -> &Description> { &self.description } } - impl Batch for RhhValBatch - where - ::Key: Default + HashOrdered, - for<'a> ::ReadItem<'a>: HashOrdered, + impl Batch for RhhValBatch + where + layout::Key: Default + HashOrdered, + for<'a> layout::KeyRef<'a, L>: HashOrdered, { type Merger = RhhValMerger; - fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<::Time>) -> Self::Merger { + fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef>) -> Self::Merger { RhhValMerger::new(self, other, compaction_frontier) } @@ -335,9 +335,9 @@ mod val_batch { } /// State for an in-progress merge. 
- pub struct RhhValMerger - where - ::Key: Default + HashOrdered, + pub struct RhhValMerger + where + layout::Key: Default + HashOrdered, { /// Key position to merge next in the first batch. key_cursor1: usize, @@ -346,24 +346,24 @@ mod val_batch { /// result that we are currently assembling. result: RhhValStorage, /// description - description: Description<::Time>, + description: Description>, /// Local stash of updates, to use for consolidation. /// /// We could emulate a `ChangeBatch` here, with related compaction smarts. /// A `ChangeBatch` itself needs an `i64` diff type, which we have not. - update_stash: Vec<(::Time, ::Diff)>, + update_stash: Vec<(layout::Time, layout::Diff)>, /// Counts the number of singleton-optimized entries, that we may correctly count the updates. singletons: usize, } impl Merger> for RhhValMerger where - ::Key: Default + HashOrdered, - RhhValBatch: Batch::Time>, - for<'a> ::ReadItem<'a>: HashOrdered, + layout::Key: Default + HashOrdered, + RhhValBatch: Batch>, + for<'a> layout::KeyRef<'a, L>: HashOrdered, { - fn new(batch1: &RhhValBatch, batch2: &RhhValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { + fn new(batch1: &RhhValBatch, batch2: &RhhValBatch, compaction_frontier: AntichainRef>) -> Self { assert!(batch1.upper() == batch2.lower()); use crate::lattice::Lattice; @@ -394,9 +394,9 @@ mod val_batch { // Mark explicit types because type inference fails to resolve it. let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; - keys_offs.push(0); + keys_offs.push_ref(0); let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs; - vals_offs.push(0); + vals_offs.push_ref(0); RhhValMerger { key_cursor1: 0, @@ -432,7 +432,7 @@ mod val_batch { effort = (self.result.times.len() - starting_updates) as isize; } - // Merging is complete, and only copying remains. + // Merging is complete, and only copying remains. // Key-by-key copying allows effort interruption, and compaction. 
while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel { self.copy_key(&source1.storage, self.key_cursor1); @@ -452,17 +452,17 @@ mod val_batch { } // Helper methods in support of merging batches. - impl RhhValMerger - where - ::Key: Default + HashOrdered, - for<'a> ::ReadItem<'a>: HashOrdered, + impl RhhValMerger + where + layout::Key: Default + HashOrdered, + for<'a> layout::KeyRef<'a, L>: HashOrdered, { /// Copy the next key in `source`. /// /// The method extracts the key in `source` at `cursor`, and merges it in to `self`. /// If the result does not wholly cancel, they key will be present in `self` with the - /// compacted values and updates. - /// + /// compacted values and updates. + /// /// The caller should be certain to update the cursor, as this method does not do this. fn copy_key(&mut self, source: &RhhValStorage, cursor: usize) { // Capture the initial number of values to determine if the merge was ultimately non-empty. @@ -471,16 +471,16 @@ mod val_batch { while lower < upper { self.stash_updates_for_val(source, lower); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); - self.result.vals.push(source.vals.index(lower)); + self.result.vals_offs.push_ref(off); + self.result.vals.push_ref(source.vals.index(lower)); } lower += 1; - } + } // If we have pushed any values, copy the key as well. if self.result.vals.len() > init_vals { self.result.insert_key(source.keys.index(cursor), Some(self.result.vals.len())); - } + } } /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. 
/// @@ -490,7 +490,7 @@ mod val_batch { use ::std::cmp::Ordering; match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) { - Ordering::Less => { + Ordering::Less => { self.copy_key(source1, self.key_cursor1); self.key_cursor1 += 1; }, @@ -516,8 +516,8 @@ mod val_batch { /// If the compacted result contains values with non-empty updates, the function returns /// an offset that should be recorded to indicate the upper extent of the result values. fn merge_vals( - &mut self, - (source1, mut lower1, upper1): (&RhhValStorage, usize, usize), + &mut self, + (source1, mut lower1, upper1): (&RhhValStorage, usize, usize), (source2, mut lower2, upper2): (&RhhValStorage, usize, usize), ) -> Option { // Capture the initial number of values to determine if the merge was ultimately non-empty. @@ -528,12 +528,12 @@ mod val_batch { // We could multi-way merge and it wouldn't be very complicated. use ::std::cmp::Ordering; match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) { - Ordering::Less => { + Ordering::Less => { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); - self.result.vals.push(source1.vals.index(lower1)); + self.result.vals_offs.push_ref(off); + self.result.vals.push_ref(source1.vals.index(lower1)); } lower1 += 1; }, @@ -541,18 +541,18 @@ mod val_batch { self.stash_updates_for_val(source1, lower1); self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); - self.result.vals.push(source1.vals.index(lower1)); + self.result.vals_offs.push_ref(off); + self.result.vals.push_ref(source1.vals.index(lower1)); } lower1 += 1; lower2 += 1; }, - Ordering::Greater => { + Ordering::Greater => { // Extend stash by updates, with logical compaction applied. 
self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); - self.result.vals.push(source2.vals.index(lower2)); + self.result.vals_offs.push_ref(off); + self.result.vals.push_ref(source2.vals.index(lower2)); } lower2 += 1; }, @@ -562,16 +562,16 @@ mod val_batch { while lower1 < upper1 { self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); - self.result.vals.push(source1.vals.index(lower1)); + self.result.vals_offs.push_ref(off); + self.result.vals.push_ref(source1.vals.index(lower1)); } lower1 += 1; } while lower2 < upper2 { self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.push(off); - self.result.vals.push(source2.vals.index(lower2)); + self.result.vals_offs.push_ref(off); + self.result.vals.push_ref(source2.vals.index(lower2)); } lower2 += 1; } @@ -591,10 +591,10 @@ mod val_batch { // NB: Here is where we would need to look back if `lower == upper`. let time = source.times.index(i); let diff = source.diffs.index(i); - let mut new_time = time.into_owned(); + let mut new_time = L::TimeContainer::into_owned(time); use crate::lattice::Lattice; new_time.advance_by(self.description.since().borrow()); - self.update_stash.push((new_time, diff.into_owned())); + self.update_stash.push((new_time, L::DiffContainer::into_owned(diff))); } } @@ -607,8 +607,8 @@ mod val_batch { // we push nothing and report an unincremented offset to encode this case. 
let time_diff = self.result.times.last().zip(self.result.diffs.last()); let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| { - let t1 = <::ReadItem<'_> as IntoOwned>::borrow_as(t1); - let d1 = <::ReadItem<'_> as IntoOwned>::borrow_as(d1); + let t1 = L::TimeContainer::borrow_as(t1); + let d1 = L::DiffContainer::borrow_as(d1); t1.eq(&t2) && d1.eq(&d2) }); if self.update_stash.len() == 1 && last_eq.unwrap_or(false) { @@ -619,8 +619,8 @@ mod val_batch { else { // Conventional; move `update_stash` into `updates`. for (time, diff) in self.update_stash.drain(..) { - self.result.times.push(time); - self.result.diffs.push(diff); + self.result.times.push_own(time); + self.result.diffs.push_own(diff); } } Some(self.result.times.len()) @@ -638,9 +638,9 @@ mod val_batch { /// Importantly, we should skip over invalid keys, rather than report them as /// invalid through `key_valid`: that method is meant to indicate the end of /// the cursor, rather than internal state. - pub struct RhhValCursor - where - ::Key: Default + HashOrdered, + pub struct RhhValCursor + where + layout::Key: Default + HashOrdered, { /// Absolute position of the current key. 
key_cursor: usize, @@ -650,17 +650,21 @@ mod val_batch { phantom: PhantomData, } - impl Cursor for RhhValCursor - where - ::Key: Default + HashOrdered, - for<'a> ::ReadItem<'a>: HashOrdered, + impl Cursor for RhhValCursor + where + layout::Key: Default + HashOrdered, + for<'a> layout::KeyRef<'a, L>: HashOrdered, { - type Key<'a> = ::ReadItem<'a>; - type Val<'a> = ::ReadItem<'a>; - type Time = ::Time; - type TimeGat<'a> = ::ReadItem<'a>; - type Diff = ::Diff; - type DiffGat<'a> = ::ReadItem<'a>; + type Key<'a> = layout::KeyRef<'a, L>; + type Val<'a> = layout::ValRef<'a, L>; + type Time = layout::Time; + type TimeGat<'a> = layout::TimeRef<'a, L>; + type Diff = layout::Diff; + type DiffGat<'a> = layout::DiffRef<'a, L>; + + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { L::TimeContainer::into_owned(time) } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { L::TimeContainer::clone_onto(time, onto) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { L::DiffContainer::into_owned(diff) } type Storage = RhhValBatch; @@ -711,7 +715,7 @@ mod val_batch { } } fn step_val(&mut self, storage: &RhhValBatch) { - self.val_cursor += 1; + self.val_cursor += 1; if !self.val_valid(storage) { self.val_cursor = storage.storage.values_for_key(self.key_cursor).1; } @@ -734,11 +738,11 @@ mod val_batch { /// A builder for creating layers from unsorted update tuples. pub struct RhhValBuilder - where - ::Key: Default + HashOrdered, + where + layout::Key: Default + HashOrdered, { result: RhhValStorage, - singleton: Option<(::Time, ::Diff)>, + singleton: Option<(layout::Time, layout::Diff)>, /// Counts the number of singleton optimizations we performed. 
/// /// This number allows us to correctly gauge the total number of updates reflected in a batch, @@ -748,8 +752,8 @@ mod val_batch { } impl RhhValBuilder - where - ::Key: Default + HashOrdered, + where + layout::Key: Default + HashOrdered, { /// Pushes a single update, which may set `self.singleton` rather than push. /// @@ -762,10 +766,10 @@ mod val_batch { /// previously pushed update exactly. In that case, we do not push the update into `updates`. /// The update tuple is retained in `self.singleton` in case we see another update and need /// to recover the singleton to push it into `updates` to join the second update. - fn push_update(&mut self, time: ::Time, diff: ::Diff) { + fn push_update(&mut self, time: layout::Time, diff: layout::Diff) { // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it. - let t1 = <::ReadItem<'_> as IntoOwned>::borrow_as(&time); - let d1 = <::ReadItem<'_> as IntoOwned>::borrow_as(&diff); + let t1 = L::TimeContainer::borrow_as(&time); + let d1 = L::DiffContainer::borrow_as(&diff); if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) { assert!(self.singleton.is_none()); self.singleton = Some((time, diff)); @@ -773,38 +777,38 @@ mod val_batch { else { // If we have pushed a single element, we need to copy it out to meet this one. 
if let Some((time, diff)) = self.singleton.take() { - self.result.times.push(time); - self.result.diffs.push(diff); + self.result.times.push_own(time); + self.result.diffs.push_own(diff); } - self.result.times.push(time); - self.result.diffs.push(diff); + self.result.times.push_own(time); + self.result.diffs.push_own(diff); } } } impl Builder for RhhValBuilder where - ::Key: Default + HashOrdered, - CI: for<'a> BuilderInput = ::Key, Time=::Time, Diff=::Diff>, + layout::Key: Default + HashOrdered, + CI: for<'a> BuilderInput = layout::Key, Time=layout::Time, Diff=layout::Diff>, for<'a> L::ValContainer: PushInto>, - for<'a> ::ReadItem<'a>: HashOrdered + IntoOwned<'a, Owned = ::Key>, + for<'a> layout::KeyRef<'a, L>: HashOrdered, { type Input = CI; - type Time = ::Time; + type Time = layout::Time; type Output = RhhValBatch; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { // Double the capacity for RHH; probably excessive. let rhh_capacity = 2 * keys; - let divisor = RhhValStorage::::divisor_for_capacity(rhh_capacity); + let divisor = RhhValStorage::::divisor_for_capacity(rhh_capacity); // We want some additive slop, in case we spill over. // This number magically chosen based on nothing in particular. // Worst case, we will re-alloc and copy if we spill beyond this. let keys = rhh_capacity + 10; // We don't introduce zero offsets as they will be introduced by the first `push` call. - Self { + Self { result: RhhValStorage { keys: L::KeyContainer::with_capacity(keys), keys_offs: L::OffsetContainer::with_capacity(keys + 1), @@ -833,20 +837,20 @@ mod val_batch { self.push_update(time, diff); } else { // New value; complete representation of prior value. 
- self.result.vals_offs.push(self.result.times.len()); + self.result.vals_offs.push_ref(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); - self.result.vals.push(val); + self.result.vals.push_into(val); } } else { // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.times.len()); + self.result.vals_offs.push_ref(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); + self.result.keys_offs.push_ref(self.result.vals.len()); self.push_update(time, diff); - self.result.vals.push(val); + self.result.vals.push_into(val); // Insert the key, but with no specified offset. - self.result.insert_key(IntoOwned::borrow_as(&key), None); + self.result.insert_key(L::KeyContainer::borrow_as(&key), None); } } } @@ -854,10 +858,10 @@ mod val_batch { #[inline(never)] fn done(mut self, description: Description) -> RhhValBatch { // Record the final offsets - self.result.vals_offs.push(self.result.times.len()); + self.result.vals_offs.push_ref(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. 
if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); + self.result.keys_offs.push_ref(self.result.vals.len()); RhhValBatch { updates: self.result.times.len() + self.singletons, storage: self.result, diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 86e6c8503..82c3dd6c3 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -17,7 +17,6 @@ use timely::progress::Timestamp; use crate::logging::Logger; use crate::difference::Semigroup; -use crate::IntoOwned; use crate::lattice::Lattice; pub use self::cursor::Cursor; pub use self::description::Description; @@ -54,11 +53,16 @@ pub trait TraceReader { /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Borrowed form of timestamp. - type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>; + type TimeGat<'a>: Copy; /// Owned form of update difference. type Diff: Semigroup + 'static; /// Borrowed form of update difference. - type DiffGat<'a> : Copy + IntoOwned<'a, Owned = Self::Diff>; + type DiffGat<'a> : Copy; + + /// An owned copy of a reference to a time. + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { Self::Cursor::owned_time(time) } + /// An owned copy of a reference to a diff. + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { Self::Cursor::owned_diff(diff) } /// The type of an immutable collection of updates. type Batch: for<'a> BatchReader = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>+Clone+'static; @@ -194,7 +198,7 @@ pub trait Trace : TraceReader { /// Sets the logic for exertion in the absence of updates. 
/// /// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`, - /// indicating the level, the number of batches, and their total length in updates. It should return a number of + /// indicating the level, the number of batches, and their total length in updates. It should return a number of /// updates to perform, or `None` if no work is required. fn set_exert_logic(&mut self, logic: ExertionLogic); @@ -229,11 +233,16 @@ pub trait BatchReader : Sized { /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Borrowed form of timestamp. - type TimeGat<'a>: Copy + IntoOwned<'a, Owned = Self::Time>; + type TimeGat<'a>: Copy; /// Owned form of update difference. type Diff: Semigroup + 'static; /// Borrowed form of update difference. - type DiffGat<'a> : Copy + IntoOwned<'a, Owned = Self::Diff>; + type DiffGat<'a> : Copy; + + /// An owned copy of a reference to a time. + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { Self::Cursor::owned_time(time) } + /// An owned copy of a reference to a diff. + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { Self::Cursor::owned_diff(diff) } /// The type used to enumerate the batch's contents. 
type Cursor: for<'a> Cursor = Self::Key<'a>, Val<'a> = Self::Val<'a>, Time = Self::Time, TimeGat<'a> = Self::TimeGat<'a>, Diff = Self::Diff, DiffGat<'a> = Self::DiffGat<'a>>; @@ -391,6 +400,10 @@ pub mod rc_blanket_impls { type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { C::owned_time(time) } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { C::clone_time_onto(time, onto) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } + type Storage = Rc; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } diff --git a/differential-dataflow/src/trace/wrappers/enter.rs b/differential-dataflow/src/trace/wrappers/enter.rs index 6c80f380d..72c488fbd 100644 --- a/differential-dataflow/src/trace/wrappers/enter.rs +++ b/differential-dataflow/src/trace/wrappers/enter.rs @@ -175,6 +175,10 @@ where type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } + type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } @@ -188,9 +192,8 @@ where #[inline] fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { - use crate::IntoOwned; self.cursor.map_times(storage, |time, diff| { - logic(&TInner::to_inner(time.into_owned()), diff) + logic(&TInner::to_inner(C::owned_time(time)), diff) }) } @@ -232,6 +235,10 @@ where type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } + #[inline(always)] fn clone_time_onto(time: 
Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } + type Storage = BatchEnter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } @@ -245,9 +252,8 @@ where #[inline] fn map_times)>(&mut self, storage: &Self::Storage, mut logic: L) { - use crate::IntoOwned; self.cursor.map_times(&storage.batch, |time, diff| { - logic(&TInner::to_inner(time.into_owned()), diff) + logic(&TInner::to_inner(C::owned_time(time)), diff) }) } diff --git a/differential-dataflow/src/trace/wrappers/enter_at.rs b/differential-dataflow/src/trace/wrappers/enter_at.rs index b085d80be..6f9d51e22 100644 --- a/differential-dataflow/src/trace/wrappers/enter_at.rs +++ b/differential-dataflow/src/trace/wrappers/enter_at.rs @@ -202,6 +202,10 @@ where type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } + type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } @@ -264,6 +268,10 @@ where type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } + type Storage = BatchEnter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } diff --git a/differential-dataflow/src/trace/wrappers/filter.rs 
b/differential-dataflow/src/trace/wrappers/filter.rs index a2877402d..617dafc7b 100644 --- a/differential-dataflow/src/trace/wrappers/filter.rs +++ b/differential-dataflow/src/trace/wrappers/filter.rs @@ -133,6 +133,10 @@ where type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { C::owned_time(time) } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { C::clone_time_onto(time, onto) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } + type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } @@ -191,6 +195,10 @@ where type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { C::owned_time(time) } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { C::clone_time_onto(time, onto) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } + type Storage = BatchFilter; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } diff --git a/differential-dataflow/src/trace/wrappers/freeze.rs b/differential-dataflow/src/trace/wrappers/freeze.rs index 1b3ae320e..aed9c781e 100644 --- a/differential-dataflow/src/trace/wrappers/freeze.rs +++ b/differential-dataflow/src/trace/wrappers/freeze.rs @@ -188,6 +188,10 @@ where type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } + type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> 
bool { self.cursor.key_valid(storage) } @@ -243,6 +247,10 @@ where type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } + type Storage = BatchFreeze; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } diff --git a/differential-dataflow/src/trace/wrappers/frontier.rs b/differential-dataflow/src/trace/wrappers/frontier.rs index e368e3938..f69dc1f59 100644 --- a/differential-dataflow/src/trace/wrappers/frontier.rs +++ b/differential-dataflow/src/trace/wrappers/frontier.rs @@ -135,6 +135,10 @@ impl Cursor for CursorFrontier { type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { onto.clone_from(time) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } + type Storage = C::Storage; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) } @@ -152,8 +156,7 @@ impl Cursor for CursorFrontier { let until = self.until.borrow(); let mut temp: C::Time = ::minimum(); self.cursor.map_times(storage, |time, diff| { - use crate::IntoOwned; - time.clone_onto(&mut temp); + C::clone_time_onto(time, &mut temp); temp.advance_by(since); if !until.less_equal(&temp) { logic(&temp, diff); @@ -198,6 +201,10 @@ impl> Cursor for BatchCursorFrontier { type Diff = C::Diff; type DiffGat<'a> = C::DiffGat<'a>; + #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { time.clone() } + #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { 
onto.clone_from(time) } + #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { C::owned_diff(diff) } + type Storage = BatchFrontier; #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(&storage.batch) } @@ -215,8 +222,7 @@ impl> Cursor for BatchCursorFrontier { let until = self.until.borrow(); let mut temp: C::Time = ::minimum(); self.cursor.map_times(&storage.batch, |time, diff| { - use crate::IntoOwned; - time.clone_onto(&mut temp); + C::clone_time_onto(time, &mut temp); temp.advance_by(since); if !until.less_equal(&temp) { logic(&temp, diff); diff --git a/differential-dataflow/tests/trace.rs b/differential-dataflow/tests/trace.rs index 033da8bf7..54f111a7d 100644 --- a/differential-dataflow/tests/trace.rs +++ b/differential-dataflow/tests/trace.rs @@ -34,11 +34,11 @@ fn test_trace() { let mut trace = get_trace(); let (mut cursor1, storage1) = trace.cursor_through(AntichainRef::new(&[1])).unwrap(); - let vec_1 = cursor1.to_vec(&storage1); + let vec_1 = cursor1.to_vec(&storage1, |k| k.clone(), |v| v.clone()); assert_eq!(vec_1, vec![((1, 2), vec![(0, 1)])]); let (mut cursor2, storage2) = trace.cursor_through(AntichainRef::new(&[2])).unwrap(); - let vec_2 = cursor2.to_vec(&storage2); + let vec_2 = cursor2.to_vec(&storage2, |k| k.clone(), |v| v.clone()); println!("--> {:?}", vec_2); assert_eq!(vec_2, vec![ ((1, 2), vec![(0, 1)]), @@ -46,13 +46,13 @@ fn test_trace() { ]); let (mut cursor3, storage3) = trace.cursor_through(AntichainRef::new(&[3])).unwrap(); - let vec_3 = cursor3.to_vec(&storage3); + let vec_3 = cursor3.to_vec(&storage3, |k| k.clone(), |v| v.clone()); assert_eq!(vec_3, vec![ ((1, 2), vec![(0, 1)]), ((2, 3), vec![(1, 1), (2, -1)]), ]); let (mut cursor4, storage4) = trace.cursor(); - let vec_4 = cursor4.to_vec(&storage4); + let vec_4 = cursor4.to_vec(&storage4, |k| k.clone(), |v| v.clone()); assert_eq!(vec_4, vec_3); } diff --git a/dogsdogsdogs/src/operators/count.rs 
b/dogsdogsdogs/src/operators/count.rs index 102c65cfa..d2bbbf1cd 100644 --- a/dogsdogsdogs/src/operators/count.rs +++ b/dogsdogsdogs/src/operators/count.rs @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; -use differential_dataflow::trace::cursor::IntoOwned; +use differential_dataflow::IntoOwned; /// Reports a number of extensions to a stream of prefixes. /// @@ -22,6 +22,7 @@ where G: Scope, Tr: for<'a> TraceReader< Key<'a>: IntoOwned<'a, Owned = K>, + TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, Diff=isize, >+Clone+'static, for<'a> Tr::Diff : Semigroup>, diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 17648a194..0ed560232 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -50,7 +50,7 @@ use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::{Cursor, TraceReader}; use differential_dataflow::consolidation::{consolidate, consolidate_updates}; -use differential_dataflow::trace::cursor::IntoOwned; +use differential_dataflow::IntoOwned; /// A binary equijoin that responds to updates on only its first input. 
/// @@ -86,7 +86,7 @@ where K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, - Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>>+Clone+'static, + Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>, TimeGat<'a> : IntoOwned<'a, Owned = Tr::Time>>+Clone+'static, R: Mul, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, &G::Timestamp) -> bool + 'static, @@ -156,7 +156,7 @@ where K: Hashable + ExchangeData, V: ExchangeData, R: ExchangeData + Monoid, - Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>>+Clone+'static, + Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>, TimeGat<'a> : IntoOwned<'a, Owned = Tr::Time>>+Clone+'static, FF: Fn(&G::Timestamp, &mut Antichain) + 'static, CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, Y: Fn(std::time::Instant, usize) -> bool + 'static, @@ -271,7 +271,7 @@ where // drop fully processed capabilities. stash.retain(|_,proposals| !proposals.is_empty()); - + for (capability, proposals) in stash_additions.into_iter() { stash.entry(capability).or_insert(Vec::new()).extend(proposals); } @@ -316,7 +316,7 @@ fn process_proposals( ) -> bool where G: Scope, - Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>>, + Tr: for<'a> TraceReader : IntoOwned<'a, Owned = K>, TimeGat<'a> : IntoOwned<'a, Owned = Tr::Time>>, CF: Fn(Tr::TimeGat<'_>, &Tr::Time) -> bool + 'static, Y: Fn(Instant, usize) -> bool + 'static, S: FnMut(&mut SessionFor, &K, &V, Tr::Val<'_>, &G::Timestamp, &R, &mut Vec<(G::Timestamp, Tr::Diff)>) + 'static, @@ -336,7 +336,7 @@ where // Use TOTAL ORDER to allow the release of `time`. 
yielded = yielded || yield_function(timer, *work); if !yielded && !frontier.iter().any(|t| comparison( as IntoOwned>::borrow_as(t), initial)) { - use differential_dataflow::trace::cursor::IntoOwned; + use differential_dataflow::IntoOwned; cursor.seek_key(&storage, IntoOwned::borrow_as(key)); if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) { while let Some(val2) = cursor.get_val(&storage) { @@ -344,7 +344,7 @@ where if comparison(t, initial) { let mut t = t.into_owned(); t.join_assign(time); - output_buffer.push((t, d.into_owned())) + output_buffer.push((t, Tr::owned_diff(d))) } }); consolidate(&mut output_buffer); diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index 54fc11bb2..436f8da7c 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -10,7 +10,7 @@ use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable}; use differential_dataflow::difference::{IsZero, Semigroup, Monoid}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::{Cursor, TraceReader}; -use differential_dataflow::trace::cursor::IntoOwned; +use differential_dataflow::IntoOwned; /// Proposes extensions to a stream of prefixes. 
/// @@ -30,6 +30,7 @@ where G: Scope, Tr: for<'a> TraceReader< Key<'a>: IntoOwned<'a, Owned = K>, + TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, Diff : Semigroup>+Monoid+ExchangeData, >+Clone+'static, K: Hashable + Ord + 'static, @@ -93,7 +94,7 @@ where for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() { if !input2.frontier.less_equal(time) { logic2(prefix, &mut key1); - use differential_dataflow::trace::cursor::IntoOwned; + use differential_dataflow::IntoOwned; cursor.seek_key(&storage, IntoOwned::borrow_as(&key1)); if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(&key1)) { while let Some(value) = cursor.get_val(&storage) { diff --git a/dogsdogsdogs/src/operators/propose.rs b/dogsdogsdogs/src/operators/propose.rs index 571cf5071..7b96f00fd 100644 --- a/dogsdogsdogs/src/operators/propose.rs +++ b/dogsdogsdogs/src/operators/propose.rs @@ -4,7 +4,7 @@ use differential_dataflow::{ExchangeData, Collection, Hashable}; use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; -use differential_dataflow::trace::cursor::IntoOwned; +use differential_dataflow::IntoOwned; /// Proposes extensions to a prefix stream. 
/// @@ -24,6 +24,7 @@ where Tr: for<'a> TraceReader< Key<'a> : IntoOwned<'a, Owned = K>, Val<'a> : IntoOwned<'a, Owned = V>, + TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, Diff: Monoid+Multiply+ExchangeData+Semigroup>, >+Clone+'static, K: Hashable + Default + Ord + 'static, @@ -57,6 +58,7 @@ where Tr: for<'a> TraceReader< Key<'a> : IntoOwned<'a, Owned = K>, Val<'a> : IntoOwned<'a, Owned = V>, + TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, Diff : Semigroup>+Monoid+Multiply+ExchangeData, >+Clone+'static, K: Hashable + Default + Ord + 'static, diff --git a/dogsdogsdogs/src/operators/validate.rs b/dogsdogsdogs/src/operators/validate.rs index 51fe7df3a..330661027 100644 --- a/dogsdogsdogs/src/operators/validate.rs +++ b/dogsdogsdogs/src/operators/validate.rs @@ -6,7 +6,7 @@ use differential_dataflow::{ExchangeData, Collection}; use differential_dataflow::difference::{Semigroup, Monoid, Multiply}; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::TraceReader; -use differential_dataflow::trace::cursor::IntoOwned; +use differential_dataflow::IntoOwned; /// Proposes extensions to a stream of prefixes. /// @@ -22,6 +22,7 @@ where G: Scope, Tr: for<'a> TraceReader< Key<'a> : IntoOwned<'a, Owned = (K, V)>, + TimeGat<'a>: IntoOwned<'a, Owned = Tr::Time>, Diff : Semigroup>+Monoid+Multiply+ExchangeData, >+Clone+'static, K: Ord+Hash+Clone+Default + 'static,