Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions differential-dataflow/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,6 @@ pub mod dd_builder {

use columnar::Columnar;

use timely::container::PushInto;

use differential_dataflow::trace::Builder;
use differential_dataflow::trace::Description;
use differential_dataflow::trace::implementations::Layout;
Expand Down Expand Up @@ -614,9 +612,6 @@ pub mod dd_builder {
layout::Val<L>: Columnar,
layout::Time<L>: Columnar,
layout::Diff<L>: Columnar,
// These two constraints seem .. like we could potentially replace by `Columnar::Ref<'a>`.
for<'a> L::KeyContainer: PushInto<&'a layout::Key<L>>,
for<'a> L::ValContainer: PushInto<&'a layout::Val<L>>,
{
type Input = Column<((layout::Key<L>,layout::Val<L>),layout::Time<L>,layout::Diff<L>)>;
type Time = layout::Time<L>;
Expand All @@ -643,6 +638,9 @@ pub mod dd_builder {
// Owned key and val would need to be members of `self`, as this method can be called multiple times,
// and we need to correctly cache last for reasons of correctness, not just performance.

let mut key_con = L::KeyContainer::with_capacity(1);
let mut val_con = L::ValContainer::with_capacity(1);

for ((key,val),time,diff) in chunk.drain() {
// It would be great to avoid.
let key = <layout::Key<L> as Columnar>::into_owned(key);
Expand All @@ -651,30 +649,33 @@ pub mod dd_builder {
let time = <layout::Time<L> as Columnar>::into_owned(time);
let diff = <layout::Diff<L> as Columnar>::into_owned(diff);

key_con.clear(); key_con.push_own(&key);
val_con.clear(); val_con.push_own(&val);

// Pre-load the first update.
if self.result.keys.is_empty() {
self.result.vals.vals.push_into(&val);
self.result.keys.push_into(&key);
self.result.vals.vals.push_own(&val);
self.result.keys.push_own(&key);
self.staging.push(time, diff);
}
// Perhaps this is a continuation of an already received key.
else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) {
else if self.result.keys.last() == key_con.get(0) {
// Perhaps this is a continuation of an already received value.
if self.result.vals.vals.last().map(|v| L::ValContainer::borrow_as(&val).eq(&v)).unwrap_or(false) {
if self.result.vals.vals.last() == val_con.get(0) {
self.staging.push(time, diff);
} else {
// New value; complete representation of prior value.
self.staging.seal(&mut self.result.upds);
self.staging.push(time, diff);
self.result.vals.vals.push_into(&val);
self.result.vals.vals.push_own(&val);
}
} else {
// New key; complete representation of prior key.
self.staging.seal(&mut self.result.upds);
self.staging.push(time, diff);
self.result.vals.offs.push_ref(self.result.vals.len());
self.result.vals.vals.push_into(&val);
self.result.keys.push_into(&key);
self.result.vals.vals.push_own(&val);
self.result.keys.push_own(&key);
}
}
}
Expand Down Expand Up @@ -719,9 +720,6 @@ pub mod dd_builder {
layout::Val<L>: Columnar,
layout::Time<L>: Columnar,
layout::Diff<L>: Columnar,
// These two constraints seem .. like we could potentially replace by `Columnar::Ref<'a>`.
for<'a> L::KeyContainer: PushInto<&'a layout::Key<L>>,
for<'a> L::ValContainer: PushInto<&'a layout::Val<L>>,
{
type Input = Column<((layout::Key<L>,layout::Val<L>),layout::Time<L>,layout::Diff<L>)>;
type Time = layout::Time<L>;
Expand All @@ -747,26 +745,30 @@ pub mod dd_builder {
// Owned key and val would need to be members of `self`, as this method can be called multiple times,
// and we need to correctly cache last for reasons of correctness, not just performance.

let mut key_con = L::KeyContainer::with_capacity(1);

for ((key,_val),time,diff) in chunk.drain() {
// It would be great to avoid.
let key = <layout::Key<L> as Columnar>::into_owned(key);
// These feel fine (wrt the other versions)
let time = <layout::Time<L> as Columnar>::into_owned(time);
let diff = <layout::Diff<L> as Columnar>::into_owned(diff);

key_con.clear(); key_con.push_own(&key);

// Pre-load the first update.
if self.result.keys.is_empty() {
self.result.keys.push_into(&key);
self.result.keys.push_own(&key);
self.staging.push(time, diff);
}
// Perhaps this is a continuation of an already received key.
else if self.result.keys.last().map(|k| L::KeyContainer::borrow_as(&key).eq(&k)).unwrap_or(false) {
else if self.result.keys.last() == key_con.get(0) {
self.staging.push(time, diff);
} else {
// New key; complete representation of prior key.
self.staging.seal(&mut self.result.upds);
self.staging.push(time, diff);
self.result.keys.push_into(&key);
self.result.keys.push_own(&key);
}
}
}
Expand Down
7 changes: 5 additions & 2 deletions differential-dataflow/src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,17 @@ where
// new stuff that we add.
let (mut trace_cursor, trace_storage) = reader_local.cursor();
let mut builder = Bu::new();
let mut key_con = Tr::KeyContainer::with_capacity(1);
for (key, mut list) in to_process {

key_con.clear(); key_con.push_own(&key);

// The prior value associated with the key.
let mut prev_value: Option<Tr::ValOwn> = None;

// Attempt to find the key in the trace.
trace_cursor.seek_key(&trace_storage, Tr::KeyContainer::borrow_as(&key));
if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&Tr::KeyContainer::borrow_as(&key))).unwrap_or(false) {
trace_cursor.seek_key(&trace_storage, key_con.index(0));
if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&key_con.index(0))).unwrap_or(false) {
// Determine the prior value associated with the key.
while let Some(val) = trace_cursor.get_val(&trace_storage) {
let mut count = 0;
Expand Down
25 changes: 14 additions & 11 deletions differential-dataflow/src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,11 +431,16 @@ where
// We first extract those times from this list that lie in the interval we will process.
sort_dedup(&mut interesting);
// `exposed` contains interesting (key, time)s now below `upper_limit`
let exposed = {
let (exposed, new_interesting) = interesting.drain(..).partition(|(_, time)| !upper_limit.less_equal(time));
interesting = new_interesting;
exposed
};
let mut exposed_keys = T1::KeyContainer::with_capacity(0);
let mut exposed_time = T1::TimeContainer::with_capacity(0);
// Keep pairs greater or equal to `upper_limit`, and "expose" other pairs.
interesting.retain(|(key, time)| {
if upper_limit.less_equal(time) { true } else {
exposed_keys.push_own(key);
exposed_time.push_own(time);
false
}
});

// Prepare an output buffer and builder for each capability.
//
Expand Down Expand Up @@ -471,12 +476,10 @@ where
// indicates whether more data remain. We move through `exposed` using (index) `exposed_position`.
// There could perhaps be a less provocative variable name.
let mut exposed_position = 0;
while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() {

use std::borrow::Borrow;
while batch_cursor.key_valid(batch_storage) || exposed_position < exposed_keys.len() {

// Determine the next key we will work on; could be synthetic, could be from a batch.
let key1 = exposed.get(exposed_position).map(|x| T1::KeyContainer::borrow_as(&x.0));
let key1 = exposed_keys.get(exposed_position);
let key2 = batch_cursor.get_key(batch_storage);
let key = match (key1, key2) {
(Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
Expand All @@ -492,8 +495,8 @@ where
interesting_times.clear();

// Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key.
while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.eq(&T1::KeyContainer::borrow_as(&k))).unwrap_or(false) {
interesting_times.push(exposed[exposed_position].1.clone());
while exposed_keys.get(exposed_position) == Some(key) {
interesting_times.push(T1::owned_time(exposed_time.index(exposed_position)));
exposed_position += 1;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ impl<B: Ord + Clone> HuffmanContainer<B> {
}
}

impl<B: Ord + Clone + 'static> PushInto<Vec<B>> for HuffmanContainer<B> {
fn push_into(&mut self, item: Vec<B>) {
impl<'a, B: Ord + Clone + 'static> PushInto<&'a Vec<B>> for HuffmanContainer<B> {
fn push_into(&mut self, item: &'a Vec<B>) {
for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; }
match &mut self.inner {
Ok((huffman, bytes)) => {
bytes.extend(huffman.encode(item.iter()));
self.offsets.push(bytes.len());
},
Err(raw) => {
raw.extend(item);
raw.extend(item.iter().cloned());
self.offsets.push(raw.len());
}
}
Expand Down Expand Up @@ -95,12 +95,12 @@ impl<B: Ord + Clone + 'static> BatchContainer for HuffmanContainer<B> {
Err(bytes) => other.extend_from_slice(bytes),
}
}
fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { Self::ReadItem { inner: Err(&owned[..]) } }

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
fn push_own(&mut self, item: Self::Owned) { self.push_into(item) }
fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }

fn clear(&mut self) { *self = Self::default(); }

fn with_capacity(size: usize) -> Self {
let mut offsets = OffsetList::with_capacity(size + 1);
Expand Down
43 changes: 20 additions & 23 deletions differential-dataflow/src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,12 @@ impl BatchContainer for OffsetList {
#[inline(always)]
fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item }
#[inline(always)]
fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { *owned }
#[inline(always)]
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
fn push_own(&mut self, item: Self::Owned) { self.push_into(item) }
fn push_own(&mut self, item: &Self::Owned) { self.push_into(*item) }

fn clear(&mut self) { self.zero_prefix = 0; self.smol.clear(); self.chonk.clear(); }

fn with_capacity(size: usize) -> Self {
Self::with_capacity(size)
Expand Down Expand Up @@ -536,15 +536,14 @@ pub mod containers {
fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
*other = Self::into_owned(item);
}
/// Borrows an owned instance as oneself.
#[must_use]
fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a>;


/// Push an item into this container
fn push_ref(&mut self, item: Self::ReadItem<'_>);
/// Push an item into this container
fn push_own(&mut self, item: Self::Owned);
fn push_own(&mut self, item: &Self::Owned);

/// Clears the container. May not release resources.
fn clear(&mut self);

/// Creates a new container with sufficient capacity.
fn with_capacity(size: usize) -> Self;
Expand Down Expand Up @@ -632,12 +631,13 @@ pub mod containers {

#[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
#[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
#[inline(always)] fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { owned }

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
fn push_own(&mut self, item: Self::Owned) { self.push_into(item) }
fn push_own(&mut self, item: &Self::Owned) { self.push_into(item.clone()) }

fn clear(&mut self) { self.clear() }

fn with_capacity(size: usize) -> Self {
Vec::with_capacity(size)
Expand All @@ -664,12 +664,13 @@ pub mod containers {

#[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
#[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
#[inline(always)] fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { owned }

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
fn push_own(&mut self, item: Self::Owned) { self.push_into(item) }
fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }

fn clear(&mut self) { self.clear() }

fn with_capacity(size: usize) -> Self {
Self::with_capacity(size)
Expand Down Expand Up @@ -713,15 +714,6 @@ pub mod containers {
}
}

impl<B> PushInto<Vec<B>> for SliceContainer<B> {
fn push_into(&mut self, item: Vec<B>) {
for x in item.into_iter() {
self.inner.push(x);
}
self.offsets.push(self.inner.len());
}
}

impl<B> BatchContainer for SliceContainer<B>
where
B: Ord + Clone + Sized + 'static,
Expand All @@ -731,12 +723,17 @@ pub mod containers {

#[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.to_vec() }
#[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from_slice(item); }
#[inline(always)] fn borrow_as<'a>(owned: &'a Self::Owned) -> Self::ReadItem<'a> { &owned[..] }

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
fn push_own(&mut self, item: Self::Owned) { self.push_into(item) }
fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }

fn clear(&mut self) {
self.offsets.clear();
self.offsets.push(0);
self.inner.clear();
}

fn with_capacity(size: usize) -> Self {
let mut offsets = Vec::with_capacity(size + 1);
Expand Down
47 changes: 23 additions & 24 deletions differential-dataflow/src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,10 +207,15 @@ pub mod layers {
///
/// Tracked independently to account for duplicate compression.
total: usize,

/// Time container to stage singleton times for evaluation.
time_con: T,
/// Diff container to stage singleton times for evaluation.
diff_con: D,
}

impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
fn default() -> Self { Self { stash: Vec::default(), total: 0, } }
fn default() -> Self { Self { stash: Vec::default(), total: 0, time_con: BatchContainer::with_capacity(1), diff_con: BatchContainer::with_capacity(1) } }
}


Expand All @@ -230,34 +235,28 @@ pub mod layers {
pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(&mut self, upds: &mut Upds<O, T, D>) -> bool {
use crate::consolidation;
consolidation::consolidate(&mut self.stash);
if !self.stash.is_empty() {
// If there is a single element, equal to a just-prior recorded update,
// we push nothing and report an unincremented offset to encode this case.
let time_diff = upds.times.last().zip(upds.diffs.last());
let last_eq = self.stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
let t1 = T::borrow_as(t1);
let d1 = D::borrow_as(d1);
t1.eq(&t2) && d1.eq(&d2)
});
if self.stash.len() == 1 && last_eq.unwrap_or(false) {
// Just clear out the stash, as we won't drain it here.
// If everything consolidates away, return false.
if self.stash.is_empty() { return false; }
// If there is a singleton, we may be able to optimize.
if self.stash.len() == 1 {
let (time, diff) = self.stash.last().unwrap();
self.time_con.clear(); self.time_con.push_own(time);
self.diff_con.clear(); self.diff_con.push_own(diff);
if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) {
self.total += 1;
self.stash.clear();
upds.offs.push_ref(upds.times.len());
return true;
}
else {
// Conventional; move `stash` into `updates`.
self.total += self.stash.len();
for (time, diff) in self.stash.drain(..) {
upds.times.push_own(time);
upds.diffs.push_own(diff);
}
upds.offs.push_ref(upds.times.len());
}
true
} else {
false
}
// Conventional; move `stash` into `updates`.
self.total += self.stash.len();
for (time, diff) in self.stash.drain(..) {
upds.times.push_own(&time);
upds.diffs.push_own(&diff);
}
upds.offs.push_ref(upds.times.len());
true
}

/// Completes the building and returns the total updates sealed.
Expand Down
Loading
Loading