4 changes: 2 additions & 2 deletions Cargo.toml
@@ -17,9 +17,9 @@ resolver = "2"

[workspace.dependencies]
differential-dataflow = { path = "differential-dataflow", default-features = false, version = "0.16.2" }
timely = { version = "0.23", default-features = false }
#timely = { version = "0.23", default-features = false }
columnar = { version = "0.10", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }

[profile.release]
opt-level = 3
73 changes: 36 additions & 37 deletions differential-dataflow/examples/columnar.rs
@@ -1,11 +1,9 @@
//! Wordcount based on `columnar`.

use {
timely::container::{Container, CapacityContainerBuilder},
timely::dataflow::channels::pact::ExchangeCore,
timely::dataflow::InputHandleCore,
timely::dataflow::ProbeHandle,
};
use timely::container::{CapacityContainerBuilder, PushInto};
use timely::dataflow::channels::pact::ExchangeCore;
use timely::dataflow::InputHandleCore;
use timely::dataflow::ProbeHandle;

use differential_dataflow::trace::implementations::ord_neu::ColKeySpine;

@@ -65,7 +63,7 @@ fn main() {
while i < size {
let val = (counter + i) % keys;
write!(buffer, "{:?}", val).unwrap();
container.push(((&buffer, ()), time, 1));
container.push_into(((&buffer, ()), time, 1));
buffer.clear();
i += worker.peers();
}
@@ -88,7 +86,7 @@ fn main() {
while i < size {
let val = (queries + i) % keys;
write!(buffer, "{:?}", val).unwrap();
container.push(((&buffer, ()), time, 1));
container.push_into(((&buffer, ()), time, 1));
buffer.clear();
i += worker.peers();
}
@@ -169,26 +167,29 @@ mod container {
pub fn get(&self, index: usize) -> columnar::Ref<'_, C> {
self.borrow().get(index)
}
}

use timely::Container;
impl<C: Columnar> Container for Column<C> {
fn len(&self) -> usize {
match self {
Column::Typed(t) => t.len(),
Column::Bytes(b) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(),
Column::Align(a) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(),
}
}
// This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into.
fn clear(&mut self) {
pub(crate) fn clear(&mut self) {
match self {
Column::Typed(t) => t.clear(),
Column::Bytes(_) => *self = Column::Typed(Default::default()),
Column::Align(_) => *self = Column::Typed(Default::default()),
}
}
#[inline]
pub fn len(&self) -> usize {
match self {
Column::Typed(t) => t.len(),
Column::Bytes(b) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(bytemuck::cast_slice(b))).len(),
Column::Align(a) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(a)).len(),
}
}
}

impl<C: Columnar> timely::Accountable for Column<C> {
#[inline] fn record_count(&self) -> i64 { self.len() as i64 }
}

impl<C: Columnar> timely::container::IterContainer for Column<C> {
type ItemRef<'a> = columnar::Ref<'a, C>;
type Iter<'a> = IterOwn<BorrowedOf<'a, C>>;
fn iter<'a>(&'a self) -> Self::Iter<'a> {
@@ -198,7 +199,9 @@ mod container {
Column::Align(a) => <BorrowedOf<C> as FromBytes>::from_bytes(&mut Indexed::decode(a)).into_index_iter(),
}
}
}

impl<C: Columnar> timely::container::DrainContainer for Column<C> {
type Item<'a> = columnar::Ref<'a, C>;
type DrainIter<'a> = IterOwn<BorrowedOf<'a, C>>;
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
@@ -366,7 +369,7 @@ pub mod batcher {
use std::collections::VecDeque;
use columnar::Columnar;
use timely::Container;
use timely::container::{ContainerBuilder, PushInto};
use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
use differential_dataflow::difference::Semigroup;
use crate::Column;

@@ -389,7 +392,7 @@ pub mod batcher {
ready: VecDeque<C>,
}

impl<C: Container + Clone + 'static> ContainerBuilder for Chunker<C> {
impl<C: Container> ContainerBuilder for Chunker<C> {
type Container = C;

fn extract(&mut self) -> Option<&mut Self::Container> {
@@ -414,9 +417,11 @@ pub mod batcher {
for<'b> columnar::Ref<'b, T>: Ord,
R: for<'b> Columnar + for<'b> Semigroup<columnar::Ref<'b, R>>,
for<'b> columnar::Ref<'b, R>: Ord,
C2: Container + for<'b, 'c> PushInto<(columnar::Ref<'b, D>, columnar::Ref<'b, T>, &'c R)>,
C2: Container + SizableContainer + for<'b, 'c> PushInto<(columnar::Ref<'b, D>, columnar::Ref<'b, T>, &'c R)>,
{
fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
let mut target: C2 = Default::default();
target.ensure_capacity(&mut Some(std::mem::take(&mut self.empty)));

// Scoped to let borrow through `permutation` drop.
{
@@ -426,7 +431,6 @@ pub mod batcher {
permutation.extend(container.drain());
permutation.sort();

self.empty.clear();
// Iterate over the data, accumulating diffs for like keys.
let mut iter = permutation.drain(..);
if let Some((data, time, diff)) = iter.next() {
@@ -442,7 +446,7 @@
else {
if !prev_diff.is_zero() {
let tuple = (prev_data, prev_time, &prev_diff);
self.empty.push_into(tuple);
target.push_into(tuple);
}
prev_data = data;
prev_time = time;
@@ -452,13 +456,13 @@

if !prev_diff.is_zero() {
let tuple = (prev_data, prev_time, &prev_diff);
self.empty.push_into(tuple);
target.push_into(tuple);
}
}
}

if !self.empty.is_empty() {
self.ready.push_back(std::mem::take(&mut self.empty));
if !target.is_empty() {
self.ready.push_back(target);
}
}
}
@@ -468,7 +472,7 @@

use timely::progress::{Antichain, frontier::AntichainRef};
use columnar::Columnar;

use timely::container::PushInto;
use crate::container::Column;
use differential_dataflow::difference::Semigroup;

@@ -502,7 +506,6 @@
}
}
fn is_empty(&self) -> bool {
use timely::Container;
self.head == self.list.len()
}
fn cmp_heads(&self, other: &Self) -> std::cmp::Ordering {
@@ -551,8 +554,7 @@
let stash2: R = R::into_owned(diff2);
stash.plus_equals(&stash2);
if !stash.is_zero() {
use timely::Container;
self.push((data, time, &*stash));
self.push_into((data, time, &*stash));
}
}
fn account(&self) -> (usize, usize, usize, usize) {
@@ -568,6 +570,7 @@
// self.heap_size(cb);
// (self.len(), size, capacity, allocations)
}
#[inline] fn clear(&mut self) { self.clear() }
}
}

@@ -578,7 +581,7 @@ use dd_builder::ColKeyBuilder;
pub mod dd_builder {

use columnar::Columnar;

use timely::container::DrainContainer;
use differential_dataflow::trace::Builder;
use differential_dataflow::trace::Description;
use differential_dataflow::trace::implementations::Layout;
@@ -630,8 +633,6 @@

#[inline]
fn push(&mut self, chunk: &mut Self::Input) {
use timely::Container;

// NB: Maintaining owned key and val across iterations to track the "last", which we clone into,
// is somewhat appealing from an ease point of view. Might still allocate, do work we don't need,
// but avoids e.g. calls into `last()` and breaks horrid trait requirements.
@@ -737,8 +738,6 @@

#[inline]
fn push(&mut self, chunk: &mut Self::Input) {
use timely::Container;

// NB: Maintaining owned key and val across iterations to track the "last", which we clone into,
// is somewhat appealing from an ease point of view. Might still allocate, do work we don't need,
// but avoids e.g. calls into `last()` and breaks horrid trait requirements.
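
Note on the pattern above: where `Column<C>` previously implemented timely's single `Container` trait, it now implements `timely::Accountable`, `timely::container::IterContainer`, and `timely::container::DrainContainer` separately, with `clear` and `len` becoming inherent methods. The sketch below illustrates that split on a toy Vec-backed container; the trait definitions are simplified stand-ins written for this note, not timely's actual traits, which carry further methods and requirements.

// Sketch: simplified stand-ins for the split container traits. Only the
// overall shape is shown; the real traits in timely have more to them.
trait Accountable {
    fn record_count(&self) -> i64;
    fn is_empty(&self) -> bool { self.record_count() == 0 }
}

trait IterContainer {
    type ItemRef<'a> where Self: 'a;
    type Iter<'a>: Iterator<Item = Self::ItemRef<'a>> where Self: 'a;
    fn iter(&self) -> Self::Iter<'_>;
}

trait DrainContainer {
    type Item<'a> where Self: 'a;
    type DrainIter<'a>: Iterator<Item = Self::Item<'a>> where Self: 'a;
    fn drain(&mut self) -> Self::DrainIter<'_>;
}

// A toy container implementing each concern independently, mirroring how
// `Column<C>` (and `TimelyStack<T>` below) are reorganized in this PR.
struct VecColumn<T>(Vec<T>);

impl<T> Accountable for VecColumn<T> {
    fn record_count(&self) -> i64 { self.0.len() as i64 }
}

impl<T> IterContainer for VecColumn<T> {
    type ItemRef<'a> = &'a T where Self: 'a;
    type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
    fn iter(&self) -> Self::Iter<'_> { self.0.iter() }
}

impl<T> DrainContainer for VecColumn<T> {
    type Item<'a> = T where Self: 'a;
    type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;
    fn drain(&mut self) -> Self::DrainIter<'_> { self.0.drain(..) }
}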
6 changes: 4 additions & 2 deletions differential-dataflow/src/collection.rs
@@ -11,6 +11,7 @@
use std::hash::Hash;

use timely::Container;
use timely::container::IterContainer;
use timely::Data;
use timely::progress::Timestamp;
use timely::order::Product;
@@ -67,7 +68,7 @@ impl<G: Scope, D, R, C> Collection<G, D, R, C> {
Collection { inner: stream, phantom: std::marker::PhantomData }
}
}
impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
impl<G: Scope, D, R, C: Container> Collection<G, D, R, C> {
/// Creates a new collection accumulating the contents of the two collections.
///
/// Despite the name, differential dataflow collections are unordered. This method is so named because the
@@ -155,6 +156,7 @@ impl<G: Scope, D, R, C: Container + Clone + 'static> Collection<G, D, R, C> {
pub fn inspect_container<F>(&self, func: F) -> Self
where
F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static,
C: IterContainer,
{
self.inner
.inspect_container(func)
@@ -684,7 +686,7 @@ where
G: Scope,
D: Data,
R: Semigroup + 'static,
C: Container + Clone + 'static,
C: Container,
I: IntoIterator<Item=Collection<G, D, R, C>>,
{
scope
14 changes: 11 additions & 3 deletions differential-dataflow/src/consolidation.rs
@@ -12,8 +12,7 @@

use std::cmp::Ordering;
use std::collections::VecDeque;
use timely::Container;
use timely::container::{ContainerBuilder, PushInto};
use timely::container::{ContainerBuilder, DrainContainer, PushInto};
use crate::Data;
use crate::difference::{IsZero, Semigroup};

@@ -239,7 +238,7 @@
/// items. Consolidation accumulates the diffs per key.
///
/// The trait requires `Container` to have access to its `Item` GAT.
pub trait ConsolidateLayout: Container {
pub trait ConsolidateLayout: DrainContainer {
/// Key portion of data, essentially everything minus the diff
type Key<'a>: Eq where Self: 'a;

@@ -269,6 +268,12 @@ pub trait ConsolidateLayout: Container {
/// Compare two items by key to sort containers.
fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;

/// Returns the number of items in the container.
fn len(&self) -> usize;

/// Clear the container. Afterwards, `len()` should return 0.
fn clear(&mut self);

/// Consolidate the supplied container.
fn consolidate_into(&mut self, target: &mut Self) {
// Sort input data
@@ -329,6 +334,9 @@
self.push((data, time, diff));
}

#[inline] fn len(&self) -> usize { Vec::len(self) }
#[inline] fn clear(&mut self) { Vec::clear(self) }

/// Consolidate the supplied container.
fn consolidate_into(&mut self, target: &mut Self) {
consolidate_updates(self);
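
For orientation, the `consolidate_into` contract extended above boils down to: sort updates so that equal (data, time) keys become adjacent, accumulate their diffs, and drop anything that accumulates to zero. Below is a stand-alone sketch with types fixed to (String, u64, i64) purely for illustration; the real code is generic over `ConsolidateLayout` and its `Semigroup` diffs.

// Sketch only: what consolidation does for a plain vector of updates.
fn consolidate(updates: &mut Vec<(String, u64, i64)>) {
    // Sort so that equal (data, time) pairs become adjacent.
    updates.sort_by(|a, b| (&a.0, a.1).cmp(&(&b.0, b.1)));
    let mut result: Vec<(String, u64, i64)> = Vec::new();
    for (data, time, diff) in updates.drain(..) {
        match result.last_mut() {
            // Accumulate the diff into the previous update when keys match.
            Some((d, t, r)) if *d == data && *t == time => *r += diff,
            _ => result.push((data, time, diff)),
        }
    }
    // Drop updates whose accumulated diff is zero.
    result.retain(|(_, _, r)| *r != 0);
    *updates = result;
}

fn main() {
    let mut updates = vec![
        ("a".to_string(), 0, 1),
        ("b".to_string(), 0, 1),
        ("a".to_string(), 0, 1),
        ("b".to_string(), 0, -1),
    ];
    consolidate(&mut updates);
    assert_eq!(updates, vec![("a".to_string(), 0, 2)]);
}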
36 changes: 11 additions & 25 deletions differential-dataflow/src/containers.rs
@@ -271,41 +271,27 @@ mod container {
use std::ops::Deref;

use columnation::Columnation;
use timely::Container;
use timely::container::SizableContainer;

use crate::containers::TimelyStack;

impl<T: Columnation> Container for TimelyStack<T> {
impl<T: Columnation> timely::container::Accountable for TimelyStack<T> {
#[inline] fn record_count(&self) -> i64 { i64::try_from(self.local.len()).unwrap() }
#[inline] fn is_empty(&self) -> bool { self.local.is_empty() }
}
impl<T: Columnation> timely::container::IterContainer for TimelyStack<T> {
type ItemRef<'a> = &'a T where Self: 'a;
type Item<'a> = &'a T where Self: 'a;

fn len(&self) -> usize {
self.local.len()
}

fn is_empty(&self) -> bool {
self.local.is_empty()
}

fn clear(&mut self) {
TimelyStack::clear(self)
}

type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;

fn iter(&self) -> Self::Iter<'_> {
self.deref().iter()
}

#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
}
impl<T: Columnation> timely::container::DrainContainer for TimelyStack<T> {
type Item<'a> = &'a T where Self: 'a;
type DrainIter<'a> = std::slice::Iter<'a, T> where Self: 'a;

fn drain(&mut self) -> Self::DrainIter<'_> {
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> {
(*self).iter()
}
}

impl<T: Columnation> SizableContainer for TimelyStack<T> {
impl<T: Columnation> timely::container::SizableContainer for TimelyStack<T> {
fn at_capacity(&self) -> bool {
self.len() == self.capacity()
}
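
A related aside on the `ensure_capacity` call added to the batcher in examples/columnar.rs (`target.ensure_capacity(&mut Some(std::mem::take(&mut self.empty)))`): the intent is to seed a fresh container from a recycled allocation rather than allocating anew. A simplified Vec-based sketch of that pattern follows; the free function below is an illustration of the idea only, not timely's `SizableContainer::ensure_capacity`, and its default capacity is an arbitrary choice.

// Sketch: reuse a stashed allocation when preparing a target container.
fn ensure_capacity<T>(target: &mut Vec<T>, stash: &mut Option<Vec<T>>) {
    if target.capacity() == 0 {
        if let Some(mut donor) = stash.take() {
            // Reuse the donor's allocation instead of allocating afresh.
            donor.clear();
            *target = donor;
        } else {
            target.reserve(1024); // fallback: some default capacity
        }
    }
}

fn main() {
    let recycled: Vec<u64> = Vec::with_capacity(4096);
    let mut fresh: Vec<u64> = Vec::new();
    ensure_capacity(&mut fresh, &mut Some(recycled));
    assert!(fresh.capacity() >= 4096);
}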
7 changes: 4 additions & 3 deletions differential-dataflow/src/operators/arrange/arrangement.rs
@@ -31,6 +31,7 @@ use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor};
use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine};
use crate::trace::implementations::merge_batcher::container::MergerChunk;

use trace::wrappers::enter::{TraceEnter, BatchEnter,};
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
@@ -293,7 +294,7 @@
Time=T1::Time,
Diff: Abelian,
>+'static,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
{
self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
@@ -315,7 +316,7 @@
ValOwn: Data,
Time=T1::Time,
>+'static,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
Bu: Builder<Time=G::Timestamp, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
@@ -393,7 +394,7 @@ pub fn arrange_core<G, P, Ba, Bu, Tr>(stream: &StreamCore<G, Ba::Input>, pact: P
where
G: Scope<Timestamp: Lattice>,
P: ParallelizationContract<G::Timestamp, Ba::Input>,
Ba: Batcher<Time=G::Timestamp,Input: Container + Clone + 'static> + 'static,
Ba: Batcher<Time=G::Timestamp,Input: Container> + 'static,
Bu: Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
Tr: Trace<Time=G::Timestamp>+'static,
{