Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,6 @@ pub trait ContainerBuilder: Default + 'static {
/// be called repeatedly until it returns `None`.
#[must_use]
fn finish(&mut self) -> Option<&mut Self::Container>;
/// Partitions `container` among `builders`, using the function `index` to direct items.
fn partition<I>(container: &mut Self::Container, builders: &mut [Self], mut index: I)
where
Self: for<'a> PushInto<<Self::Container as Container>::Item<'a>>,
I: for<'a> FnMut(&<Self::Container as Container>::Item<'a>) -> usize,
{
for datum in container.drain() {
let index = index(&datum);
builders[index].push_into(datum);
}
container.clear();
}

/// Indicates a good moment to release resources.
///
/// By default, does nothing. Callers first need to drain the contents using [`Self::finish`]
Expand Down
17 changes: 11 additions & 6 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use crate::{Container, container::{ContainerBuilder, LengthPreservingContainerBu
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::communication::{Push, Pull};
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
use crate::dataflow::channels::pushers::exchange::DrainContainerDistributor;
use crate::dataflow::channels::Message;
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
use crate::progress::Timestamp;
Expand Down Expand Up @@ -79,20 +80,24 @@ where
}

// Exchange uses a `Box<Pushable>` because it cannot know what type of pushable will return from the allocator.
impl<T: Timestamp, CB, H: 'static> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
impl<T: Timestamp, CB, H> ParallelizationContract<T, CB::Container> for ExchangeCore<CB, H>
where
CB: ContainerBuilder,
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
CB: ContainerBuilder + for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
CB::Container: Data + Send + crate::dataflow::channels::ContainerBytes,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64 + 'static,
{
type Pusher = ExchangePusher<T, CB, LogPusher<Box<dyn Push<Message<T, CB::Container>>>>, H>;
type Pusher = ExchangePusher<
T,
LogPusher<Box<dyn Push<Message<T, CB::Container>>>>,
DrainContainerDistributor<CB, H>
>;
type Puller = LogPuller<Box<dyn Pull<Message<T, CB::Container>>>>;

fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: Rc<[usize]>, logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, CB::Container>>(identifier, address);
let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
(ExchangePusher::new(senders, self.hash_func), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
let distributor = DrainContainerDistributor::new(self.hash_func, allocator.peers());
(ExchangePusher::new(senders, distributor), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
}
}

Expand Down
162 changes: 105 additions & 57 deletions timely/src/dataflow/channels/pushers/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,57 +3,114 @@
use crate::communication::Push;
use crate::container::{ContainerBuilder, PushInto};
use crate::dataflow::channels::Message;
use crate::{Container, Data};
use crate::Container;

// TODO : Software write combining
/// Distributes records among target pushees according to a distribution function.
pub struct Exchange<T, CB, P, H>
where
CB: ContainerBuilder,
P: Push<Message<T, CB::Container>>,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
{
pushers: Vec<P>,
/// Distributes the contents of containers among several pushers.
///
/// A distributor sits behind an exchange pusher and partitions the containers it receives at a
/// given time among several pushers. It can use [`Message::push_at`] to push its outputs to the
/// desired pusher.
///
/// Implementations must uphold progress tracking requirements: the count of the input containers
/// must be preserved across the output containers, from the first call to `partition` until the
/// call to `flush` for a specific timestamp.
pub trait Distributor<C> {
/// Partitions the contents of `container` at `time` among the `pushers`.
///
/// Implementations may hold back part of the contents; whatever is held back has to be emitted
/// by a later call to `flush`, per the counting requirement described on the trait.
fn partition<T: Clone, P: Push<Message<T, C>>>(&mut self, container: &mut C, time: &T, pushers: &mut [P]);
/// Flushes any remaining contents into the `pushers` at time `time`.
fn flush<T: Clone, P: Push<Message<T, C>>>(&mut self, time: &T, pushers: &mut [P]);
/// Optionally releases resources, such as memory.
fn relax(&mut self);
}

/// A distributor creating containers from a drainable container based
/// on a hash function of the container's item.
pub struct DrainContainerDistributor<CB, H> {
/// Container builders, one per peer; each item is pushed into the builder whose index is
/// derived from the item's hash.
builders: Vec<CB>,
current: Option<T>,
/// Function applied to every item; its `u64` result is reduced to a builder index.
hash_func: H,
}

impl<T: Clone, CB, P, H> Exchange<T, CB, P, H>
impl<CB: Default, H> DrainContainerDistributor<CB, H> {
    /// Builds a `DrainContainerDistributor` that routes items with `hash_func`, allocating one
    /// default-initialized builder for each of the `peers` targets.
    pub fn new(hash_func: H, peers: usize) -> Self {
        let builders = (0..peers).map(|_| CB::default()).collect();
        Self { builders, hash_func }
    }
}

impl<CB, H> Distributor<CB::Container> for DrainContainerDistributor<CB, H>
where
CB: ContainerBuilder,
P: Push<Message<T, CB::Container>>,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
CB: ContainerBuilder + for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64,
{
/// Allocates a new `Exchange` from a supplied set of pushers and a distribution function.
pub fn new(pushers: Vec<P>, key: H) -> Exchange<T, CB, P, H> {
let builders = std::iter::repeat_with(Default::default).take(pushers.len()).collect();
Exchange {
pushers,
hash_func: key,
builders,
current: None,
fn partition<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, container: &mut CB::Container, time: &T, pushers: &mut [P]) {
debug_assert_eq!(self.builders.len(), pushers.len());
if pushers.len().is_power_of_two() {
let mask = (pushers.len() - 1) as u64;
for datum in container.drain() {
let index = ((self.hash_func)(&datum) & mask) as usize;
self.builders[index].push_into(datum);
while let Some(produced) = self.builders[index].extract() {
Message::push_at(produced, time.clone(), &mut pushers[index]);
}
}
}
else {
let num_pushers = pushers.len() as u64;
for datum in container.drain() {
let index = ((self.hash_func)(&datum) % num_pushers) as usize;
self.builders[index].push_into(datum);
while let Some(produced) = self.builders[index].extract() {
Message::push_at(produced, time.clone(), &mut pushers[index]);
}
}
}
}
#[inline]
fn flush(&mut self, index: usize) {
while let Some(container) = self.builders[index].finish() {
if let Some(ref time) = self.current {
Message::push_at(container, time.clone(), &mut self.pushers[index]);

fn flush<T: Clone, P: Push<Message<T, CB::Container>>>(&mut self, time: &T, pushers: &mut [P]) {
for (builder, pusher) in self.builders.iter_mut().zip(pushers.iter_mut()) {
while let Some(container) = builder.finish() {
Message::push_at(container, time.clone(), pusher);
}
}
}

fn relax(&mut self) {
for builder in &mut self.builders {
builder.relax();
}
}
}

impl<T: Eq+Data, CB, P, H> Push<Message<T, CB::Container>> for Exchange<T, CB, P, H>
// TODO : Software write combining
/// Distributes records among target pushees according to a distributor.
pub struct Exchange<T, P, D> {
/// The target pushers that receive the distributed containers.
pushers: Vec<P>,
/// The time of the most recently pushed message; `None` before the first message and after
/// a flush.
current: Option<T>,
/// Decides how each incoming container is split among `pushers`.
distributor: D,
}

impl<T: Clone, P, D> Exchange<T, P, D> {
/// Allocates a new `Exchange` from a supplied set of pushers and a distributor.
pub fn new(pushers: Vec<P>, distributor: D) -> Exchange<T, P, D> {
Exchange {
pushers,
current: None,
distributor,
}
}
}

impl<T: Eq+Clone, C, P, D> Push<Message<T, C>> for Exchange<T, P, D>
where
CB: ContainerBuilder,
CB: for<'a> PushInto<<CB::Container as Container>::Item<'a>>,
P: Push<Message<T, CB::Container>>,
for<'a> H: FnMut(&<CB::Container as Container>::Item<'a>) -> u64
P: Push<Message<T, C>>,
D: Distributor<C>,
{
#[inline(never)]
fn push(&mut self, message: &mut Option<Message<T, CB::Container>>) {
fn push(&mut self, message: &mut Option<Message<T, C>>) {
// if only one pusher, no exchange
if self.pushers.len() == 1 {
self.pushers[0].push(message);
Expand All @@ -64,36 +121,27 @@ where
let data = &mut message.data;

// if the time isn't right, flush everything.
if self.current.as_ref().is_some_and(|x| x != time) {
for index in 0..self.pushers.len() {
self.flush(index);
match self.current.as_ref() {
// We have a current time, and it is different from the new time.
Some(current_time) if current_time != time => {
self.distributor.flush(current_time, &mut self.pushers);
self.current = Some(time.clone());
}
// We had no time before, or flushed.
None => self.current = Some(time.clone()),
// Time didn't change since last call.
_ => {}
}
self.current = Some(time.clone());

let hash_func = &mut self.hash_func;

// if the number of pushers is a power of two, use a mask
if self.pushers.len().is_power_of_two() {
let mask = (self.pushers.len() - 1) as u64;
CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) & mask) as usize);
}
// as a last resort, use mod (%)
else {
let num_pushers = self.pushers.len() as u64;
CB::partition(data, &mut self.builders, |datum| ((hash_func)(datum) % num_pushers) as usize);
}
for (buffer, pusher) in self.builders.iter_mut().zip(self.pushers.iter_mut()) {
while let Some(container) = buffer.extract() {
Message::push_at(container, time.clone(), pusher);
}
}
self.distributor.partition(data, time, &mut self.pushers);
}
else {
// flush
if let Some(time) = self.current.take() {
self.distributor.flush(&time, &mut self.pushers);
}
self.distributor.relax();
for index in 0..self.pushers.len() {
self.flush(index);
self.builders[index].relax();
self.pushers[index].push(&mut None);
}
}
Expand Down
Loading