diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs index 96aaed605..4362a4113 100644 --- a/timely/src/dataflow/channels/pact.rs +++ b/timely/src/dataflow/channels/pact.rs @@ -7,18 +7,14 @@ //! The only requirement of a pact is that it not alter the number of `D` records at each time `T`. //! The progress tracking logic assumes that this number is independent of the pact used. -use std::{fmt::{self, Debug}, marker::PhantomData}; +use std::fmt::Debug; use std::rc::Rc; use crate::Accountable; -use crate::container::{ContainerBuilder, DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::communication::{Push, Pull}; -use crate::dataflow::channels::pushers::Exchange as ExchangePusher; -use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor; use crate::dataflow::channels::Message; -use crate::logging::{TimelyLogger as Logger, MessagesEvent}; -use crate::progress::Timestamp; +use crate::logging::TimelyLogger as Logger; use crate::worker::AsWorker; /// A `ParallelizationContract` allocates paired `Push` and `Pull` implementors. @@ -45,162 +41,234 @@ impl ParallelizationContract for Pip } } -/// An exchange between multiple observers by data -pub struct ExchangeCore { hash_func: F, phantom: PhantomData } - -/// [ExchangeCore] specialized to vector-based containers. -pub type Exchange = ExchangeCore>, F>; - -impl ExchangeCore -where - CB: LengthPreservingContainerBuilder, - CB::Container: DrainContainer, - for<'a> F: FnMut(&::Item<'a>)->u64 -{ - /// Allocates a new `Exchange` pact from a distribution function. - pub fn new_core(func: F) -> ExchangeCore { - ExchangeCore { - hash_func: func, - phantom: PhantomData, +pub use exchange::{ExchangeCore, Exchange}; +mod exchange { + + use std::{fmt::{self, Debug}, marker::PhantomData}; + use std::rc::Rc; + + use crate::container::{ContainerBuilder, DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto}; + use crate::communication::{Push, Pull}; + use crate::dataflow::channels::pushers::Exchange as ExchangePusher; + use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor; + use crate::dataflow::channels::Message; + use crate::logging::TimelyLogger as Logger; + use crate::progress::Timestamp; + use crate::worker::AsWorker; + + use super::{ParallelizationContract, LogPusher, LogPuller}; + + /// An exchange between multiple observers by data + pub struct ExchangeCore { hash_func: F, phantom: PhantomData } + + /// [ExchangeCore] specialized to vector-based containers. + pub type Exchange = ExchangeCore>, F>; + + impl ExchangeCore + where + CB: LengthPreservingContainerBuilder, + CB::Container: DrainContainer, + for<'a> F: FnMut(&::Item<'a>)->u64 + { + /// Allocates a new `Exchange` pact from a distribution function. + pub fn new_core(func: F) -> ExchangeCore { + ExchangeCore { + hash_func: func, + phantom: PhantomData, + } } } -} -impl ExchangeCore, F> -where - C: SizableContainer + DrainContainer, - for<'a> F: FnMut(&C::Item<'a>)->u64 -{ - /// Allocates a new `Exchange` pact from a distribution function. - pub fn new(func: F) -> ExchangeCore, F> { - ExchangeCore { - hash_func: func, - phantom: PhantomData, + impl ExchangeCore, F> + where + C: SizableContainer + DrainContainer, + for<'a> F: FnMut(&C::Item<'a>)->u64 + { + /// Allocates a new `Exchange` pact from a distribution function. + pub fn new(func: F) -> ExchangeCore, F> { + ExchangeCore { + hash_func: func, + phantom: PhantomData, + } } } -} -// Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. -impl ParallelizationContract for ExchangeCore -where - CB: ContainerBuilder + for<'a> PushInto<::Item<'a>>, - CB::Container: Send + crate::dataflow::channels::ContainerBytes, - for<'a> H: FnMut(&::Item<'a>) -> u64 + 'static, -{ - type Pusher = ExchangePusher< - T, - LogPusher>>>, - DrainContainerDistributor - >; - type Puller = LogPuller>>>; + // Exchange uses a `Box` because it cannot know what type of pushable will return from the allocator. + impl ParallelizationContract for ExchangeCore + where + CB: ContainerBuilder + for<'a> PushInto<::Item<'a>>, + CB::Container: Send + crate::dataflow::channels::ContainerBytes, + for<'a> H: FnMut(&::Item<'a>) -> u64 + 'static, + { + type Pusher = ExchangePusher< + T, + LogPusher>>>, + DrainContainerDistributor + >; + type Puller = LogPuller>>>; - fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { - let (senders, receiver) = allocator.allocate::>(identifier, address); - let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); - let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers()); - (ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) + fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { + let (senders, receiver) = allocator.allocate::>(identifier, address); + let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); + let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers()); + (ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) + } } -} -impl Debug for ExchangeCore { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("Exchange").finish() + impl Debug for ExchangeCore { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Exchange").finish() + } } } -/// Wraps a `Message` pusher to provide a `Push<(T, Content)>`. -#[derive(Debug)] -pub struct LogPusher

{ - pusher: P, - channel: usize, - counter: usize, - source: usize, - target: usize, - logging: Option, -} +pub use distributor::DistributorPact; +/// Parallelization contract based on a `Distributor` implementation. +mod distributor { + + use std::rc::Rc; + + use crate::Accountable; + use crate::communication::{Push, Pull}; + use crate::dataflow::channels::pushers::{Exchange, exchange::Distributor}; + use crate::dataflow::channels::{ContainerBytes, Message}; + use crate::logging::TimelyLogger; + use crate::progress::Timestamp; + use crate::worker::AsWorker; + + use super::{ParallelizationContract, LogPusher, LogPuller}; + + /// Intended to wrap a function from a `usize` to an `impl Distributor`. + /// + /// For a `D: Distributor` and an appropriate builder `D::new(peers: usize)`, + /// a `DistributorPact(|peers| D::new(peers))` acts as a pact that will use `D` + /// and distribute containers of type `C`. + pub struct DistributorPact(pub B); -impl

