Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions communication/src/allocator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,32 @@ pub trait Allocate {
{
thread::Thread::new_from(identifier, self.events().clone())
}

/// Allocates a broadcast channel, where each pushed message is received by all.
fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
let (pushers, pull) = self.allocate(identifier);
(Box::new(Broadcaster { spare: None, pushers }), pull)
}
}

/// An adapter to broadcast any pushed element.
struct Broadcaster<T> {
/// Spare element for defensive copies.
spare: Option<T>,
/// Destinations to which pushed elements should be broadcast.
pushers: Vec<Box<dyn Push<T>>>,
}

impl<T: Clone> Push<T> for Broadcaster<T> {
fn push(&mut self, element: &mut Option<T>) {
// Push defensive copies to pushers after the first.
for pusher in self.pushers.iter_mut().skip(1) {
self.spare.clone_from(element);
pusher.push(&mut self.spare);
}
// Push the element itself at the first pusher.
for pusher in self.pushers.iter_mut().take(1) {
pusher.push(element);
}
}
}
3 changes: 3 additions & 0 deletions timely/src/dataflow/scopes/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ where
fn pipeline<D: 'static>(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher<D>, ThreadPuller<D>) {
self.parent.pipeline(identifier, address)
}
fn broadcast<D: Exchangeable + Clone>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box<dyn Push<D>>, Box<dyn Pull<D>>) {
self.parent.broadcast(identifier, address)
}
fn new_identifier(&mut self) -> usize {
self.parent.new_identifier()
}
Expand Down
1 change: 1 addition & 0 deletions timely/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ mod encoding {
impl<T: Send+Any+Serialize+for<'a>Deserialize<'a>> Data for T { }

/// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
#[derive(Clone)]
pub struct Bincode<T> {
/// Bincode contents.
pub payload: T,
Expand Down
46 changes: 15 additions & 31 deletions timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,15 @@ use crate::logging::TimelyLogger as Logger;
use crate::logging::TimelyProgressLogger as ProgressLogger;
use crate::Bincode;

/// A list of progress updates corresponding to `((child_scope, [in/out]_port, timestamp), delta)`
pub type ProgressVec<T> = Vec<((Location, T), i64)>;
/// A progress update message consisting of source worker id, sequence number and lists of
/// message and internal updates
pub type ProgressMsg<T> = Bincode<(usize, usize, ProgressVec<T>)>;
pub type ProgressMsg<T> = Bincode<(usize, usize, ChangeBatch<(Location, T)>)>;

/// Manages broadcasting of progress updates to and receiving updates from workers.
pub struct Progcaster<T:Timestamp> {
to_push: Option<ProgressMsg<T>>,
pushers: Vec<Box<dyn Push<ProgressMsg<T>>>>,
/// Pusher into which we send progress updates.
pusher: Box<dyn Push<ProgressMsg<T>>>,
/// Puller from which we recv progress updates.
puller: Box<dyn Pull<ProgressMsg<T>>>,
/// Source worker index
source: usize,
Expand All @@ -27,7 +26,7 @@ pub struct Progcaster<T:Timestamp> {
identifier: usize,
/// Communication channel identifier
channel_identifier: usize,

/// An optional logger to record progress messages.
progress_logging: Option<ProgressLogger<T>>,
}

Expand All @@ -36,15 +35,14 @@ impl<T:Timestamp+Send> Progcaster<T> {
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Rc<[usize]>, identifier: usize, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger<T>>) -> Progcaster<T> {

let channel_identifier = worker.new_identifier();
let (pushers, puller) = worker.allocate(channel_identifier, addr);
let (pusher, puller) = worker.broadcast(channel_identifier, addr);
logging.as_mut().map(|l| l.log(crate::logging::CommChannelsEvent {
identifier: channel_identifier,
kind: crate::logging::CommChannelKind::Progress,
}));
let worker_index = worker.index();
Progcaster {
to_push: None,
pushers,
pusher,
puller,
source: worker_index,
counter: 0,
Expand Down Expand Up @@ -90,31 +88,17 @@ impl<T:Timestamp+Send> Progcaster<T> {
});
});

for pusher in self.pushers.iter_mut() {

// Attempt to reuse allocations, if possible.
if let Some(tuple) = &mut self.to_push {
tuple.payload.0 = self.source;
tuple.payload.1 = self.counter;
tuple.payload.2.clear();
tuple.payload.2.extend(changes.iter().cloned());
}
// If we don't have an allocation ...
if self.to_push.is_none() {
self.to_push = Some(Bincode::from((
self.source,
self.counter,
changes.clone().into_inner().into_vec(),
)));
}
let payload = (self.source, self.counter, std::mem::take(changes));
let mut to_push = Some(Bincode { payload });
self.pusher.push(&mut to_push);
self.pusher.done();

// TODO: This should probably use a broadcast channel.
pusher.push(&mut self.to_push);
pusher.done();
if let Some(pushed) = to_push {
*changes = pushed.payload.2;
changes.clear();
}

self.counter += 1;
changes.clear();
}
}

Expand All @@ -125,7 +109,7 @@ impl<T:Timestamp+Send> Progcaster<T> {

let source = message.0;
let counter = message.1;
let recv_changes = &message.2;
let recv_changes = &mut message.2;

let channel = self.channel_identifier;

Expand Down
10 changes: 10 additions & 0 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ pub trait AsWorker : Scheduler {
/// that this behavior will be overridden to be more efficient.
fn pipeline<T: 'static>(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher<T>, ThreadPuller<T>);

/// Allocates a broadcast channel, where each pushed message is received by all.
fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>);

/// Allocates a new worker-unique identifier.
fn new_identifier(&mut self) -> usize;
/// The next worker-unique identifier to be allocated.
Expand Down Expand Up @@ -242,6 +245,13 @@ impl<A: Allocate> AsWorker for Worker<A> {
self.temp_channel_ids.borrow_mut().push(identifier);
self.allocator.borrow_mut().pipeline(identifier)
}
fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
if address.is_empty() { panic!("Unacceptable address: Length zero"); }
let mut paths = self.paths.borrow_mut();
paths.insert(identifier, address);
self.temp_channel_ids.borrow_mut().push(identifier);
self.allocator.borrow_mut().broadcast(identifier)
}

fn new_identifier(&mut self) -> usize { self.new_identifier() }
fn peek_identifier(&self) -> usize { self.peek_identifier() }
Expand Down