Skip to content

Commit af70245

Browse files
Merge pull request #697 from antiguru/container_rework
Rework container abstractions
2 parents 066bd14 + dfc9012 commit af70245

33 files changed

+245
-293
lines changed

container/src/lib.rs

Lines changed: 78 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -4,71 +4,66 @@
44

55
use std::collections::VecDeque;
66

7-
/// A container transferring data through dataflow edges
7+
/// A type containing a number of records accounted for by progress tracking.
88
///
9-
/// A container stores a number of elements and thus is able to describe it length (`len()`) and
10-
/// whether it is empty (`is_empty()`). It supports removing all elements (`clear`).
11-
///
12-
/// A container must implement default. The default implementation is not required to allocate
13-
/// memory for variable-length components.
14-
///
15-
/// We require the container to be cloneable to enable efficient copies when providing references
16-
/// of containers to operators. Care must be taken that the type's `clone_from` implementation
17-
/// is efficient (which is not necessarily the case when deriving `Clone`.)
18-
pub trait Container: Default {
19-
/// The type of elements when reading non-destructively from the container.
20-
type ItemRef<'a> where Self: 'a;
21-
22-
/// The type of elements when draining the container.
23-
type Item<'a> where Self: 'a;
24-
25-
/// Push `item` into self
26-
#[inline]
27-
fn push<T>(&mut self, item: T) where Self: PushInto<T> {
28-
self.push_into(item)
29-
}
30-
31-
/// The number of elements in this container
9+
/// The object stores a number of updates and thus is able to describe its count
10+
/// (`record_count()`) and whether it is empty (`is_empty()`). It is empty if the
11+
/// record count is zero.
12+
pub trait Accountable {
13+
/// The number of records
3214
///
3315
/// This number is used in progress tracking to confirm the receipt of some number
3416
/// of outstanding records, and it is highly load bearing. The main restriction is
3517
/// imposed on the `LengthPreservingContainerBuilder` trait, whose implementors
36-
/// must preserve the number of items.
37-
fn len(&self) -> usize;
18+
/// must preserve the number of records.
19+
fn record_count(&self) -> i64;
3820

39-
/// Determine if the container contains any elements, corresponding to `len() == 0`.
40-
fn is_empty(&self) -> bool {
41-
self.len() == 0
42-
}
43-
44-
/// Remove all contents from `self` while retaining allocated memory.
45-
/// After calling `clear`, `is_empty` must return `true` and `len` 0.
46-
fn clear(&mut self);
21+
/// Determine if this contains no records, corresponding to `record_count() == 0`.
22+
/// It is a correctness error for this to be anything other than `self.record_count() == 0`.
23+
#[inline] fn is_empty(&self) -> bool { self.record_count() == 0 }
24+
}
4725

26+
/// A container that allows iteration morally equivalent to [`IntoIterator`].
27+
///
28+
/// Iterating the container presents items in an implementation-specific order.
29+
/// The container's contents are not changed.
30+
pub trait IterContainer {
31+
/// The type of elements when reading non-destructively from the container.
32+
type ItemRef<'a> where Self: 'a;
4833
/// Iterator type when reading from the container.
4934
type Iter<'a>: Iterator<Item=Self::ItemRef<'a>> where Self: 'a;
50-
5135
/// Returns an iterator that reads the contents of this container.
5236
fn iter(&self) -> Self::Iter<'_>;
37+
}
5338

39+
/// A container that can drain itself.
40+
///
41+
/// Draining the container presents items in an implementation-specific order.
42+
/// The container is in an undefined state after calling [`Self::drain`]. Dropping
43+
/// the iterator also leaves the container in an undefined state.
44+
pub trait DrainContainer {
45+
/// The type of elements when draining the container.
46+
type Item<'a> where Self: 'a;
5447
/// Iterator type when draining the container.
5548
type DrainIter<'a>: Iterator<Item=Self::Item<'a>> where Self: 'a;
56-
5749
/// Returns an iterator that drains the contents of this container.
5850
/// Drain leaves the container in an undefined state.
5951
fn drain(&mut self) -> Self::DrainIter<'_>;
6052
}
6153

6254
/// A container that can be sized and reveals its capacity.
63-
pub trait SizableContainer: Container {
55+
pub trait SizableContainer {
6456
/// Indicates that the container is "full" and should be shipped.
6557
fn at_capacity(&self) -> bool;
6658
/// Restores `self` to its desired capacity, if it has one.
6759
///
6860
/// The `stash` argument is available, and may have the intended capacity.
6961
/// However, it may be non-empty, and may be of the wrong capacity. The
7062
/// method should guard against these cases.
71-
fn ensure_capacity(&mut self, stash: &mut Option<Self>);
63+
///
64+
/// Assume that the `stash` is in an undefined state, and properly clear it
65+
/// before re-using it.
66+
fn ensure_capacity(&mut self, stash: &mut Option<Self>) where Self: Sized;
7267
}
7368

7469
/// A container that can absorb items of a specific type.
@@ -95,14 +90,19 @@ pub trait PushInto<T> {
9590
/// [`Self::finish`]. Implementations can recycle buffers, but should ensure that they clear
9691
/// any remaining elements.
9792
///
93+
/// Implementations are allowed to re-use the contents of the mutable references left by the caller,
94+
/// but they should ensure that they clear the contents before doing so.
95+
///
9896
/// For example, a consolidating builder can aggregate differences in-place, but it has
9997
/// to ensure that it preserves the intended information.
10098
///
10199
/// The trait does not prescribe any specific ordering guarantees, and each implementation can
102100
/// decide to represent a push order for `extract` and `finish`, or not.
103101
pub trait ContainerBuilder: Default + 'static {
104102
/// The container type we're building.
105-
type Container: Container + Clone + 'static;
103+
// The container is `Clone` because `Tee` requires it, otherwise we need to repeat it
104+
// all over Timely. `'static` because we don't want lifetimes everywhere.
105+
type Container: Accountable + Default + Clone + 'static;
106106
/// Extract assembled containers, potentially leaving unfinished data behind. Can
107107
/// be called repeatedly, for example while the caller can send data.
108108
///
@@ -124,8 +124,8 @@ pub trait ContainerBuilder: Default + 'static {
124124

125125
/// A wrapper trait indicating that the container building will preserve the number of records.
126126
///
127-
/// Specifically, the sum of lengths of all extracted and finished containers must equal the
128-
/// number of times that `push_into` is called on the container builder.
127+
/// Specifically, the sum of record counts of all extracted and finished containers must equal the
128+
/// number of accounted records that are pushed into the container builder.
129129
/// If you have any questions about this trait you are best off not implementing it.
130130
pub trait LengthPreservingContainerBuilder : ContainerBuilder { }
131131

@@ -145,14 +145,14 @@ pub struct CapacityContainerBuilder<C>{
145145
pending: VecDeque<C>,
146146
}
147147

148-
impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
148+
impl<T, C: SizableContainer + Default + PushInto<T>> PushInto<T> for CapacityContainerBuilder<C> {
149149
#[inline]
150150
fn push_into(&mut self, item: T) {
151151
// Ensure capacity
152152
self.current.ensure_capacity(&mut self.empty);
153153

154154
// Push item
155-
self.current.push(item);
155+
self.current.push_into(item);
156156

157157
// Maybe flush
158158
if self.current.at_capacity() {
@@ -161,7 +161,7 @@ impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuil
161161
}
162162
}
163163

164-
impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
164+
impl<C: Accountable + Default + Clone + 'static> ContainerBuilder for CapacityContainerBuilder<C> {
165165
type Container = C;
166166

167167
#[inline]
@@ -184,31 +184,25 @@ impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuild
184184
}
185185
}
186186

187-
impl<C: Container + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
187+
impl<C: Accountable + SizableContainer + Default + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
188188

189-
impl<T> Container for Vec<T> {
190-
type ItemRef<'a> = &'a T where T: 'a;
191-
type Item<'a> = T where T: 'a;
192-
193-
fn len(&self) -> usize {
194-
Vec::len(self)
195-
}
196-
197-
fn is_empty(&self) -> bool {
198-
Vec::is_empty(self)
199-
}
200-
201-
fn clear(&mut self) { Vec::clear(self) }
189+
impl<T> Accountable for Vec<T> {
190+
#[inline] fn record_count(&self) -> i64 { i64::try_from(Vec::len(self)).unwrap() }
191+
#[inline] fn is_empty(&self) -> bool { Vec::is_empty(self) }
192+
}
202193

194+
impl<T> IterContainer for Vec<T> {
195+
type ItemRef<'a> = &'a T where T: 'a;
203196
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
204-
205-
fn iter(&self) -> Self::Iter<'_> {
197+
#[inline] fn iter(&self) -> Self::Iter<'_> {
206198
self.as_slice().iter()
207199
}
200+
}
208201

202+
impl<T> DrainContainer for Vec<T> {
203+
type Item<'a> = T where T: 'a;
209204
type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;
210-
211-
fn drain(&mut self) -> Self::DrainIter<'_> {
205+
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> {
212206
self.drain(..)
213207
}
214208
}
@@ -255,81 +249,43 @@ mod rc {
255249
use std::ops::Deref;
256250
use std::rc::Rc;
257251

258-
use crate::Container;
252+
use crate::{IterContainer, DrainContainer};
259253

260-
impl<T: Container> Container for Rc<T> {
254+
impl<T: crate::Accountable> crate::Accountable for Rc<T> {
255+
#[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
256+
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
257+
}
258+
impl<T: IterContainer> IterContainer for Rc<T> {
261259
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
262-
type Item<'a> = T::ItemRef<'a> where Self: 'a;
263-
264-
fn len(&self) -> usize {
265-
std::ops::Deref::deref(self).len()
266-
}
267-
268-
fn is_empty(&self) -> bool {
269-
std::ops::Deref::deref(self).is_empty()
270-
}
271-
272-
fn clear(&mut self) {
273-
// Try to reuse the allocation if possible
274-
if let Some(inner) = Rc::get_mut(self) {
275-
inner.clear();
276-
} else {
277-
*self = Self::default();
278-
}
279-
}
280-
281260
type Iter<'a> = T::Iter<'a> where Self: 'a;
282-
283-
fn iter(&self) -> Self::Iter<'_> {
284-
self.deref().iter()
285-
}
286-
261+
#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
262+
}
263+
impl<T: IterContainer> DrainContainer for Rc<T> {
264+
type Item<'a> = T::ItemRef<'a> where Self: 'a;
287265
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
288-
289-
fn drain(&mut self) -> Self::DrainIter<'_> {
290-
self.iter()
291-
}
266+
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
292267
}
293268
}
294269

295270
mod arc {
296271
use std::ops::Deref;
297272
use std::sync::Arc;
298273

299-
use crate::Container;
274+
use crate::{IterContainer, DrainContainer};
300275

301-
impl<T: Container> Container for Arc<T> {
276+
impl<T: crate::Accountable> crate::Accountable for Arc<T> {
277+
#[inline] fn record_count(&self) -> i64 { std::ops::Deref::deref(self).record_count() }
278+
#[inline] fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
279+
}
280+
impl<T: IterContainer> IterContainer for Arc<T> {
302281
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
303-
type Item<'a> = T::ItemRef<'a> where Self: 'a;
304-
305-
fn len(&self) -> usize {
306-
std::ops::Deref::deref(self).len()
307-
}
308-
309-
fn is_empty(&self) -> bool {
310-
std::ops::Deref::deref(self).is_empty()
311-
}
312-
313-
fn clear(&mut self) {
314-
// Try to reuse the allocation if possible
315-
if let Some(inner) = Arc::get_mut(self) {
316-
inner.clear();
317-
} else {
318-
*self = Self::default();
319-
}
320-
}
321-
322282
type Iter<'a> = T::Iter<'a> where Self: 'a;
323-
324-
fn iter(&self) -> Self::Iter<'_> {
325-
self.deref().iter()
326-
}
327-
283+
#[inline] fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
284+
}
285+
impl<T: IterContainer> DrainContainer for Arc<T> {
286+
type Item<'a> = T::ItemRef<'a> where Self: 'a;
328287
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
329-
330-
fn drain(&mut self) -> Self::DrainIter<'_> {
331-
self.iter()
332-
}
288+
#[inline] fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
333289
}
334290
}
335291

