Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 68 additions & 71 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,32 @@

use std::collections::VecDeque;

/// A container transferring data through dataflow edges
/// A type representing progress, with an update count.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A thing I want to discuss, but am not sure I'll find a better place to comment:

My guess is we'd eventually like to close in on a representation that presents its progress tracking information as a Map<T, usize>, allowing multiple counts for multiple times. This might be in that direction, I think it is, but also I'm trying to think through how this ends up looking and whether there are helpful steps to take as we go.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. We currently have the separation into Message and the data it's transferring, and the Container trait in the past was just the data, mostly for historic reasons. It might make sense to make messages the interface to sending data, which would give more control about the associated timestamps to the users.

///
/// A container stores a number of elements and thus is able to describe it length (`len()`) and
/// whether it is empty (`is_empty()`). It supports removing all elements (`clear`).
/// It describes its update count (`count()`).
///
/// A container must implement default. The default implementation is not required to allocate
/// memory for variable-length components.
///
/// We require the container to be cloneable to enable efficient copies when providing references
/// of containers to operators. Care must be taken that the type's `clone_from` implementation
/// is efficient (which is not necessarily the case when deriving `Clone`.)
/// We require [`Default`] for convenience purposes.
pub trait Container: Default {
/// The type of elements when reading non-destructively from the container.
type ItemRef<'a> where Self: 'a;

/// The type of elements when draining the container.
type Item<'a> where Self: 'a;

/// Push `item` into self
#[inline]
fn push<T>(&mut self, item: T) where Self: PushInto<T> {
self.push_into(item)
}
Comment on lines -27 to -29
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

push wasn't used, removed.


/// The number of elements in this container
///
/// This number is used in progress tracking to confirm the receipt of some number
/// of outstanding records, and it is highly load bearing. The main restriction is
/// imposed on the `LengthPreservingContainerBuilder` trait, whose implementors
/// imposed on the [`LengthPreservingContainerBuilder`] trait, whose implementors
/// must preserve the number of items.
fn len(&self) -> usize;

/// Determine if the container contains any elements, corresponding to `len() == 0`.
fn is_empty(&self) -> bool {
self.len() == 0
}
fn count(&self) -> usize;

/// Remove all contents from `self` while retaining allocated memory.
/// After calling `clear`, `is_empty` must return `true` and `len` 0.
fn clear(&mut self);
}

