Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ pub trait ContainerBuilder: Default + 'static {
fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
where
Self: for<'a> PushInto<<Self::Container as Container>::Item<'a>>,
Self::Container: SizableContainer,
I: for<'a> FnMut(&<Self::Container as Container>::Item<'a>) -> usize,
{
for datum in container.drain() {
Expand Down
3 changes: 1 addition & 2 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
impl<CB, F> ExchangeCore<CB, F>
where
CB: LengthPreservingContainerBuilder,
CB::Container: SizableContainer,
for<'a> F: FnMut(&<CB::Container as Container>::Item<'a>)->u64
{
/// Allocates a new `Exchange` pact from a distribution function.
Expand Down Expand Up @@ -84,7 +83,7 @@ impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for
where
CB: ContainerBuilder,
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
CB::Container: Data + Send + SizableContainer + crate::dataflow::channels::ContainerBytes,
CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
{
type Pusher = ExchangePusher<T, CB, LogPusher<T, CB::Container, Box<dyn Push<Message<T, CB::Container>>>>, H>;
Expand Down
5 changes: 1 addition & 4 deletions timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! The exchange pattern distributes pushed data between many target pushees.

use crate::communication::Push;
use crate::container::{ContainerBuilder, SizableContainer, PushInto};
use crate::container::{ContainerBuilder, PushInto};
use crate::dataflow::channels::Message;
use crate::{Container, Data};

Expand All @@ -10,7 +10,6 @@ use crate::{Container, Data};
pub struct Exchange<T, CB, P, H>
where
CB: ContainerBuilder,
CB::Container: SizableContainer,
P: Push<Message<T, CB::Container>>,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
{
Expand All @@ -23,7 +22,6 @@ where
impl<T: Clone, CB, P, H> Exchange<T, CB, P, H>
where
CB: ContainerBuilder,
CB::Container: SizableContainer,
P: Push<Message<T, CB::Container>>,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
{
Expand Down Expand Up @@ -53,7 +51,6 @@ where
impl<T: Eq+Data, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
where
CB: ContainerBuilder,
CB::Container: SizableContainer,
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
P: Push<Message<T, CB::Container>>,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
Expand Down
14 changes: 7 additions & 7 deletions timely/src/dataflow/operators/core/partition.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Partition a stream of records into multiple streams.

use timely_container::{Container, ContainerBuilder, PushInto, SizableContainer};
use timely_container::{Container, ContainerBuilder, PushInto};

use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
Expand Down Expand Up @@ -29,16 +29,16 @@ pub trait Partition<G: Scope, C: Container> {
/// ```
fn partition<CB, D2, F>(&self, parts: u64, route: F) -> Vec<StreamCore<G, CB::Container>>
where
CB: ContainerBuilder,
CB::Container: SizableContainer + PushInto<D2> + Data,
CB: ContainerBuilder + PushInto<D2>,
CB::Container: Data,
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static;
}

impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
fn partition<CB, D2, F>(&self, parts: u64, mut route: F) -> Vec<StreamCore<G, CB::Container>>
where
CB: ContainerBuilder,
CB::Container: SizableContainer + PushInto<D2> + Data,
CB: ContainerBuilder + PushInto<D2>,
CB::Container: Data,
F: FnMut(C::Item<'_>) -> (u64, D2) + 'static,
{
let mut builder = OperatorBuilder::new("Partition".to_owned(), self.scope());
Expand All @@ -48,7 +48,7 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
let mut streams = Vec::with_capacity(parts as usize);

for _ in 0..parts {
let (output, stream) = builder.new_output();
let (output, stream) = builder.new_output::<CB>();
outputs.push(output);
streams.push(stream);
}
Expand All @@ -59,7 +59,7 @@ impl<G: Scope, C: Container + Data> Partition<G, C> for StreamCore<G, C> {
input.for_each(|time, data| {
let mut sessions = handles
.iter_mut()
.map(|h| h.session(&time))
.map(|h| h.session_with_builder(&time))
.collect::<Vec<_>>();

for datum in data.drain() {
Expand Down
Loading