diff --git a/container/src/lib.rs b/container/src/lib.rs index 839f80167..2ff6589fe 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -113,19 +113,6 @@ pub trait ContainerBuilder: Default + 'static { /// be called repeatedly until it returns `None`. #[must_use] fn finish(&mut self) -> Option<&mut Self::Container>; - /// Partitions `container` among `builders`, using the function `index` to direct items. - fn partition(container: &mut Self::Container, builders: &mut [Self], mut index: I) - where - Self: for<'a> PushInto<::Item<'a>>, - I: for<'a> FnMut(&::Item<'a>) -> usize, - { - for datum in container.drain() { - let index = index(&datum); - builders[index].push_into(datum); - } - container.clear(); - } - /// Indicates a good moment to release resources. /// /// By default, does nothing. Callers first needs to drain the contents using [`Self::finish`] diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index fb2f95e7d..bb99cd277 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -14,6 +14,7 @@ use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBu use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::communication::{Push, Pull}; use crate::dataflow::channels::pushers::Exchange as ExchangePusher; +use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor; use crate::dataflow::channels::Message; use crate::logging::{TimelyLogger as Logger, MessagesEvent}; use crate::progress::Timestamp; @@ -79,20 +80,24 @@ where } // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -impl ParallelizationContract for ExchangeCore +impl ParallelizationContract for ExchangeCore where - CB: ContainerBuilder, - CB: for<'a> PushInto<::Item<'a>>, + CB: ContainerBuilder + for<'a> PushInto<::Item<'a>>, CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes, - for<'a> H: FnMut(&::Item<'a>) -> u64 + for<'a> H: FnMut(&::Item<'a>) -> u64 + 'static, { - type Pusher = ExchangePusher>>>, H>; + type Pusher = ExchangePusher< + T, + LogPusher>>>, + DrainContainerDistributor + >; type Puller = LogPuller>>>; fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { let (senders, receiver) = allocator.allocate::>(identifier, address); let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - (ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) + let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers()); + (ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) } } diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index 9b7baeece..e72085b4f 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -3,57 +3,114 @@ use crate::communication::Push; use crate::container::{ContainerBuilder, PushInto}; use crate::dataflow::channels::Message; -use crate::{Container, Data}; +use crate::Container; -// TODO : Software write combining -/// Distributes records among target pushees according to a distribution function. -pub struct Exchange -where - CB: ContainerBuilder, - P: Push>, - for<'a> H: FnMut(&::Item<'a>) -> u64 -{ - pushers: Vec

, +/// Distribute containers to several pushers. +/// +/// A distributor sits behind an exchange pusher, and partitions containers at a given time +/// into several pushers. It can use [`Message::push_at`] to push its outputs at the desired +/// pusher. +/// +/// It needs to uphold progress tracking requirements. The count of the input container +/// must be preserved across the output containers, from the first call to `partition` until the +/// call to `flush` for a specific time stamp. +pub trait Distributor { + /// Partition the contents of `container` at `time` into the `pushers`. + fn partition>>(&mut self, container: &mut C, time: &T, pushers: &mut [P]); + /// Flush any remaining contents into the `pushers` at time `time`. + fn flush>>(&mut self, time: &T, pushers: &mut [P]); + /// Optionally release resources, such as memory. + fn relax(&mut self); +} + +/// A distributor creating containers from a drainable container based +/// on a hash function of the container's item. +pub struct DrainContainerDistributor { builders: Vec, - current: Option, hash_func: H, } -impl Exchange +impl DrainContainerDistributor { + /// Constructs a new `DrainContainerDistributor` with the given hash function for a number of + /// peers. + pub fn new(hash_func: H, peers: usize) -> Self { + Self { + builders: std::iter::repeat_with(Default::default).take(peers).collect(), + hash_func, + } + } +} + +impl Distributor for DrainContainerDistributor where - CB: ContainerBuilder, - P: Push>, - for<'a> H: FnMut(&::Item<'a>) -> u64 + CB: ContainerBuilder + for<'a> PushInto<::Item<'a>>, + for<'a> H: FnMut(&::Item<'a>) -> u64, { - /// Allocates a new `Exchange` from a supplied set of pushers and a distribution function. - pub fn new(pushers: Vec

, key: H) -> Exchange { - let builders = std::iter::repeat_with(Default::default).take(pushers.len()).collect(); - Exchange { - pushers, - hash_func: key, - builders, - current: None, + fn partition>>(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) { + debug_assert_eq!(self.builders.len(), pushers.len()); + if pushers.len().is_power_of_two() { + let mask = (pushers.len() - 1) as u64; + for datum in container.drain() { + let index = ((self.hash_func)(&datum) & mask) as usize; + self.builders[index].push_into(datum); + while let Some(produced) = self.builders[index].extract() { + Message::push_at(produced, time.clone(), &mut pushers[index]); + } + } + } + else { + let num_pushers = pushers.len() as u64; + for datum in container.drain() { + let index = ((self.hash_func)(&datum) % num_pushers) as usize; + self.builders[index].push_into(datum); + while let Some(produced) = self.builders[index].extract() { + Message::push_at(produced, time.clone(), &mut pushers[index]); + } + } } } - #[inline] - fn flush(&mut self, index: usize) { - while let Some(container) = self.builders[index].finish() { - if let Some(ref time) = self.current { - Message::push_at(container, time.clone(), &mut self.pushers[index]); + + fn flush>>(&mut self, time: &T, pushers: &mut [P]) { + for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) { + while let Some(container) = builder.finish() { + Message::push_at(container, time.clone(), pusher); } } } + + fn relax(&mut self) { + for builder in &mut self.builders { + builder.relax(); + } + } } -impl Push> for Exchange +// TODO : Software write combining +/// Distributes records among target pushees according to a distributor. +pub struct Exchange { + pushers: Vec

, + current: Option, + distributor: D, +} + +impl Exchange { + /// Allocates a new `Exchange` from a supplied set of pushers and a distributor. + pub fn new(pushers: Vec

, distributor: D) -> Exchange { + Exchange { + pushers, + current: None, + distributor, + } + } +} + +impl Push> for Exchange where - CB: ContainerBuilder, - CB: for<'a> PushInto<::Item<'a>>, - P: Push>, - for<'a> H: FnMut(&::Item<'a>) -> u64 + P: Push>, + D: Distributor, { #[inline(never)] - fn push(&mut self, message: &mut Option>) { + fn push(&mut self, message: &mut Option>) { // if only one pusher, no exchange if self.pushers.len() == 1 { self.pushers[0].push(message); @@ -64,36 +121,27 @@ where let data = &mut message.data; // if the time isn't right, flush everything. - if self.current.as_ref().is_some_and(|x| x != time) { - for index in 0..self.pushers.len() { - self.flush(index); + match self.current.as_ref() { + // We have a current time, and it is different from the new time. + Some(current_time) if current_time != time => { + self.distributor.flush(current_time, &mut self.pushers); + self.current = Some(time.clone()); } + // We had no time before, or flushed. + None => self.current = Some(time.clone()), + // Time didn't change since last call. + _ => {} } - self.current = Some(time.clone()); - - let hash_func = &mut self.hash_func; - // if the number of pushers is a power of two, use a mask - if self.pushers.len().is_power_of_two() { - let mask = (self.pushers.len() - 1) as u64; - CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) & mask) as usize); - } - // as a last resort, use mod (%) - else { - let num_pushers = self.pushers.len() as u64; - CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) % num_pushers) as usize); - } - for (buffer, pusher) in self.builders.iter_mut().zip(self.pushers.iter_mut()) { - while let Some(container) = buffer.extract() { - Message::push_at(container, time.clone(), pusher); - } - } + self.distributor.partition(data, time, &mut self.pushers); } else { // flush + if let Some(time) = self.current.take() { + self.distributor.flush(&time, &mut self.pushers); + } + self.distributor.relax(); for index in 0..self.pushers.len() { - self.flush(index); - self.builders[index].relax(); self.pushers[index].push(&mut None); } }