/// A container that can reveal its contents through iterating by reference and draining.
pub trait IterContainer: Container {
/// The type of elements when reading non-destructively from the container.
type ItemRef<'a> where Self: 'a;

/// The type of elements when draining the container.
type Item<'a> where Self: 'a;

/// Iterator type when reading from the container.
type Iter<'a>: Iterator<Item=Self::ItemRef<'a>> where Self: 'a;
Expand Down Expand Up @@ -116,8 +102,9 @@ pub trait ContainerBuilder: Default + 'static {
/// Partitions `container` among `builders`, using the function `index` to direct items.
fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
where
Self: for<'a> PushInto<<Self::Container as Container>::Item<'a>>,
I: for<'a> FnMut(&<Self::Container as Container>::Item<'a>) -> usize,
Self: for<'a> PushInto<<Self::Container as IterContainer>::Item<'a>>,
I: for<'a> FnMut(&<Self::Container as IterContainer>::Item<'a>) -> usize,
Self::Container: IterContainer,
{
for datum in container.drain() {
let index = index(&datum);
Expand All @@ -142,6 +129,33 @@ pub trait ContainerBuilder: Default + 'static {
/// If you have any questions about this trait you are best off not implementing it.
pub trait LengthPreservingContainerBuilder : ContainerBuilder { }

/// A container builder that never produces any outputs, and can be used to pass through data in
/// operators.
#[derive(Debug, Clone)]
pub struct PassthroughContainerBuilder<C>(std::marker::PhantomData<C>);

impl<C> Default for PassthroughContainerBuilder<C> {
#[inline(always)]
fn default() -> Self {
PassthroughContainerBuilder(std::marker::PhantomData)
}
}

impl<C: Container + Clone + 'static> ContainerBuilder for PassthroughContainerBuilder<C>
{
type Container = C;

#[inline(always)]
fn extract(&mut self) -> Option<&mut Self::Container> {
None
}

#[inline(always)]
fn finish(&mut self) -> Option<&mut Self::Container> {
None
}
}

/// A default container builder that uses length and preferred capacity to chunk data.
///
/// Maintains a single empty allocation between [`Self::push_into`] and [`Self::extract`], but not
Expand All @@ -165,7 +179,7 @@ impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuil
self.current.ensure_capacity(&mut self.empty);

// Push item
self.current.push(item);
self.current.push_into(item);

// Maybe flush
if self.current.at_capacity() {
Expand All @@ -189,30 +203,24 @@ impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuild

#[inline]
fn finish(&mut self) -> Option<&mut C> {
if !self.current.is_empty() {
if self.current.count() > 0 {
self.pending.push_back(std::mem::take(&mut self.current));
}
self.empty = self.pending.pop_front();
self.empty.as_mut()
}
}

impl<C: Container + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
impl<C: Container + SizableContainer + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }

impl<T> Container for Vec<T> {
#[inline(always)] fn count(&self) -> usize { Vec::len(self) }
#[inline(always)] fn clear(&mut self) { Vec::clear(self) }
}

impl<T> IterContainer for Vec<T> {
type ItemRef<'a> = &'a T where T: 'a;
type Item<'a> = T where T: 'a;

fn len(&self) -> usize {
Vec::len(self)
}

fn is_empty(&self) -> bool {
Vec::is_empty(self)
}

fn clear(&mut self) { Vec::clear(self) }

type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;

fn iter(&self) -> Self::Iter<'_> {
Expand Down Expand Up @@ -268,29 +276,24 @@ mod rc {
use std::ops::Deref;
use std::rc::Rc;

use crate::Container;
use crate::{Container, IterContainer};

impl<T: Container> Container for Rc<T> {
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Item<'a> = T::ItemRef<'a> where Self: 'a;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
}

fn is_empty(&self) -> bool {
std::ops::Deref::deref(self).is_empty()
}

fn clear(&mut self) {
#[inline(always)] fn count(&self) -> usize { self.as_ref().count() }
#[inline(always)] fn clear(&mut self) {
// Try to reuse the allocation if possible
if let Some(inner) = Rc::get_mut(self) {
inner.clear();
} else {
*self = Self::default();
}
}
}


impl<T: IterContainer> IterContainer for Rc<T> {
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Item<'a> = T::ItemRef<'a> where Self: 'a;
type Iter<'a> = T::Iter<'a> where Self: 'a;

fn iter(&self) -> Self::Iter<'_> {
Expand All @@ -309,29 +312,23 @@ mod arc {
use std::ops::Deref;
use std::sync::Arc;

use crate::Container;

impl<T: Container> Container for Arc<T> {
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Item<'a> = T::ItemRef<'a> where Self: 'a;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
}

fn is_empty(&self) -> bool {
std::ops::Deref::deref(self).is_empty()
}
use crate::{Container, IterContainer};

fn clear(&mut self) {
impl<T: Container> Container for std::sync::Arc<T> {
#[inline(always)] fn count(&self) -> usize { self.as_ref().count() }
#[inline(always)] fn clear(&mut self) {
// Try to reuse the allocation if possible
if let Some(inner) = Arc::get_mut(self) {
inner.clear();
} else {
*self = Self::default();
}
}
}

impl<T: IterContainer> IterContainer for Arc<T> {
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Item<'a> = T::ItemRef<'a> where Self: 'a;
type Iter<'a> = T::Iter<'a> where Self: 'a;

fn iter(&self) -> Self::Iter<'_> {
Expand Down
10 changes: 6 additions & 4 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use {
std::collections::HashMap,
timely::{Container, container::CapacityContainerBuilder},
timely::container::{CapacityContainerBuilder, IterContainer},
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
timely::dataflow::InputHandleCore,
timely::dataflow::operators::{Inspect, Operator, Probe},
Expand Down Expand Up @@ -165,17 +165,19 @@ mod container {
}
}

impl<C: columnar::ContainerBytes> timely::Container for Column<C> {
fn len(&self) -> usize { self.borrow().len() }
impl<C: columnar::ContainerBytes> timely::container::Container for Column<C> {
#[inline(always)] fn count(&self) -> usize { self.borrow().len() }
// This sets `self` to be an empty `Typed` variant, appropriate for pushing into.
#[inline(always)]
fn clear(&mut self) {
match self {
Column::Typed(t) => t.clear(),
Column::Bytes(_) => *self = Column::Typed(Default::default()),
Column::Align(_) => *self = Column::Typed(Default::default()),
}
}

}
impl<C: columnar::ContainerBytes> timely::container::IterContainer for Column<C> {
type ItemRef<'a> = C::Ref<'a>;
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() }
Expand Down
18 changes: 9 additions & 9 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use std::{fmt::{self, Debug}, marker::PhantomData};
use std::rc::Rc;

use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto}};
use crate::container::{ContainerBuilder, LengthPreservingContainerBuilder, IterContainer, Container, SizableContainer, CapacityContainerBuilder, PushInto};
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::communication::{Push, Pull};
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
Expand Down Expand Up @@ -52,8 +52,8 @@ pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;

impl<CB, F> ExchangeCore<CB, F>
where
CB: LengthPreservingContainerBuilder,
for<'a> F: FnMut(&<CB::Container as Container>::Item<'a>)->u64
CB: LengthPreservingContainerBuilder<Container: IterContainer>,
for<'a> F: FnMut(&<CB::Container as IterContainer>::Item<'a>)->u64
{
/// Allocates a new `Exchange` pact from a distribution function.
pub fn new_core(func: F) -> ExchangeCore<CB, F> {
Expand All @@ -66,7 +66,7 @@ where

impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
where
C: SizableContainer,
C: SizableContainer + IterContainer,
for<'a> F: FnMut(&C::Item<'a>)->u64
{
/// Allocates a new `Exchange` pact from a distribution function.
Expand All @@ -81,10 +81,10 @@ where
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
where
CB: ContainerBuilder,
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
CB: ContainerBuilder<Container: IterContainer>,
CB: for<'a> PushInto<<CB::Container as IterContainer>::Item<'a>>,
CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
for<'a> H: FnMut(&<CB::Container as IterContainer>::Item<'a>) -> u64
{
type Pusher = ExchangePusher<T, CB, LogPusher<T, CB::Container, Box<dyn Push<Message<T, CB::Container>>>>, H>;
type Puller = LogPuller<T, CB::Container, Box<dyn Pull<Message<T, CB::Container>>>>;
Expand Down Expand Up @@ -147,7 +147,7 @@ impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<
source: self.source,
target: self.target,
seq_no: self.counter - 1,
length: bundle.data.len(),
length: bundle.data.count(),
})
}
}
Expand Down Expand Up @@ -194,7 +194,7 @@ impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<
source: bundle.from,
target,
seq_no: bundle.seq,
length: bundle.data.len(),
length: bundle.data.count(),
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pullers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<T:Ord+Clone+'static, C: Container, P: Pull<Message<T, C>>> Counter<T, C, P>
let guard = ConsumedGuard {
consumed: Rc::clone(&self.consumed),
time: Some(message.time.clone()),
len: message.data.len(),
len: message.data.count(),
};
Some((guard, message))
}
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pushers/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl<T, CB: ContainerBuilder, P: Push<Message<T, CB::Container>>> Buffer<T, CB,
// buffer always requires a container builder. We could expose the buffer's underlying pusher
// directly, but this would bypass the buffer's time tracking.
fn give_container(&mut self, container: &mut CB::Container) {
if !container.is_empty() {
if container.count() > 0 {
self.flush();
let time = self.time.as_ref().unwrap().clone();
Message::push_at(container, time, &mut self.pusher);
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/channels/pushers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl<T: Timestamp, C: Container, P> Push<Message<T, C>> for Counter<T, C, P> whe
#[inline]
fn push(&mut self, message: &mut Option<Message<T, C>>) {
if let Some(message) = message {
self.produced.borrow_mut().update(message.time.clone(), message.data.len() as i64);
self.produced.borrow_mut().update(message.time.clone(), message.data.count() as i64);
}

// only propagate `None` if dirty (indicates flush)
Expand Down
18 changes: 9 additions & 9 deletions timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
//! The exchange pattern distributes pushed data between many target pushees.

use crate::communication::Push;
use crate::container::{ContainerBuilder, PushInto};
use crate::container::{ContainerBuilder, IterContainer, PushInto};
use crate::dataflow::channels::Message;
use crate::{Container, Data};
use crate::Data;

// TODO : Software write combining
/// Distributes records among target pushees according to a distribution function.
pub struct Exchange<T, CB, P, H>
where
CB: ContainerBuilder,
CB: ContainerBuilder<Container: IterContainer>,
P: Push<Message<T, CB::Container>>,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
for<'a> H: FnMut(&<CB::Container as IterContainer>::Item<'a>) -> u64
{
pushers: Vec<P>,
builders: Vec<CB>,
Expand All @@ -21,9 +21,9 @@ where

impl<T: Clone, CB, P, H> Exchange<T, CB, P, H>
where
CB: ContainerBuilder,
CB: ContainerBuilder<Container: IterContainer>,
P: Push<Message<T, CB::Container>>,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
for<'a> H: FnMut(&<CB::Container as IterContainer>::Item<'a>) -> u64
{
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, CB, P, H> {
Expand All @@ -50,10 +50,10 @@ where

impl<T: Eq+Data, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
where
CB: ContainerBuilder,
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
CB: ContainerBuilder<Container: IterContainer>,
CB: for<'a> PushInto<<CB::Container as IterContainer>::Item<'a>>,
P: Push<Message<T, CB::Container>>,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
for<'a> H: FnMut(&<CB::Container as IterContainer>::Item<'a>) -> u64
{
#[inline(never)]
fn push(&mut self, message: &mut Option<Message<T, CB::Container>>) {
Expand Down
Loading
Loading