Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 73 additions & 90 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,46 +4,34 @@

use std::collections::VecDeque;

/// A container transferring data through dataflow edges
/// A type representing progress, with an update count.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A thing I want to discuss, but am not sure I'll find a better place to comment:

My guess is we'd eventually like to close in on a representation that presents its progress tracking information as a Map<T, usize>, allowing multiple counts for multiple times. This might be in that direction, I think it is, but also I'm trying to think through how this ends up looking and whether there are helpful steps to take as we go.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. We currently have the separation into Message and the data it's transferring, and the Container trait in the past was just the data, mostly for historic reasons. It might make sense to make messages the interface to sending data, which would give more control about the associated timestamps to the users.

///
/// A container stores a number of elements and thus is able to describe it length (`len()`) and
/// whether it is empty (`is_empty()`). It supports removing all elements (`clear`).
/// It describes its update count (`count()`) and whether it is empty (`is_empty()`).
///
/// A container must implement default. The default implementation is not required to allocate
/// memory for variable-length components.
///
/// We require the container to be cloneable to enable efficient copies when providing references
/// of containers to operators. Care must be taken that the type's `clone_from` implementation
/// is efficient (which is not necessarily the case when deriving `Clone`.)
pub trait Container: Default {
/// The type of elements when reading non-destructively from the container.
type ItemRef<'a> where Self: 'a;

/// The type of elements when draining the container.
type Item<'a> where Self: 'a;

/// Push `item` into self
#[inline]
fn push<T>(&mut self, item: T) where Self: PushInto<T> {
self.push_into(item)
}
Comment on lines -27 to -29
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

push wasn't used, removed.


/// We require [`Default`] for convenience purposes.
pub trait WithProgress: Default {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's discuss the name! I don't have anything better, but this name is surprising to me (there is no progress that it is with, and it's unclear what it means).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tbh, I kinda like Container as the name here, in that from timely's point of view containers hold numbers of things that should be tracked, and it has no other opinion. The other trait being ItemContainer or IterContainer or whathaveyou makes lots of sense, but partly because it isn't as "core timely" function.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed to Container!

/// The number of elements in this container
///
/// This number is used in progress tracking to confirm the receipt of some number
/// of outstanding records, and it is highly load bearing. The main restriction is
/// imposed on the `LengthPreservingContainerBuilder` trait, whose implementors
/// imposed on the [`CountPreservingContainerBuilder`] trait, whose implementors
/// must preserve the number of items.
fn len(&self) -> usize;
fn count(&self) -> usize;

/// Determine if the container contains any elements, corresponding to `len() == 0`.
/// Determine if the container contains any elements, corresponding to `count() == 0`.
#[inline(always)]
fn is_empty(&self) -> bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should be careful here because throughout Rust-land is_empty() matches len() rather than some other count() function. I'd be interested to figure out where we use this, i.e. where we ask "hey, is that container utterly devoid of anything worth tracking?".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checked the places where we use this, and it is mostly to guard against sending empty containers. Is it correct to send empty containers, i.e., should the count for something within progress tracking be non-zero? It doesn't make much sense to me to send containers with length 0, so it's probably good to guard against it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Removed is_empty and replaced all calls with .count() > 0).

self.len() == 0
self.count() == 0
}
}

/// Remove all contents from `self` while retaining allocated memory.
/// After calling `clear`, `is_empty` must return `true` and `len` 0.
fn clear(&mut self);
/// A container that can reveal its contents through iterating by reference and draining.
pub trait IterableContainer: WithProgress {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I foresee this being deleted / unified with columnar's container traits. No rush, but curious if that checks out too. This is now I think not timely specific as much as "can I read data out of here?". At least, if not columnar then some random container traits somewhere, unrelated to core timely, in timely only as long as there aren't better ones somewhere else.

/// The type of elements when reading non-destructively from the container.
type ItemRef<'a> where Self: 'a;

/// The type of elements when draining the container.
type Item<'a> where Self: 'a;

/// Iterator type when reading from the container.
type Iter<'a>: Iterator<Item=Self::ItemRef<'a>> where Self: 'a;
Expand All @@ -60,7 +48,7 @@ pub trait Container: Default {
}

/// A container that can be sized and reveals its capacity.
pub trait SizableContainer: Container {
pub trait SizableContainer: WithProgress {
/// Indicates that the container is "full" and should be shipped.
fn at_capacity(&self) -> bool;
/// Restores `self` to its desired capacity, if it has one.
Expand Down Expand Up @@ -102,7 +90,7 @@ pub trait PushInto<T> {
/// decide to represent a push order for `extract` and `finish`, or not.
pub trait ContainerBuilder: Default + 'static {
/// The container type we're building.
type Container: Container + Clone + 'static;
type Container: WithProgress + Default + Clone + 'static;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WithProgress : Default, so is the + Default redundant, or perhaps helpful because that WithProgress constraint isn't locked down?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed, was redundant.

/// Extract assembled containers, potentially leaving unfinished data behind. Can
/// be called repeatedly, for example while the caller can send data.
///
Expand All @@ -116,14 +104,14 @@ pub trait ContainerBuilder: Default + 'static {
/// Partitions `container` among `builders`, using the function `index` to direct items.
fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
where
Self: for<'a> PushInto<<Self::Container as Container>::Item<'a>>,
I: for<'a> FnMut(&<Self::Container as Container>::Item<'a>) -> usize,
Self: for<'a> PushInto<<Self::Container as IterableContainer>::Item<'a>>,
I: for<'a> FnMut(&<Self::Container as IterableContainer>::Item<'a>) -> usize,
Self::Container: IterableContainer,
{
for datum in container.drain() {
let index = index(&datum);
builders[index].push_into(datum);
}
container.clear();
}

/// Indicates a good moment to release resources.
Expand All @@ -140,7 +128,34 @@ pub trait ContainerBuilder: Default + 'static {
/// Specifically, the sum of lengths of all extracted and finished containers must equal the
/// number of times that `push_into` is called on the container builder.
/// If you have any questions about this trait you are best off not implementing it.
pub trait LengthPreservingContainerBuilder : ContainerBuilder { }
pub trait CountPreservingContainerBuilder: ContainerBuilder { }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Names are a bit weird here, because "count" is a method on WithProgress and not obviously a top-level concept. I wonder if it makes sense to require all container builders to be count preserving? As in, you now need to implement both count() and len(), and there's no room for containers whose count() method is incorrect?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't the idea of the trait to indicate that it's compatible with partition, i.e., after partitioning, the sum length of the parts equals the length of the input? From this perspective, it seems worth keeping around.

I restored the old name.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the goal of the trait historically was to say, in the absence of a count function, "the len function correctly represents the intent of count", but .. I'm not sure we need that distinction any more? At least, I can't tell what would implement ContainerBuilder and not CountPreservingContainerBuilder and we wouldn't immediately call buggy.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might still be useful for a container builder that compacts in-place, like we have in Differential. The consolidating container builder certainly can only be used before we fix counts for progress tracking. It can't be used for an exchange because data might consolidate differently.

I see two ways out: Either we have a container that stores the count independently of the contents, and sets the count to the number of push_into calls outside progress tracking, or rework the exchange logic to make it clear that container builders inside have less flexibility than the ones before we take the count.

For this reason, I think it makes sense to leave this trait in Timely for the time being.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still not following, but maybe I need a few to think and understand. Can you point at a thing that implements ContainerBuilder and not CountPreservingContainerBuilder? I'm pretty sure I will then conclude we should fix / delete it. A differential builder than has a count() method and just uses e.g. Vec::len for this even with consolidation .. is .. just wrong going forward. It is incorrect, and we should disallow it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a trait for both Container and ImplementsContainerCorrectly is what seems weird.


/// A container builder that never produces any outputs, and can be used to pass through data in
/// operators.
#[derive(Debug, Clone)]
pub struct PassthroughContainerBuilder<C>(std::marker::PhantomData<C>);

impl<C> Default for PassthroughContainerBuilder<C> {
#[inline(always)]
fn default() -> Self {
PassthroughContainerBuilder(std::marker::PhantomData)
}
}

impl<C: WithProgress + Clone + 'static> ContainerBuilder for PassthroughContainerBuilder<C>
{
type Container = C;

#[inline(always)]
fn extract(&mut self) -> Option<&mut Self::Container> {
None
}

#[inline(always)]
fn finish(&mut self) -> Option<&mut Self::Container> {
None
}
}

/// A default container builder that uses length and preferred capacity to chunk data.
///
Expand All @@ -165,7 +180,7 @@ impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuil
self.current.ensure_capacity(&mut self.empty);

// Push item
self.current.push(item);
self.current.push_into(item);

// Maybe flush
if self.current.at_capacity() {
Expand All @@ -174,7 +189,7 @@ impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuil
}
}

impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
impl<C: WithProgress + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
type Container = C;

#[inline]
Expand All @@ -197,22 +212,11 @@ impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuild
}
}

