diff --git a/timely/Cargo.toml b/timely/Cargo.toml
index 0b550e53c..01f0fc326 100644
--- a/timely/Cargo.toml
+++ b/timely/Cargo.toml
@@ -21,6 +21,7 @@ bincode= ["timely_communication/bincode"]
 getopts = ["getopts-dep", "timely_communication/getopts"]
 
 [dependencies]
+fnv="1.0.2"
 getopts-dep = { package = "getopts", version = "0.2.14", optional = true }
 serde = "1.0"
 serde_derive = "1.0"
diff --git a/timely/src/dataflow/channels/pact.rs b/timely/src/dataflow/channels/pact.rs
index 976023a19..3cfd37601 100644
--- a/timely/src/dataflow/channels/pact.rs
+++ b/timely/src/dataflow/channels/pact.rs
@@ -8,6 +8,7 @@
 //! The progress tracking logic assumes that this number is independent of the pact used.
 
 use std::{fmt::{self, Debug}, marker::PhantomData};
+use std::hash::{Hash, Hasher};
 use timely_container::PushPartitioned;
 
 use crate::communication::{Push, Pull, Data};
@@ -52,6 +53,67 @@ impl<T: 'static, D: Container> ParallelizationContractCore<T, D> for Pipeline {
     }
 }
 
+/// A connection that dynamically distributes records to all workers.
+///
+/// Each pushed message is handed to one pusher picked by [`DistributePusher`];
+/// which worker receives it is not determined by the data.
+#[derive(Debug)]
+pub struct Distribute;
+
+// NOTE(review): the generic parameter lists in this hunk were reconstructed and the
+// bounds on `T` and `C` are assumed; confirm them against the sibling pact impls.
+impl<T: Timestamp, C: Container + Data> ParallelizationContractCore<T, C> for Distribute {
+    type Pusher = DistributePusher<LogPusher<T, C, Box<dyn Push<BundleCore<T, C>>>>>;
+    type Puller = LogPuller<T, C, Box<dyn Pull<BundleCore<T, C>>>>;
+
+    fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
+        let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
+        // Wrap every sender in a `LogPusher`, passing the allocator's index and the sender's position.
+        let senders = senders.into_iter().enumerate().map(|(i,x)| LogPusher::new(x, allocator.index(), i, identifier, logging.clone())).collect::<Vec<_>>();
+        (DistributePusher::new(senders), LogPuller::new(receiver, allocator.index(), identifier, logging.clone()))
+    }
+}
+
+/// Distributes records among target pushers.
+///
+/// It is more efficient than `Exchange` when the target worker doesn't matter.
+pub struct DistributePusher<P> {
+    pushers: Vec<P>,
+}
+
+impl<P> DistributePusher<P> {
+    /// Allocates a new `DistributePusher` from a supplied set of pushers.
+    pub fn new(pushers: Vec<P>) -> DistributePusher<P> {
+        DistributePusher {
+            pushers,
+        }
+    }
+}
+
+impl<T, C, P: Push<BundleCore<T, C>>> Push<BundleCore<T, C>> for DistributePusher<P> {
+    /// Forwards a message to one pusher, or a flush to every pusher.
+    ///
+    /// # Panics
+    ///
+    /// Panics if a message is pushed while the set of pushers is empty.
+    fn push(&mut self, message: &mut Option<BundleCore<T, C>>) {
+        if message.is_some() {
+            // Hash the current instant to pick a target; the choice ignores the data.
+            let mut state: fnv::FnvHasher = Default::default();
+            std::time::Instant::now().hash(&mut state);
+            let worker_idx = (state.finish() as usize) % self.pushers.len();
+            self.pushers[worker_idx].push(message);
+        } else {
+            // `None` conventionally signals a flush, so every pusher must see it;
+            // sending it to one arbitrary pusher would leave the others unflushed.
+            // A fresh `None` is used per pusher in case one writes into its argument.
+            for pusher in self.pushers.iter_mut() {
+                pusher.push(&mut None);
+            }
+        }
+    }
+}
+
 /// An exchange between multiple observers by data
 pub struct ExchangeCore<C, D, F> { hash_func: F, phantom: PhantomData<(C, D)> }