timely/examples/columnar.rs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
//! Wordcount based on the `columnar` crate.
22
3-
use {
4-
std::collections::HashMap,
5-
timely::{Container, container::CapacityContainerBuilder},
6-
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
7-
timely::dataflow::InputHandleCore,
8-
timely::dataflow::operators::{Inspect, Operator, Probe},
9-
timely::dataflow::ProbeHandle,
10-
};
3+
use std::collections::HashMap;
4+
5+
use timely::container::{IterContainer, CapacityContainerBuilder};
6+
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
7+
use timely::dataflow::InputHandleCore;
8+
use timely::dataflow::operators::{Inspect, Operator, Probe};
9+
use timely::dataflow::ProbeHandle;
1110

1211
// Creates `WordCountContainer` and `WordCountReference` structs,
1312
// as well as various implementations relating them to `WordCount`.
@@ -177,21 +176,16 @@ mod container {
177176
}
178177
}
179178

180-
impl<C: columnar::ContainerBytes> timely::Container for Column<C> {
181-
fn len(&self) -> usize { self.borrow().len() }
182-
// This sets `self` to be an empty `Typed` variant, appropriate for pushing into.
183-
fn clear(&mut self) {
184-
match self {
185-
Column::Typed(t) => t.clear(),
186-
Column::Bytes(_) => *self = Column::Typed(Default::default()),
187-
Column::Align(_) => *self = Column::Typed(Default::default()),
188-
}
189-
}
190-
179+
impl<C: columnar::ContainerBytes> timely::Accountable for Column<C> {
180+
#[inline] fn record_count(&self) -> i64 { i64::try_from(self.borrow().len()).unwrap() }
181+
#[inline] fn is_empty(&self) -> bool { self.borrow().is_empty() }
182+
}
183+
impl<C: columnar::ContainerBytes> timely::container::IterContainer for Column<C> {
191184
type ItemRef<'a> = C::Ref<'a>;
192185
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
193186
fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() }
194-
187+
}
188+
impl<C: columnar::ContainerBytes> timely::container::DrainContainer for Column<C> {
195189
type Item<'a> = C::Ref<'a>;
196190
type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
197191
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { self.borrow().into_index_iter() }

timely/src/dataflow/channels/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ impl<T, C: Container> Message<T, C> {
3939
}
4040

4141
/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
42-
/// leaves in place, or the container's default element. The buffer is cleared.
42+
/// leaves in place, or the container's default element. The buffer is left in an undefined state.
4343
#[inline]
4444
pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
4545

@@ -51,7 +51,6 @@ impl<T, C: Container> Message<T, C> {
5151

5252
if let Some(message) = bundle {
5353
*buffer = message.data;
54-
buffer.clear();
5554
}
5655
}
5756
}

0 commit comments

Comments
 (0)