Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 10 additions & 52 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,22 +44,14 @@ impl<T: 'static, C: Accountable + 'static> ParallelizationContract<T, C> for Pip
pub use exchange::{ExchangeCore, Exchange};
mod exchange {

use std::{fmt::{self, Debug}, marker::PhantomData};
use std::rc::Rc;

use crate::container::{ContainerBuilder, DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder, PushInto};
use crate::communication::{Push, Pull};
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
use crate::Container;
use crate::container::{DrainContainer, LengthPreservingContainerBuilder, SizableContainer, CapacityContainerBuilder};
use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor;
use crate::dataflow::channels::Message;
use crate::logging::TimelyLogger as Logger;
use crate::progress::Timestamp;
use crate::worker::AsWorker;

use super::{ParallelizationContract, LogPusher, LogPuller};
use super::DistributorPact;

/// An exchange between multiple observers by data
pub struct ExchangeCore<CB, F> { hash_func: F, phantom: PhantomData<CB> }
pub type ExchangeCore<CB, F> = DistributorPact<Box<dyn FnOnce(usize) -> DrainContainerDistributor<CB, F>>>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A thing I'm uncertain about, but maybe it's fine, is that when I did pub type Foo = DistributorPact; I didn't have the ability to use Foo as a constructor, like let x = Foo(bar);. I'm not entirely certain when type aliases can be used and cannot, and we might find out that this alias results in making it harder to use ExchangeCore if you need to invoke DistributorPact instead. Maybe it's fine though, and I'm inventing problems that don't exist.


/// [ExchangeCore] specialized to vector-based containers.
pub type Exchange<D, F> = ExchangeCore<CapacityContainerBuilder<Vec<D>>, F>;
Expand All @@ -68,56 +60,22 @@ mod exchange {
where
CB: LengthPreservingContainerBuilder,
CB::Container: DrainContainer,
for<'a> F: FnMut(&<CB::Container as DrainContainer>::Item<'a>)->u64
for<'a> F: FnMut(&<CB::Container as DrainContainer>::Item<'a>)->u64 + 'static
{
/// Allocates a new `Exchange` pact from a distribution function.
pub fn new_core(func: F) -> ExchangeCore<CB, F> {
ExchangeCore {
hash_func: func,
phantom: PhantomData,
}
DistributorPact(Box::new(move |peers| DrainContainerDistributor::new(func, peers)))
}
}

impl<C, F> ExchangeCore<CapacityContainerBuilder<C>, F>
where
C: SizableContainer + DrainContainer,
for<'a> F: FnMut(&C::Item<'a>)->u64
C: Container + SizableContainer + DrainContainer,
for<'a> F: FnMut(&C::Item<'a>)->u64 + 'static
{
/// Allocates a new `Exchange` pact from a distribution function.
pub fn new(func: F) -> ExchangeCore<CapacityContainerBuilder<C>, F> {
ExchangeCore {
hash_func: func,
phantom: PhantomData,
}
}
}

// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
impl<T: Timestamp, CB, H> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
where
CB: ContainerBuilder<Container: DrainContainer> + for<'a> PushInto<<CB::Container as DrainContainer>::Item<'a>>,
CB::Container: Send + crate::dataflow::channels::ContainerBytes,
for<'a> H: FnMut(&<CB::Container as DrainContainer>::Item<'a>) -> u64 + 'static,
{
type Pusher = ExchangePusher<
T,
LogPusher<Box<dyn Push<Message<T, CB::Container>>>>,
DrainContainerDistributor<CB, H>
>;
type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;

fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers());
(ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
}
}

impl<C, F> Debug for ExchangeCore<C, F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Exchange").finish()
DistributorPact(Box::new(move |peers| DrainContainerDistributor::new(func, peers)))
}
}
}
Expand Down Expand Up @@ -271,4 +229,4 @@ mod push_pull {
result
}
}
}
}
Loading