LogPusher

{ - /// Allocates a new pusher. - pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option) -> Self { - LogPusher { - pusher, - channel, - counter: 0, - source, - target, - logging, + impl ParallelizationContract for DistributorPact + where + T: Timestamp, + B: FnOnce(usize) -> D, + C: Accountable + ContainerBytes + Send + 'static, + D: Distributor + 'static, + { + type Pusher = Exchange< + T, + LogPusher>>>, + D + >; + type Puller = LogPuller>>>; + fn connect(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option) -> (Self::Pusher, Self::Puller) { + let (senders, receiver) = allocator.allocate::>(identifier, address); + let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::>(); + let distributor = (self.0)(allocator.peers()); + (Exchange::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone())) } } } -impl>> Push> for LogPusher

{ - #[inline] - fn push(&mut self, pair: &mut Option>) { - if let Some(bundle) = pair { - self.counter += 1; - - // Stamp the sequence number and source. - // FIXME: Awkward moment/logic. - bundle.seq = self.counter - 1; - bundle.from = self.source; - - if let Some(logger) = self.logging.as_ref() { - logger.log(MessagesEvent { - is_send: true, - channel: self.channel, - source: self.source, - target: self.target, - seq_no: self.counter - 1, - record_count: bundle.data.record_count(), - }) +pub use push_pull::{LogPusher, LogPuller}; +mod push_pull { + + use crate::Accountable; + use crate::communication::{Push, Pull}; + use crate::dataflow::channels::Message; + use crate::logging::{TimelyLogger as Logger, MessagesEvent}; + + /// Wraps a `Message` pusher to provide a `Push<(T, Content)>`. + #[derive(Debug)] + pub struct LogPusher

{ + pusher: P, + channel: usize, + counter: usize, + source: usize, + target: usize, + logging: Option, + } + + impl

LogPusher

{ + /// Allocates a new pusher. + pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option) -> Self { + LogPusher { + pusher, + channel, + counter: 0, + source, + target, + logging, } } - - self.pusher.push(pair); } -} -/// Wraps a `Message` puller to provide a `Pull<(T, Content)>`. -#[derive(Debug)] -pub struct LogPuller

{ - puller: P, - channel: usize, - index: usize, - logging: Option, -} + impl>> Push> for LogPusher

{ + #[inline] + fn push(&mut self, pair: &mut Option>) { + if let Some(bundle) = pair { + self.counter += 1; + + // Stamp the sequence number and source. + // FIXME: Awkward moment/logic. + bundle.seq = self.counter - 1; + bundle.from = self.source; + + if let Some(logger) = self.logging.as_ref() { + logger.log(MessagesEvent { + is_send: true, + channel: self.channel, + source: self.source, + target: self.target, + seq_no: self.counter - 1, + record_count: bundle.data.record_count(), + }) + } + } -impl

LogPuller

{ - /// Allocates a new `Puller`. - pub fn new(puller: P, index: usize, channel: usize, logging: Option) -> Self { - LogPuller { - puller, - channel, - index, - logging, + self.pusher.push(pair); } } -} -impl>> Pull> for LogPuller

{ - #[inline] - fn pull(&mut self) -> &mut Option> { - let result = self.puller.pull(); - if let Some(bundle) = result { - let channel = self.channel; - let target = self.index; - - if let Some(logger) = self.logging.as_ref() { - logger.log(MessagesEvent { - is_send: false, - channel, - source: bundle.from, - target, - seq_no: bundle.seq, - record_count: bundle.data.record_count(), - }); + /// Wraps a `Message` puller to provide a `Pull<(T, Content)>`. + #[derive(Debug)] + pub struct LogPuller

{ + puller: P, + channel: usize, + index: usize, + logging: Option, + } + + impl

LogPuller

{ + /// Allocates a new `Puller`. + pub fn new(puller: P, index: usize, channel: usize, logging: Option) -> Self { + LogPuller { + puller, + channel, + index, + logging, } } + } - result + impl>> Pull> for LogPuller

{ + #[inline] + fn pull(&mut self) -> &mut Option> { + let result = self.puller.pull(); + if let Some(bundle) = result { + let channel = self.channel; + let target = self.index; + + if let Some(logger) = self.logging.as_ref() { + logger.log(MessagesEvent { + is_send: false, + channel, + source: bundle.from, + target, + seq_no: bundle.seq, + record_count: bundle.data.record_count(), + }); + } + } + + result + } } -} +} \ No newline at end of file diff --git a/timely/src/dataflow/channels/pushers/exchange.rs b/timely/src/dataflow/channels/pushers/exchange.rs index a9ea9b4fa..81e168c32 100644 --- a/timely/src/dataflow/channels/pushers/exchange.rs +++ b/timely/src/dataflow/channels/pushers/exchange.rs @@ -19,7 +19,7 @@ pub trait Distributor { /// Flush any remaining contents into the `pushers` at time `time`. fn flush>>(&mut self, time: &T, pushers: &mut [P]); /// Optionally release resources, such as memory. - fn relax(&mut self); + fn relax(&mut self) { } } /// A distributor creating containers from a drainable container based