diff --git a/communication/src/allocator/mod.rs b/communication/src/allocator/mod.rs index 0b04c348f..a30aac203 100644 --- a/communication/src/allocator/mod.rs +++ b/communication/src/allocator/mod.rs @@ -96,4 +96,32 @@ pub trait Allocate { { thread::Thread::new_from(identifier, self.events().clone()) } + + /// Allocates a broadcast channel, where each pushed message is received by all. + fn broadcast(&mut self, identifier: usize) -> (Box>, Box>) { + let (pushers, pull) = self.allocate(identifier); + (Box::new(Broadcaster { spare: None, pushers }), pull) + } +} + +/// An adapter to broadcast any pushed element. +struct Broadcaster { + /// Spare element for defensive copies. + spare: Option, + /// Destinations to which pushed elements should be broadcast. + pushers: Vec>>, +} + +impl Push for Broadcaster { + fn push(&mut self, element: &mut Option) { + // Push defensive copies to pushers after the first. + for pusher in self.pushers.iter_mut().skip(1) { + self.spare.clone_from(element); + pusher.push(&mut self.spare); + } + // Push the element itself at the first pusher. + for pusher in self.pushers.iter_mut().take(1) { + pusher.push(element); + } + } } diff --git a/timely/src/dataflow/scopes/child.rs b/timely/src/dataflow/scopes/child.rs index 751545a31..19dddd7e6 100644 --- a/timely/src/dataflow/scopes/child.rs +++ b/timely/src/dataflow/scopes/child.rs @@ -64,6 +64,9 @@ where fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher, ThreadPuller) { self.parent.pipeline(identifier, address) } + fn broadcast(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box>, Box>) { + self.parent.broadcast(identifier, address) + } fn new_identifier(&mut self) -> usize { self.parent.new_identifier() } diff --git a/timely/src/lib.rs b/timely/src/lib.rs index 742b10c7c..1cb35b8ab 100644 --- a/timely/src/lib.rs +++ b/timely/src/lib.rs @@ -130,6 +130,7 @@ mod encoding { implDeserialize<'a>> Data for T { } /// A wrapper that indicates `bincode` as the serialization/deserialization strategy. + #[derive(Clone)] pub struct Bincode { /// Bincode contents. pub payload: T, diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 81d129403..3c5a7f00f 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -8,16 +8,15 @@ use crate::logging::TimelyLogger as Logger; use crate::logging::TimelyProgressLogger as ProgressLogger; use crate::Bincode; -/// A list of progress updates corresponding to `((child_scope, [in/out]_port, timestamp), delta)` -pub type ProgressVec = Vec<((Location, T), i64)>; /// A progress update message consisting of source worker id, sequence number and lists of /// message and internal updates -pub type ProgressMsg = Bincode<(usize, usize, ProgressVec)>; +pub type ProgressMsg = Bincode<(usize, usize, ChangeBatch<(Location, T)>)>; /// Manages broadcasting of progress updates to and receiving updates from workers. pub struct Progcaster { - to_push: Option>, - pushers: Vec>>>, + /// Pusher into which we send progress updates. + pusher: Box>>, + /// Puller from which we recv progress updates. puller: Box>>, /// Source worker index source: usize, @@ -27,7 +26,7 @@ pub struct Progcaster { identifier: usize, /// Communication channel identifier channel_identifier: usize, - + /// An optional logger to record progress messages. progress_logging: Option>, } @@ -36,15 +35,14 @@ impl Progcaster { pub fn new(worker: &mut A, addr: Rc<[usize]>, identifier: usize, mut logging: Option, progress_logging: Option>) -> Progcaster { let channel_identifier = worker.new_identifier(); - let (pushers, puller) = worker.allocate(channel_identifier, addr); + let (pusher, puller) = worker.broadcast(channel_identifier, addr); logging.as_mut().map(|l| l.log(crate::logging::CommChannelsEvent { identifier: channel_identifier, kind: crate::logging::CommChannelKind::Progress, })); let worker_index = worker.index(); Progcaster { - to_push: None, - pushers, + pusher, puller, source: worker_index, counter: 0, @@ -90,31 +88,17 @@ impl Progcaster { }); }); - for pusher in self.pushers.iter_mut() { - - // Attempt to reuse allocations, if possible. - if let Some(tuple) = &mut self.to_push { - tuple.payload.0 = self.source; - tuple.payload.1 = self.counter; - tuple.payload.2.clear(); - tuple.payload.2.extend(changes.iter().cloned()); - } - // If we don't have an allocation ... - if self.to_push.is_none() { - self.to_push = Some(Bincode::from(( - self.source, - self.counter, - changes.clone().into_inner().into_vec(), - ))); - } + let payload = (self.source, self.counter, std::mem::take(changes)); + let mut to_push = Some(Bincode { payload }); + self.pusher.push(&mut to_push); + self.pusher.done(); - // TODO: This should probably use a broadcast channel. - pusher.push(&mut self.to_push); - pusher.done(); + if let Some(pushed) = to_push { + *changes = pushed.payload.2; + changes.clear(); } self.counter += 1; - changes.clear(); } } @@ -125,7 +109,7 @@ impl Progcaster { let source = message.0; let counter = message.1; - let recv_changes = &message.2; + let recv_changes = &mut message.2; let channel = self.channel_identifier; diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 439cad7e6..31fbdeef5 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -193,6 +193,9 @@ pub trait AsWorker : Scheduler { /// that this behavior will be overridden to be more efficient. fn pipeline(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher, ThreadPuller); + /// Allocates a broadcast channel, where each pushed message is received by all. + fn broadcast(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box>, Box>); + /// Allocates a new worker-unique identifier. fn new_identifier(&mut self) -> usize; /// The next worker-unique identifier to be allocated. @@ -242,6 +245,13 @@ impl AsWorker for Worker { self.temp_channel_ids.borrow_mut().push(identifier); self.allocator.borrow_mut().pipeline(identifier) } + fn broadcast(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box>, Box>) { + if address.is_empty() { panic!("Unacceptable address: Length zero"); } + let mut paths = self.paths.borrow_mut(); + paths.insert(identifier, address); + self.temp_channel_ids.borrow_mut().push(identifier); + self.allocator.borrow_mut().broadcast(identifier) + } fn new_identifier(&mut self) -> usize { self.new_identifier() } fn peek_identifier(&self) -> usize { self.peek_identifier() }