Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 124 additions & 14 deletions differential-dataflow/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
timely::dataflow::ProbeHandle,
};

use differential_dataflow::trace::implementations::ord_neu::ColKeySpine;

use differential_dataflow::operators::arrange::arrangement::arrange_core;

fn main() {
Expand Down Expand Up @@ -44,10 +42,10 @@
let data_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::<u64>() as u64);
let keys_pact = ExchangeCore::<ColumnBuilder<((String,()),u64,i64)>,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::<u64>() as u64);

let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data");
let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys");
let data = arrange_core::<_,_,Col2ValBatcher<_,_,_,_>, ColValBuilder<_,_,_,_>, ColValSpine<_,_,_,_>>(&data, data_pact, "Data");
let keys = arrange_core::<_,_,Col2ValBatcher<_,_,_,_>, ColValBuilder<_,_,_,_>, ColValSpine<_,_,_,_>>(&keys, keys_pact, "Keys");

keys.join_core(&data, |_k, &(), &()| Option::<()>::None)
keys.join_core(&data, |_k, (), ()| Option::<()>::None)
.probe_with(&mut probe);

});
Expand Down Expand Up @@ -159,20 +157,23 @@
type BorrowedOf<'a, C> = <<C as Columnar>::Container as columnar::Container>::Borrowed<'a>;

impl<C: Columnar> Column<C> {
#[inline(always)]
pub fn borrow(&self) -> BorrowedOf<'_, C> {
match self {
Column::Typed(t) => t.borrow(),
Column::Bytes(b) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))),
Column::Align(a) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(a)),
}
}
#[inline(always)]
pub fn get(&self, index: usize) -> columnar::Ref<'_, C> {
self.borrow().get(index)
}
}

use timely::Container;
impl<C: Columnar> Container for Column<C> {
#[inline(always)]
fn len(&self) -> usize {
match self {
Column::Typed(t) => t.len(),
Expand All @@ -181,6 +182,7 @@
}
}
// This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into.
#[inline(always)]
fn clear(&mut self) {
match self {
Column::Typed(t) => t.clear(),
Expand All @@ -191,6 +193,7 @@

type ItemRef<'a> = columnar::Ref<'a, C>;
type Iter<'a> = IterOwn<BorrowedOf<'a, C>>;
#[inline(always)]
fn iter<'a>(&'a self) -> Self::Iter<'a> {
match self {
Column::Typed(t) => t.borrow().into_index_iter(),
Expand All @@ -201,6 +204,7 @@

type Item<'a> = columnar::Ref<'a, C>;
type DrainIter<'a> = IterOwn<BorrowedOf<'a, C>>;
#[inline(always)]
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
match self {
Column::Typed(t) => t.borrow().into_index_iter(),
Expand All @@ -212,6 +216,7 @@

use timely::container::SizableContainer;
impl<C: Columnar> SizableContainer for Column<C> {
#[inline(always)]
fn at_capacity(&self) -> bool {
match self {
Self::Typed(t) => {
Expand All @@ -222,6 +227,7 @@
Self::Align(_) => true,
}
}
#[inline(always)]
fn ensure_capacity(&mut self, _stash: &mut Option<Self>) { }
}

Expand All @@ -242,6 +248,8 @@
}

use timely::dataflow::channels::ContainerBytes;
use differential_dataflow::trace::implementations::BatchContainer;

impl<C: Columnar> ContainerBytes for Column<C> {
fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self {
// Our expectation / hope is that `bytes` is `u64` aligned and sized.
Expand All @@ -259,6 +267,7 @@
}
}

#[inline(always)]
fn length_in_bytes(&self) -> usize {
match self {
// We'll need one u64 for the length, then the length rounded up to a multiple of 8.
Expand All @@ -276,6 +285,64 @@
}
}
}

impl<T: Columnar + Ord + Clone> BatchContainer for Column<T>
where
for<'a> columnar::Ref<'a, T>: Ord,
{
type Owned = T;
type ReadItem<'a> = columnar::Ref<'a, T>;

#[inline(always)]
fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
T::into_owned(item)
}

#[inline(always)]
fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
other.copy_from(item);
}

#[inline(always)]
fn push_ref(&mut self, item: Self::ReadItem<'_>) {
self.push(item);
}

#[inline(always)]
fn push_own(&mut self, item: &Self::Owned) {
self.push(item);
}

#[inline(always)]
fn clear(&mut self) {
Container::clear(self);
}

#[inline(always)]
fn with_capacity(_size: usize) -> Self {
Self::default()
}

#[inline(always)]
fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
Self::Typed(T::Container::with_capacity_for([cont1.borrow(), cont2.borrow()].into_iter()))
}

#[inline(always)]
fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
T::reborrow(item)
}

