Skip to content

Commit 486c988

Browse files
authored
Remove SizableContainer requirement from partition (#612)
Removes the SizableContainer requirement from the partition operator. Also removes the same requirement from some places where it's not longer needed. Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent f45b2aa commit 486c988

File tree

4 files changed

+9
-14
lines changed

4 files changed

+9
-14
lines changed

container/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ pub trait ContainerBuilder: Default + 'static {
120120
fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
121121
where
122122
Self: for<'a> PushInto<<Self::Container as Container>::Item<'a>>,
123-
Self::Container: SizableContainer,
124123
I: for<'a> FnMut(&<Self::Container as Container>::Item<'a>) -> usize,
125124
{
126125
for datum in container.drain() {

timely/src/dataflow/channels/pact.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
5353
impl<CB, F> ExchangeCore<CB, F>
5454
where
5555
CB: LengthPreservingContainerBuilder,
56-
CB::Container: SizableContainer,
5756
for<'a> F: FnMut(&<CB::Container as Container>::Item<'a>)->u64
5857
{
5958
/// Allocates a new `Exchange` pact from a distribution function.
@@ -84,7 +83,7 @@ impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for
8483
where
8584
CB: ContainerBuilder,
8685
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
87-
CB::Container: Data + Send + SizableContainer + crate::dataflow::channels::ContainerBytes,
86+
CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
8887
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
8988
{
9089
type Pusher = ExchangePusher<T, CB, LogPusher<T, CB::Container, Box<dyn Push<Message<T, CB::Container>>>>, H>;

timely/src/dataflow/channels/pushers/exchange.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! The exchange pattern distributes pushed data between many target pushees.
22
33
use crate::communication::Push;
4-
use crate::container::{ContainerBuilder, SizableContainer, PushInto};
4+
use crate::container::{ContainerBuilder, PushInto};
55
use crate::dataflow::channels::Message;
66
use crate::{Container, Data};
77

@@ -10,7 +10,6 @@ use crate::{Container, Data};
1010
pub struct Exchange<T, CB, P, H>
1111
where
1212
CB: ContainerBuilder,
13-
CB::Container: SizableContainer,
1413
P: Push<Message<T, CB::Container>>,
1514
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
1615
{
@@ -23,7 +22,6 @@ where
2322
impl<T: Clone, CB, P, H> Exchange<T, CB, P, H>
2423
where
2524
CB: ContainerBuilder,
26-
CB::Container: SizableContainer,
2725
P: Push<Message<T, CB::Container>>,
2826
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
2927
{
@@ -53,7 +51,6 @@ where
5351
impl<T: Eq+Data, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
5452
where
5553
CB: ContainerBuilder,
56-
CB::Container: SizableContainer,
5754
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
5855
P: Push<Message<T, CB::Container>>,
5956
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64

timely/src/dataflow/operators/core/partition.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! Partition a stream of records into multiple streams.
22
3-
use timely_container::{Container, ContainerBuilder, PushInto, SizableContainer};
3+
use timely_container::{Container, ContainerBuilder, PushInto};
44

55
use crate::dataflow::channels::pact::Pipeline;
66
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
@@ -29,16 +29,16 @@ pub trait Partition<G: Scope, C: Container> {
2929
/// ```
3030
fn partition<CB, D2, F>(&self, parts: u64, route: F) -> Vec<StreamCore<G, CB::Container>>
3131
where
32-
CB: ContainerBuilder,
33-
CB::Container: SizableContainer + PushInto<D2> + Data,
32+
CB: ContainerBuilder + PushInto<D2>,
33+
CB::Container: Data,
3434
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
3535
}
3636

3737
impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
3838
fn partition<CB, D2, F>(&self, parts: u64, mut route: F) -> Vec<StreamCore<G, CB::Container>>
3939
where
40-
CB: ContainerBuilder,
41-
CB::Container: SizableContainer + PushInto<D2> + Data,
40+
CB: ContainerBuilder + PushInto<D2>,
41+
CB::Container: Data,
4242
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
4343
{
4444
let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
@@ -48,7 +48,7 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
4848
let mut streams = Vec::with_capacity(parts as usize);
4949

5050
for _ in 0..parts {
51-
let (output, stream) = builder.new_output();
51+
let (output, stream) = builder.new_output::<CB>();
5252
outputs.push(output);
5353
streams.push(stream);
5454
}
@@ -59,7 +59,7 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
5959
input.for_each(|time, data| {
6060
let mut sessions = handles
6161
.iter_mut()
62-
.map(|h| h.session(&time))
62+
.map(|h| h.session_with_builder(&time))
6363
.collect::<Vec<_>>();
6464

6565
for datum in data.drain() {

0 commit comments

Comments
 (0)