impl<C: Container + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
impl<C: WithProgress + SizableContainer + Clone + 'static> CountPreservingContainerBuilder for CapacityContainerBuilder<C> { }

impl<T> Container for Vec<T> {
impl<T> IterableContainer for Vec<T> {
type ItemRef<'a> = &'a T where T: 'a;
type Item<'a> = T where T: 'a;

fn len(&self) -> usize {
Vec::len(self)
}

fn is_empty(&self) -> bool {
Vec::is_empty(self)
}

fn clear(&mut self) { Vec::clear(self) }

type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;

fn iter(&self) -> Self::Iter<'_> {
Expand Down Expand Up @@ -268,29 +272,11 @@ mod rc {
use std::ops::Deref;
use std::rc::Rc;

use crate::Container;
use crate::IterableContainer;

impl<T: Container> Container for Rc<T> {
impl<T: IterableContainer> IterableContainer for Rc<T> {
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Item<'a> = T::ItemRef<'a> where Self: 'a;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
}

fn is_empty(&self) -> bool {
std::ops::Deref::deref(self).is_empty()
}

fn clear(&mut self) {
// Try to reuse the allocation if possible
if let Some(inner) = Rc::get_mut(self) {
inner.clear();
} else {
*self = Self::default();
}
}

type Iter<'a> = T::Iter<'a> where Self: 'a;

fn iter(&self) -> Self::Iter<'_> {
Expand All @@ -309,29 +295,11 @@ mod arc {
use std::ops::Deref;
use std::sync::Arc;

use crate::Container;
use crate::IterableContainer;

impl<T: Container> Container for Arc<T> {
impl<T: IterableContainer> IterableContainer for Arc<T> {
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
type Item<'a> = T::ItemRef<'a> where Self: 'a;

fn len(&self) -> usize {
std::ops::Deref::deref(self).len()
}

fn is_empty(&self) -> bool {
std::ops::Deref::deref(self).is_empty()
}

fn clear(&mut self) {
// Try to reuse the allocation if possible
if let Some(inner) = Arc::get_mut(self) {
inner.clear();
} else {
*self = Self::default();
}
}

type Iter<'a> = T::Iter<'a> where Self: 'a;

fn iter(&self) -> Self::Iter<'_> {
Expand Down Expand Up @@ -366,3 +334,18 @@ pub mod buffer {
}
}
}

impl<T> WithProgress for Vec<T> {
#[inline(always)] fn count(&self) -> usize { self.len() }
#[inline(always)] fn is_empty(&self) -> bool { Vec::is_empty(self) }
}

impl<T: WithProgress> WithProgress for std::rc::Rc<T> {
#[inline(always)] fn count(&self) -> usize { self.as_ref().count() }
#[inline(always)] fn is_empty(&self) -> bool { self.as_ref().is_empty() }
}

impl<T: WithProgress> WithProgress for std::sync::Arc<T> {
#[inline(always)] fn count(&self) -> usize { self.as_ref().count() }
#[inline(always)] fn is_empty(&self) -> bool { self.as_ref().is_empty() }
}
22 changes: 8 additions & 14 deletions timely/examples/columnar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use {
std::collections::HashMap,
timely::{Container, container::CapacityContainerBuilder},
timely::container::{CapacityContainerBuilder, IterableContainer},
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
timely::dataflow::InputHandleCore,
timely::dataflow::operators::{Inspect, Operator, Probe},
Expand Down Expand Up @@ -165,17 +165,11 @@ mod container {
}
}

impl<C: columnar::ContainerBytes> timely::Container for Column<C> {
fn len(&self) -> usize { self.borrow().len() }
// This sets `self` to be an empty `Typed` variant, appropriate for pushing into.
fn clear(&mut self) {
match self {
Column::Typed(t) => t.clear(),
Column::Bytes(_) => *self = Column::Typed(Default::default()),
Column::Align(_) => *self = Column::Typed(Default::default()),
}
}

impl<C: columnar::ContainerBytes> timely::container::WithProgress for Column<C> {
#[inline(always)]
fn count(&self) -> usize { self.borrow().len() }
}
impl<C: columnar::ContainerBytes> timely::container::IterableContainer for Column<C> {
type ItemRef<'a> = C::Ref<'a>;
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() }
Expand Down Expand Up @@ -284,7 +278,7 @@ mod builder {
}
}

use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
use timely::container::{ContainerBuilder, CountPreservingContainerBuilder};
impl<C: columnar::ContainerBytes> ContainerBuilder for ColumnBuilder<C> {
type Container = Column<C>;

Expand Down Expand Up @@ -317,5 +311,5 @@ mod builder {
}
}

impl<C: columnar::ContainerBytes> LengthPreservingContainerBuilder for ColumnBuilder<C> { }
impl<C: columnar::ContainerBytes> CountPreservingContainerBuilder for ColumnBuilder<C> { }
}
8 changes: 4 additions & 4 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use serde::{Deserialize, Serialize};
use crate::communication::Push;
use crate::Container;
use crate::container::WithProgress;

/// A collection of types that may be pushed at.
pub mod pushers;
Expand Down Expand Up @@ -32,14 +32,15 @@ impl<T, C> Message<T, C> {
}
}

impl<T, C: Container> Message<T, C> {
impl<T, C: WithProgress> Message<T, C> {
/// Creates a new message instance from arguments.
pub fn new(time: T, data: C, from: usize, seq: usize) -> Self {
Message { time, data, from, seq }
}

/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or the container's default element. The buffer is cleared.
/// leaves in place, or the container's default element. The buffer's contents are left in an
/// undefined state, specifically the caller cannot rely on this function clearing the buffer.
#[inline]
pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {

Expand All @@ -51,7 +52,6 @@ impl<T, C: Container> Message<T, C> {

if let Some(message) = bundle {
*buffer = message.data;
buffer.clear();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line prompted me to think I don't actually understand what is going on. It seems like a pretty substantial change to some contracts, and it's worth going through the reason for it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Backed out all the clear changes, and added a clear function to Container. I'm not sure we want to have it in the future, but seems easier to decouple the discussions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Elsewhere (e.g. columnar) you just have a Clear trait and require it when you want to be able to re-use a container.

}
}
}
Expand Down
Loading
Loading