Skip to content

Commit 5095ff4

Browse files
committed
Split Container into Container, IterContainer, DrainContainer
Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 922902c commit 5095ff4

File tree

11 files changed

+96
-98
lines changed

11 files changed

+96
-98
lines changed

container/src/lib.rs

Lines changed: 38 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,6 @@ use std::collections::VecDeque;
2020
// We can only access the type if all requirements for the `ContainerBuilder` implementation are
2121
// satisfied.
2222
pub trait Container: Default {
23-
/// The type of elements when reading non-destructively from the container.
24-
type ItemRef<'a> where Self: 'a;
25-
26-
/// The type of elements when draining the container.
27-
type Item<'a> where Self: 'a;
28-
2923
/// The number of elements in this container
3024
///
3125
/// This number is used in progress tracking to confirm the receipt of some number
@@ -38,16 +32,24 @@ pub trait Container: Default {
3832
fn is_empty(&self) -> bool {
3933
self.len() == 0
4034
}
35+
}
4136

37+
/// TODO
38+
pub trait IterContainer {
39+
/// The type of elements when reading non-destructively from the container.
40+
type ItemRef<'a> where Self: 'a;
4241
/// Iterator type when reading from the container.
4342
type Iter<'a>: Iterator<Item=Self::ItemRef<'a>> where Self: 'a;
44-
4543
/// Returns an iterator that reads the contents of this container.
4644
fn iter(&self) -> Self::Iter<'_>;
45+
}
4746

47+
/// TODO
48+
pub trait DrainContainer {
49+
/// The type of elements when draining the container.
50+
type Item<'a> where Self: 'a;
4851
/// Iterator type when draining the container.
4952
type DrainIter<'a>: Iterator<Item=Self::Item<'a>> where Self: 'a;
50-
5153
/// Returns an iterator that drains the contents of this container.
5254
/// Drain leaves the container in an undefined state.
5355
fn drain(&mut self) -> Self::DrainIter<'_>;
@@ -112,8 +114,9 @@ pub trait ContainerBuilder: Default + 'static {
112114
/// Partitions `container` among `builders`, using the function `index` to direct items.
113115
fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
114116
where
115-
Self: for<'a> PushInto<<Self::Container as Container>::Item<'a>>,
116-
I: for<'a> FnMut(&<Self::Container as Container>::Item<'a>) -> usize,
117+
Self::Container: DrainContainer,
118+
Self: for<'a> PushInto<<Self::Container as DrainContainer>::Item<'a>>,
119+
I: for<'a> FnMut(&<Self::Container as DrainContainer>::Item<'a>) -> usize,
117120
{
118121
for datum in container.drain() {
119122
let index = index(&datum);
@@ -195,25 +198,25 @@ impl<C: Container + Clone + 'static> ContainerBuilder for CapacityContainerBuild
195198
impl<C: Container + Clone + 'static> LengthPreservingContainerBuilder for CapacityContainerBuilder<C> { }
196199

197200
impl<T> Container for Vec<T> {
198-
type ItemRef<'a> = &'a T where T: 'a;
199-
type Item<'a> = T where T: 'a;
200-
201201
fn len(&self) -> usize {
202202
Vec::len(self)
203203
}
204-
205204
fn is_empty(&self) -> bool {
206205
Vec::is_empty(self)
207206
}
207+
}
208208

209+
impl<T> IterContainer for Vec<T> {
210+
type ItemRef<'a> = &'a T where T: 'a;
209211
type Iter<'a> = std::slice::Iter<'a, T> where Self: 'a;
210-
211212
fn iter(&self) -> Self::Iter<'_> {
212213
self.as_slice().iter()
213214
}
215+
}
214216

217+
impl<T> DrainContainer for Vec<T> {
218+
type Item<'a> = T where T: 'a;
215219
type DrainIter<'a> = std::vec::Drain<'a, T> where Self: 'a;
216-
217220
fn drain(&mut self) -> Self::DrainIter<'_> {
218221
self.drain(..)
219222
}
@@ -261,63 +264,47 @@ mod rc {
261264
use std::ops::Deref;
262265
use std::rc::Rc;
263266

264-
use crate::Container;
267+
use crate::{Container, IterContainer, DrainContainer};
265268

266269
impl<T: Container> Container for Rc<T> {
267-
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
268-
type Item<'a> = T::ItemRef<'a> where Self: 'a;
269-
270270
fn len(&self) -> usize {
271271
std::ops::Deref::deref(self).len()
272272
}
273-
274273
fn is_empty(&self) -> bool {
275274
std::ops::Deref::deref(self).is_empty()
276275
}
277-
276+
}
277+
impl<T: IterContainer> IterContainer for Rc<T> {
278+
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
278279
type Iter<'a> = T::Iter<'a> where Self: 'a;
279-
280-
fn iter(&self) -> Self::Iter<'_> {
281-
self.deref().iter()
282-
}
283-
280+
fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
281+
}
282+
impl<T: IterContainer> DrainContainer for Rc<T> {
283+
type Item<'a> = T::ItemRef<'a> where Self: 'a;
284284
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
285-
286-
fn drain(&mut self) -> Self::DrainIter<'_> {
287-
self.iter()
288-
}
285+
fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
289286
}
290287
}
291288

