Skip to content

Commit 0d03378

Browse files
committed
Progress container with length
Splits the Container trait into a progress container and a container. The progress container only exposes a length, which is enough for most parts of Timely. The container trait extends the progress container and carries the usual functions to iterate and clear the contents. I'm open to renaming and moving types, and primarily wanted to check whether it's possible to split the trait. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 85ce6cd commit 0d03378

File tree

26 files changed

+217
-173
lines changed

26 files changed

+217
-173
lines changed

container/src/lib.rs

Lines changed: 78 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -7,27 +7,11 @@ use std::collections::VecDeque;
77
/// A container transferring data through dataflow edges
88
///
99
/// A container stores a number of elements and thus is able to describe it length (`len()`) and
10-
/// whether it is empty (`is_empty()`). It supports removing all elements (`clear`).
10+
/// whether it is empty (`is_empty()`).
1111
///
1212
/// A container must implement default. The default implementation is not required to allocate
1313
/// memory for variable-length components.
14-
///
15-
/// We require the container to be cloneable to enable efficient copies when providing references
16-
/// of containers to operators. Care must be taken that the type's `clone_from` implementation
17-
/// is efficient (which is not necessarily the case when deriving `Clone`.)
18-
pub trait Container: Default {
19-
/// The type of elements when reading non-destructively from the container.
20-
type ItemRef<'a> where Self: 'a;
21-
22-
/// The type of elements when draining the container.
23-
type Item<'a> where Self: 'a;
24-
25-
/// Push `item` into self
26-
#[inline]
27-
fn push<T>(&mut self, item: T) where Self: PushInto<T> {
28-
self.push_into(item)
29-
}
30-
14+
pub trait ProgressContainer: Default {
3115
/// The number of elements in this container
3216
///
3317
/// This number is used in progress tracking to confirm the receipt of some number
@@ -37,9 +21,27 @@ pub trait Container: Default {
3721
fn len(&self) -> usize;
3822

3923
/// Determine if the container contains any elements, corresponding to `len() == 0`.
24+
#[inline(always)]
4025
fn is_empty(&self) -> bool {
4126
self.len() == 0
4227
}
28+
}
29+
30+
/// A container that can reveal its contents through iterating by reference and draining.
31+
///
32+
/// It supports removing all elements (`clear`).
33+
pub trait Container: ProgressContainer {
34+
/// The type of elements when reading non-destructively from the container.
35+
type ItemRef<'a> where Self: 'a;
36+
37+
/// The type of elements when draining the container.
38+
type Item<'a> where Self: 'a;
39+
40+
/// Push `item` into self
41+
#[inline]
42+
fn push<T>(&mut self, item: T) where Self: PushInto<T> {
43+
self.push_into(item)
44+
}
4345

4446
/// Remove all contents from `self` while retaining allocated memory.
4547
/// After calling `clear`, `is_empty` must return `true` and `len` 0.
@@ -102,7 +104,7 @@ pub trait PushInto<T> {
102104
/// decide to represent a push order for `extract` and `finish`, or not.
103105
pub trait ContainerBuilder: Default + 'static {
104106
/// The container type we're building.
105-
type Container: Container + Clone + 'static;
107+
type Container: ProgressContainer + Clone + 'static;
106108
/// Extract assembled containers, potentially leaving unfinished data behind. Can
107109
/// be called repeatedly, for example while the caller can send data.
108110
///
@@ -118,6 +120,7 @@ pub trait ContainerBuilder: Default + 'static {
118120
where
119121
Self: for<'a> PushInto<<Self::Container as Container>::Item<'a>>,
120122
I: for<'a> FnMut(&<Self::Container as Container>::Item<'a>) -> usize,
123+
Self::Container: Container,
121124
{
122125
for datum in container.drain() {
123126
let index = index(&datum);
@@ -142,6 +145,35 @@ pub trait ContainerBuilder: Default + 'static {
142145
/// If you have any questions about this trait you are best off not implementing it.
143146
pub trait LengthPreservingContainerBuilder : ContainerBuilder { }
144147

148+
/// A container builder that never produces any outputs, and can be used to pass through data in
149+
/// operators.
150+
#[derive(Debug, Clone)]
151+
pub struct PassthroughContainerBuilder<C>(std::marker::PhantomData<C>);
152+
153+
impl<C> Default for PassthroughContainerBuilder<C> {
154+
#[inline(always)]
155+
fn default() -> Self {
156+
PassthroughContainerBuilder(std::marker::PhantomData)
157+
}
158+
}
159+
160+
impl<C> ContainerBuilder for PassthroughContainerBuilder<C>
161+
where
162+
C: ProgressContainer + Clone + 'static,
163+
{
164+
type Container = C;
165+
166+
#[inline(always)]
167+
fn extract(&mut self) -> Option<&mut Self::Container> {
168+
None
169+
}
170+
171+
#[inline(always)]
172+
fn finish(&mut self) -> Option<&mut Self::Container> {
173+
None
174+
}
175+
}
176+
145177
/// A default container builder that uses length and preferred capacity to chunk data.
146178
///
147179
/// Maintains a single empty allocation between [`Self::push_into`] and [`Self::extract`], but not
@@ -165,7 +197,7 @@ impl<T, C: SizableContainer + PushInto<T>> PushInto<T> for CapacityContainerBuil
165197
self.current.ensure_capacity(&mut self.empty);
166198

167199
// Push item
168-
self.current.push(item);
200+
self.current.push_into(item);
169201

170202
// Maybe flush
171203
if self.current.at_capacity() {
@@ -199,18 +231,18 @@ impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuild
199231

200232
impl<C: Container + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
201233

234+
impl<T> ProgressContainer for Vec<T> {
235+
#[inline(always)]
236+
fn len(&self) -> usize { Vec::len(self) }
237+
238+
#[inline(always)]
239+
fn is_empty(&self) -> bool { Vec::is_empty(self) }
240+
}
241+
202242
impl<T> Container for Vec<T> {
203243
type ItemRef<'a> = &'a T where T: 'a;
204244
type Item<'a> = T where T: 'a;
205245

206-
fn len(&self) -> usize {
207-
Vec::len(self)
208-
}
209-
210-
fn is_empty(&self) -> bool {
211-
Vec::is_empty(self)
212-
}
213-
214246
fn clear(&mut self) { Vec::clear(self) }
215247

216248
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
@@ -268,20 +300,20 @@ mod rc {
268300
use std::ops::Deref;
269301
use std::rc::Rc;
270302

271-
use crate::Container;
303+
use crate::{Container, ProgressContainer};
304+
305+
impl<T: ProgressContainer> ProgressContainer for Rc<T> {
306+
#[inline(always)]
307+
fn len(&self) -> usize { std::ops::Deref::deref(self).len() }
308+
309+
#[inline(always)]
310+
fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
311+
}
272312

273313
impl<T: Container> Container for Rc<T> {
274314
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
275315
type Item<'a> = T::ItemRef<'a> where Self: 'a;
276316

277-
fn len(&self) -> usize {
278-
std::ops::Deref::deref(self).len()
279-
}
280-
281-
fn is_empty(&self) -> bool {
282-
std::ops::Deref::deref(self).is_empty()
283-
}
284-
285317
fn clear(&mut self) {
286318
// Try to reuse the allocation if possible
287319
if let Some(inner) = Rc::get_mut(self) {
@@ -309,20 +341,20 @@ mod arc {
309341
use std::ops::Deref;
310342
use std::sync::Arc;
311343

312-
use crate::Container;
344+
use crate::{Container, ProgressContainer};
345+
346+
impl<T: ProgressContainer> ProgressContainer for Arc<T> {
347+
#[inline(always)]
348+
fn len(&self) -> usize { std::ops::Deref::deref(self).len() }
349+
350+
#[inline(always)]
351+
fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
352+
}
313353

314354
impl<T: Container> Container for Arc<T> {
315355
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
316356
type Item<'a> = T::ItemRef<'a> where Self: 'a;
317357

318-
fn len(&self) -> usize {
319-
std::ops::Deref::deref(self).len()
320-
}
321-
322-
fn is_empty(&self) -> bool {
323-
std::ops::Deref::deref(self).is_empty()
324-
}
325-
326358
fn clear(&mut self) {
327359
// Try to reuse the allocation if possible
328360
if let Some(inner) = Arc::get_mut(self) {

timely/examples/columnar.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,11 @@ mod container {
165165
}
166166
}
167167

168-
impl<C: columnar::ContainerBytes> timely::Container for Column<C> {
168+
impl<C: columnar::ContainerBytes> timely::container::ProgressContainer for Column<C> {
169+
#[inline(always)]
169170
fn len(&self) -> usize { self.borrow().len() }
171+
}
172+
impl<C: columnar::ContainerBytes> timely::Container for Column<C> {
170173
// This sets `self` to be an empty `Typed` variant, appropriate for pushing into.
171174
fn clear(&mut self) {
172175
match self {

timely/src/dataflow/channels/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
use serde::{Deserialize, Serialize};
44
use crate::communication::Push;
5-
use crate::Container;
5+
use crate::container::ProgressContainer;
66

77
/// A collection of types that may be pushed at.
88
pub mod pushers;
@@ -32,14 +32,15 @@ impl<T, C> Message<T, C> {
3232
}
3333
}
3434

35-
impl<T, C: Container> Message<T, C> {
35+
impl<T, C: ProgressContainer> Message<T, C> {
3636
/// Creates a new message instance from arguments.
3737
pub fn new(time: T, data: C, from: usize, seq: usize) -> Self {
3838
Message { time, data, from, seq }
3939
}
4040

4141
/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
42-
/// leaves in place, or the container's default element. The buffer is cleared.
42+
/// leaves in place, or the container's default element. The buffer's contents are left in an
43+
/// undefined state, specifically the caller cannot rely on this function clearing the buffer.
4344
#[inline]
4445
pub fn push_at<P: Push<Message<T, C>>>(buffer: &mut C, time: T, pusher: &mut P) {
4546

@@ -51,7 +52,6 @@ impl<T, C: Container> Message<T, C> {
5152

5253
if let Some(message) = bundle {
5354
*buffer = message.data;
54-
buffer.clear();
5555
}
5656
}
5757
}

timely/src/dataflow/channels/pact.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
use std::{fmt::{self, Debug}, marker::PhantomData};
1111
use std::rc::Rc;
1212

13-
use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto}};
13+
use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBuilder, ProgressContainer, SizableContainer, CapacityContainerBuilder, PushInto}};
1414
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
1515
use crate::communication::{Push, Pull};
1616
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
@@ -34,7 +34,7 @@ pub trait ParallelizationContract<T, C> {
3434
#[derive(Debug)]
3535
pub struct Pipeline;
3636

37-
impl<T: 'static, C: Container + 'static> ParallelizationContract<T, C> for Pipeline {
37+
impl<T: 'static, C: ProgressContainer + 'static> ParallelizationContract<T, C> for Pipeline {
3838
type Pusher = LogPusher<T, C, ThreadPusher<Message<T, C>>>;
3939
type Puller = LogPuller<T, C, ThreadPuller<Message<T, C>>>;
4040
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
@@ -52,7 +52,7 @@ pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
5252

5353
impl<CB, F> ExchangeCore<CB, F>
5454
where
55-
CB: LengthPreservingContainerBuilder,
55+
CB: LengthPreservingContainerBuilder<Container: Container>,
5656
for<'a> F: FnMut(&<CB::Container as Container>::Item<'a>)->u64
5757
{
5858
/// Allocates a new `Exchange` pact from a distribution function.
@@ -81,7 +81,7 @@ where
8181
// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
8282
impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
8383
where
84-
CB: ContainerBuilder,
84+
CB: ContainerBuilder<Container: Container>,
8585
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
8686
CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
8787
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
@@ -129,7 +129,7 @@ impl<T, C, P: Push<Message<T, C>>> LogPusher<T, C, P> {
129129
}
130130
}
131131

132-
impl<T, C: Container, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<T, C, P> {
132+
impl<T, C: ProgressContainer, P: Push<Message<T, C>>> Push<Message<T, C>> for LogPusher<T, C, P> {
133133
#[inline]
134134
fn push(&mut self, pair: &mut Option<Message<T, C>>) {
135135
if let Some(bundle) = pair {
@@ -179,7 +179,7 @@ impl<T, C, P: Pull<Message<T, C>>> LogPuller<T, C, P> {
179179
}
180180
}
181181

182-
impl<T, C: Container, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<T, C, P> {
182+
impl<T, C: ProgressContainer, P: Pull<Message<T, C>>> Pull<Message<T, C>> for LogPuller<T, C, P> {
183183
#[inline]
184184
fn pull(&mut self) -> &mut Option<Message<T, C>> {
185185
let result = self.puller.pull();

timely/src/dataflow/channels/pullers/counter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::cell::RefCell;
66
use crate::dataflow::channels::Message;
77
use crate::progress::ChangeBatch;
88
use crate::communication::Pull;
9-
use crate::Container;
9+
use crate::container::ProgressContainer;
1010

1111
/// A wrapper which accounts records pulled past in a shared count map.
1212
pub struct Counter<T: Ord+Clone+'static, C, P: Pull<Message<T, C>>> {
@@ -36,7 +36,7 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
3636
}
3737
}
3838

39-
impl<T:Ord+Clone+'static, C: Container, P: Pull<Message<T, C>>> Counter<T, C, P> {
39+
impl<T:Ord+Clone+'static, C: ProgressContainer, P: Pull<Message<T, C>>> Counter<T, C, P> {
4040
/// Retrieves the next timestamp and batch of data.
4141
#[inline]
4242
pub fn next(&mut self) -> Option<&mut Message<T, C>> {

timely/src/dataflow/channels/pushers/buffer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//! with the performance of batched sends.
33
44
use crate::communication::Push;
5-
use crate::container::{ContainerBuilder, CapacityContainerBuilder, PushInto};
5+
use crate::container::{ContainerBuilder, CapacityContainerBuilder, ProgressContainer, PushInto};
66
use crate::dataflow::channels::Message;
77
use crate::dataflow::operators::Capability;
88
use crate::progress::Timestamp;

timely/src/dataflow/channels/pushers/counter.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::cell::RefCell;
77
use crate::progress::{ChangeBatch, Timestamp};
88
use crate::dataflow::channels::Message;
99
use crate::communication::Push;
10-
use crate::Container;
10+
use crate::container::ProgressContainer;
1111

1212
/// A wrapper which updates shared `produced` based on the number of records pushed.
1313
#[derive(Debug)]
@@ -17,7 +17,7 @@ pub struct Counter<T, C, P: Push<Message<T, C>>> {
1717
phantom: PhantomData<C>,
1818
}
1919

20-
impl<T: Timestamp, C: Container, P> Push<Message<T, C>> for Counter<T, C, P> where P: Push<Message<T, C>> {
20+
impl<T: Timestamp, C: ProgressContainer, P> Push<Message<T, C>> for Counter<T, C, P> where P: Push<Message<T, C>> {
2121
#[inline]
2222
fn push(&mut self, message: &mut Option<Message<T, C>>) {
2323
if let Some(message) = message {

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{Container, Data};
99
/// Distributes records among target pushees according to a distribution function.
1010
pub struct Exchange<T, CB, P, H>
1111
where
12-
CB: ContainerBuilder,
12+
CB: ContainerBuilder<Container: Container>,
1313
P: Push<Message<T, CB::Container>>,
1414
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
1515
{
@@ -21,7 +21,7 @@ where
2121

2222
impl<T: Clone, CB, P, H> Exchange<T, CB, P, H>
2323
where
24-
CB: ContainerBuilder,
24+
CB: ContainerBuilder<Container: Container>,
2525
P: Push<Message<T, CB::Container>>,
2626
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
2727
{
@@ -50,7 +50,7 @@ where
5050

5151
impl<T: Eq+Data, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
5252
where
53-
CB: ContainerBuilder,
53+
CB: ContainerBuilder<Container: Container>,
5454
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
5555
P: Push<Message<T, CB::Container>>,
5656
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64

0 commit comments

Comments
 (0)