#[inline(always)]
fn index(&self, index: usize) -> Self::ReadItem<'_> {
self.get(index)
}

#[inline(always)]
fn len(&self) -> usize {
Container::len(self)
}
}
}


Expand Down Expand Up @@ -358,7 +425,7 @@
impl<C: Columnar<Container: Clone>> LengthPreservingContainerBuilder for ColumnBuilder<C> { }
}

use batcher::Col2KeyBatcher;
use batcher::{Col2KeyBatcher, Col2ValBatcher};

Check failure on line 428 in differential-dataflow/examples/columnar.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unused import: `Col2KeyBatcher`

Check failure on line 428 in differential-dataflow/examples/columnar.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.79

unused import: `Col2KeyBatcher`

/// Types for consolidating, merging, and extracting columnar update collections.
pub mod batcher {
Expand Down Expand Up @@ -573,27 +640,56 @@

}

use dd_builder::ColKeyBuilder;
use dd_builder::{ColKeyBuilder, ColValBuilder, ColKeySpine, ColValSpine};

Check failure on line 643 in differential-dataflow/examples/columnar.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust stable

unused imports: `ColKeyBuilder` and `ColKeySpine`

Check failure on line 643 in differential-dataflow/examples/columnar.rs

View workflow job for this annotation

GitHub Actions / cargo test on ubuntu, rust 1.79

unused imports: `ColKeyBuilder`, `ColKeySpine`

pub mod dd_builder {

use std::rc::Rc;
use columnar::Columnar;

use differential_dataflow::trace::Builder;
use differential_dataflow::trace::Description;
use differential_dataflow::trace::implementations::Layout;
use differential_dataflow::trace::implementations::{Layout, Update};
use differential_dataflow::trace::implementations::layout;
use differential_dataflow::trace::implementations::BatchContainer;
use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, val_batch::OrdValStorage, OrdKeyBatch, Vals, Upds, layers::UpdsBuilder};
use differential_dataflow::trace::implementations::ord_neu::key_batch::OrdKeyStorage;
use differential_dataflow::trace::implementations::spine_fueled::Spine;
use crate::Column;


use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
use differential_dataflow::trace::implementations::TStack;

pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>>>;
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>>>;
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<CStack<((K,V),T,R)>>>;
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<CStack<((K,()),T,R)>>>;
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<CStack<((K,()),T,R)>>>>;
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<CStack<((K,V),T,R)>>>>;


/// A layout based on columns.
pub struct CStack<U: Update> {
phantom: std::marker::PhantomData<U>,
}

impl<U> Layout for CStack<U>
where
U: Update<
Key: Columnar,
Val: Columnar,
Time: Columnar,
Diff: Columnar + Ord,
>,
for<'a> columnar::Ref<'a, U::Key>: Ord,
for<'a> columnar::Ref<'a, U::Val>: Ord,
for<'a> columnar::Ref<'a, U::Time>: Ord,
for<'a> columnar::Ref<'a, U::Diff>: Ord,
{
type KeyContainer = Column<U::Key>;
type ValContainer = Column<U::Val>;
type TimeContainer = Column<U::Time>;
type DiffContainer = Column<U::Diff>;
type OffsetContainer = differential_dataflow::trace::implementations::OffsetList;
}


/// A builder for creating layers from unsorted update tuples.
pub struct OrdValBuilder<L: Layout> {
Expand Down Expand Up @@ -640,11 +736,25 @@

let mut key_con = L::KeyContainer::with_capacity(1);
let mut val_con = L::ValContainer::with_capacity(1);
let mut owned_key = None;
let mut owned_val = None;

for ((key,val),time,diff) in chunk.drain() {
// It would be great to avoid.
let key = <layout::Key<L> as Columnar>::into_owned(key);
let val = <layout::Val<L> as Columnar>::into_owned(val);
let key = if let Some(owned_key) = &mut owned_key {
Columnar::copy_from(owned_key, key);
&*owned_key
} else {
owned_key = Some(<layout::Key<L> as Columnar>::into_owned(key));
owned_key.as_ref().unwrap()
};
let val = if let Some(owned_val) = &mut owned_val {
Columnar::copy_from(owned_val, val);
&*owned_val
} else {
owned_val = Some(<layout::Val<L> as Columnar>::into_owned(val));
owned_val.as_ref().unwrap()
};
// These feel fine (wrt the other versions)
let time = <layout::Time<L> as Columnar>::into_owned(time);
let diff = <layout::Diff<L> as Columnar>::into_owned(diff);
Expand Down
Loading