292289
mod arc {
293290
use std::ops::Deref;
294291
use std::sync::Arc;
295292

296-
use crate::Container;
293+
use crate::{Container, IterContainer, DrainContainer};
297294

298295
impl<T: Container> Container for Arc<T> {
296+
fn len(&self) -> usize { std::ops::Deref::deref(self).len() }
297+
fn is_empty(&self) -> bool { std::ops::Deref::deref(self).is_empty() }
298+
}
299+
impl<T: IterContainer> IterContainer for Arc<T> {
299300
type ItemRef<'a> = T::ItemRef<'a> where Self: 'a;
300-
type Item<'a> = T::ItemRef<'a> where Self: 'a;
301-
302-
fn len(&self) -> usize {
303-
std::ops::Deref::deref(self).len()
304-
}
305-
306-
fn is_empty(&self) -> bool {
307-
std::ops::Deref::deref(self).is_empty()
308-
}
309-
310301
type Iter<'a> = T::Iter<'a> where Self: 'a;
311-
312-
fn iter(&self) -> Self::Iter<'_> {
313-
self.deref().iter()
314-
}
315-
302+
fn iter(&self) -> Self::Iter<'_> { self.deref().iter() }
303+
}
304+
impl<T: IterContainer> DrainContainer for Arc<T> {
305+
type Item<'a> = T::ItemRef<'a> where Self: 'a;
316306
type DrainIter<'a> = T::Iter<'a> where Self: 'a;
317-
318-
fn drain(&mut self) -> Self::DrainIter<'_> {
319-
self.iter()
320-
}
307+
fn drain(&mut self) -> Self::DrainIter<'_> { self.iter() }
321308
}
322309
}
323310

timely/examples/columnar.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
//! Wordcount based on the `columnar` crate.
22
3-
use {
4-
std::collections::HashMap,
5-
timely::{Container, container::CapacityContainerBuilder},
6-
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
7-
timely::dataflow::InputHandleCore,
8-
timely::dataflow::operators::{Inspect, Operator, Probe},
9-
timely::dataflow::ProbeHandle,
10-
};
3+
use std::collections::HashMap;
4+
5+
use timely::container::{IterContainer, CapacityContainerBuilder};
6+
use timely::dataflow::channels::pact::{ExchangeCore, Pipeline};
7+
use timely::dataflow::InputHandleCore;
8+
use timely::dataflow::operators::{Inspect, Operator, Probe};
9+
use timely::dataflow::ProbeHandle;
1110

1211
// Creates `WordCountContainer` and `WordCountReference` structs,
1312
// as well as various implementations relating them to `WordCount`.
@@ -179,10 +178,13 @@ mod container {
179178

180179
impl<C: columnar::ContainerBytes> timely::Container for Column<C> {
181180
fn len(&self) -> usize { self.borrow().len() }
181+
}
182+
impl<C: columnar::ContainerBytes> timely::container::IterContainer for Column<C> {
182183
type ItemRef<'a> = C::Ref<'a>;
183184
type Iter<'a> = IterOwn<C::Borrowed<'a>>;
184185
fn iter<'a>(&'a self) -> Self::Iter<'a> { self.borrow().into_index_iter() }
185-
186+
}
187+
impl<C: columnar::ContainerBytes> timely::container::DrainContainer for Column<C> {
186188
type Item<'a> = C::Ref<'a>;
187189
type DrainIter<'a> = IterOwn<C::Borrowed<'a>>;
188190
fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { self.borrow().into_index_iter() }

timely/src/dataflow/channels/pact.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@
1010
use std::{fmt::{self, Debug}, marker::PhantomData};
1111
use std::rc::Rc;
1212

13-
use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto}};
13+
use crate::Container;
14+
use crate::container::{ContainerBuilder, DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto};
1415
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
1516
use crate::communication::{Push, Pull};
1617
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
@@ -53,7 +54,8 @@ pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
5354
impl<CB, F> ExchangeCore<CB, F>
5455
where
5556
CB: LengthPreservingContainerBuilder,
56-
for<'a> F: FnMut(&<CB::Container as Container>::Item<'a>)->u64
57+
CB::Container: DrainContainer,
58+
for<'a> F: FnMut(&<CB::Container as DrainContainer>::Item<'a>)->u64
5759
{
5860
/// Allocates a new `Exchange` pact from a distribution function.
5961
pub fn new_core(func: F) -> ExchangeCore<CB, F> {
@@ -66,7 +68,7 @@ where
6668

6769
impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
6870
where
69-
C: SizableContainer,
71+
C: SizableContainer + DrainContainer,
7072
for<'a> F: FnMut(&C::Item<'a>)->u64
7173
{
7274
/// Allocates a new `Exchange` pact from a distribution function.
@@ -82,9 +84,10 @@ where
8284
impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
8385
where
8486
CB: ContainerBuilder,
85-
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
87+
CB::Container: DrainContainer,
88+
CB: for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
8689
CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
87-
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
90+
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64
8891
{
8992
type Pusher = ExchangePusher<T, CB, LogPusher<Box<dyn Push<Message<T, CB::Container>>>>, H>;
9093
type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,17 @@
33
use crate::communication::Push;
44
use crate::container::{ContainerBuilder, PushInto};
55
use crate::dataflow::channels::Message;
6-
use crate::{Container, Data};
6+
use crate::Data;
7+
use crate::container::DrainContainer;
78

89
// TODO : Software write combining
910
/// Distributes records among target pushees according to a distribution function.
1011
pub struct Exchange<T, CB, P, H>
1112
where
1213
CB: ContainerBuilder,
14+
CB::Container: DrainContainer,
1315
P: Push<Message<T, CB::Container>>,
14-
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
16+
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64
1517
{
1618
pushers: Vec<P>,
1719
builders: Vec<CB>,
@@ -22,8 +24,9 @@ where
2224
impl<T: Clone, CB, P, H> Exchange<T, CB, P, H>
2325
where
2426
CB: ContainerBuilder,
27+
CB::Container: DrainContainer,
2528
P: Push<Message<T, CB::Container>>,
26-
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
29+
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64
2730
{
2831
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
2932
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, CB, P, H> {
@@ -48,9 +51,10 @@ where
4851
impl<T: Eq+Data, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
4952
where
5053
CB: ContainerBuilder,
51-
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
54+
CB::Container: DrainContainer,
55+
CB: for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
5256
P: Push<Message<T, CB::Container>>,
53-
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
57+
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64
5458
{
5559
#[inline(never)]
5660
fn push(&mut self, message: &mut Option<Message<T, CB::Container>>) {

timely/src/dataflow/operators/core/capture/extract.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! Traits and types for extracting captured timely dataflow streams.
22
33
use super::Event;
4-
use crate::{container::{SizableContainer, PushInto}};
4+
use crate::container::{SizableContainer, DrainContainer, PushInto};
55

66
/// Supports extracting a sequence of timestamp and data.
77
pub trait Extract<T, C> {
@@ -48,10 +48,10 @@ pub trait Extract<T, C> {
4848
fn extract(self) -> Vec<(T, C)>;
4949
}
5050

51-
impl<T: Ord, C: SizableContainer> Extract<T, C> for ::std::sync::mpsc::Receiver<Event<T, C>>
51+
impl<T, C> Extract<T, C> for ::std::sync::mpsc::Receiver<Event<T, C>>
5252
where
53-
for<'a> C: PushInto<C::Item<'a>>,
54-
for<'a> C::Item<'a>: Ord,
53+
for<'a> C: SizableContainer + DrainContainer<Item<'a>: Ord> + PushInto<C::Item<'a>>,
54+
T: Ord,
5555
{
5656
fn extract(self) -> Vec<(T, C)> {
5757
let mut staged = std::collections::BTreeMap::new();

timely/src/dataflow/operators/core/exchange.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
//! Exchange records between workers.
22
33
use crate::ExchangeData;
4-
use crate::container::{Container, SizableContainer, PushInto};
4+
use crate::container::{DrainContainer, SizableContainer, PushInto};
55
use crate::dataflow::channels::pact::ExchangeCore;
66
use crate::dataflow::operators::generic::operator::Operator;
77
use crate::dataflow::{Scope, StreamCore};
88

99
/// Exchange records between workers.
10-
pub trait Exchange<C: Container> {
10+
pub trait Exchange<C: DrainContainer> {
1111
/// Exchange records between workers.
1212
///
1313
/// The closure supplied should map a reference to a record to a `u64`,
@@ -30,9 +30,11 @@ pub trait Exchange<C: Container> {
3030

3131
impl<G: Scope, C> Exchange<C> for StreamCore<G, C>
3232
where
33-
C: SizableContainer + ExchangeData + crate::dataflow::channels::ContainerBytes,
34-
C: for<'a> PushInto<C::Item<'a>>,
35-
33+
C: SizableContainer
34+
+ DrainContainer
35+
+ ExchangeData
36+
+ crate::dataflow::channels::ContainerBytes
37+
+ for<'a> PushInto<C::Item<'a>>,
3638
{
3739
fn exchange<F>(&self, route: F) -> StreamCore<G, C>
3840
where

timely/src/dataflow/operators/core/filter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
//! Filters a stream by a predicate.
2-
use crate::container::{Container, SizableContainer, PushInto};
2+
use crate::container::{DrainContainer, SizableContainer, PushInto};
33
use crate::Data;
44
use crate::dataflow::channels::pact::Pipeline;
55
use crate::dataflow::{Scope, StreamCore};
66
use crate::dataflow::operators::generic::operator::Operator;
77

88
/// Extension trait for filtering.
9-
pub trait Filter<C: Container> {
9+
pub trait Filter<C: DrainContainer> {
1010
/// Returns a new instance of `self` containing only records satisfying `predicate`.
1111
///
1212
/// # Examples
@@ -23,7 +23,7 @@ pub trait Filter<C: Container> {
2323
fn filter<P: FnMut(&C::Item<'_>)->bool+'static>(&self, predicate: P) -> Self;
2424
}
2525

26-
impl<G: Scope, C: SizableContainer + Data> Filter<C> for StreamCore<G, C>
26+
impl<G: Scope, C: SizableContainer + DrainContainer + Data> Filter<C> for StreamCore<G, C>
2727
where
2828
for<'a> C: PushInto<C::Item<'a>>
2929
{

0 commit comments

Comments
 (0)