Skip to content

Commit 42fe8bc

Browse files
Introduce foundation for broadcast channel (#633)
* Broadcast channel in communication * Broadcast channel in timely * Exchange ChangeBatch rather than Vec * Improve Broadcaster copies by one
1 parent 54f9b49 commit 42fe8bc

File tree

5 files changed

+57
-31
lines changed

5 files changed

+57
-31
lines changed

communication/src/allocator/mod.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,4 +96,32 @@ pub trait Allocate {
9696
{
9797
thread::Thread::new_from(identifier, self.events().clone())
9898
}
99+
100+
/// Allocates a broadcast channel, where each pushed message is received by all.
101+
fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
102+
let (pushers, pull) = self.allocate(identifier);
103+
(Box::new(Broadcaster { spare: None, pushers }), pull)
104+
}
105+
}
106+
107+
/// An adapter to broadcast any pushed element.
108+
struct Broadcaster<T> {
109+
/// Spare element for defensive copies.
110+
spare: Option<T>,
111+
/// Destinations to which pushed elements should be broadcast.
112+
pushers: Vec<Box<dyn Push<T>>>,
113+
}
114+
115+
impl<T: Clone> Push<T> for Broadcaster<T> {
116+
fn push(&mut self, element: &mut Option<T>) {
117+
// Push defensive copies to pushers after the first.
118+
for pusher in self.pushers.iter_mut().skip(1) {
119+
self.spare.clone_from(element);
120+
pusher.push(&mut self.spare);
121+
}
122+
// Push the element itself at the first pusher.
123+
for pusher in self.pushers.iter_mut().take(1) {
124+
pusher.push(element);
125+
}
126+
}
99127
}

timely/src/dataflow/scopes/child.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ where
6464
fn pipeline<D: 'static>(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher<D>, ThreadPuller<D>) {
6565
self.parent.pipeline(identifier, address)
6666
}
67+
fn broadcast<D: Exchangeable + Clone>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box<dyn Push<D>>, Box<dyn Pull<D>>) {
68+
self.parent.broadcast(identifier, address)
69+
}
6770
fn new_identifier(&mut self) -> usize {
6871
self.parent.new_identifier()
6972
}

timely/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ mod encoding {
130130
impl<T: Send+Any+Serialize+for<'a>Deserialize<'a>> Data for T { }
131131

132132
/// A wrapper that indicates `bincode` as the serialization/deserialization strategy.
133+
#[derive(Clone)]
133134
pub struct Bincode<T> {
134135
/// Bincode contents.
135136
pub payload: T,

timely/src/progress/broadcast.rs

Lines changed: 15 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,15 @@ use crate::logging::TimelyLogger as Logger;
88
use crate::logging::TimelyProgressLogger as ProgressLogger;
99
use crate::Bincode;
1010

11-
/// A list of progress updates corresponding to `((child_scope, [in/out]_port, timestamp), delta)`
12-
pub type ProgressVec<T> = Vec<((Location, T), i64)>;
1311
/// A progress update message consisting of source worker id, sequence number and lists of
1412
/// message and internal updates
15-
pub type ProgressMsg<T> = Bincode<(usize, usize, ProgressVec<T>)>;
13+
pub type ProgressMsg<T> = Bincode<(usize, usize, ChangeBatch<(Location, T)>)>;
1614

1715
/// Manages broadcasting of progress updates to and receiving updates from workers.
1816
pub struct Progcaster<T:Timestamp> {
19-
to_push: Option<ProgressMsg<T>>,
20-
pushers: Vec<Box<dyn Push<ProgressMsg<T>>>>,
17+
/// Pusher into which we send progress updates.
18+
pusher: Box<dyn Push<ProgressMsg<T>>>,
19+
/// Puller from which we recv progress updates.
2120
puller: Box<dyn Pull<ProgressMsg<T>>>,
2221
/// Source worker index
2322
source: usize,
@@ -27,7 +26,7 @@ pub struct Progcaster<T:Timestamp> {
2726
identifier: usize,
2827
/// Communication channel identifier
2928
channel_identifier: usize,
30-
29+
/// An optional logger to record progress messages.
3130
progress_logging: Option<ProgressLogger<T>>,
3231
}
3332

@@ -36,15 +35,14 @@ impl<T:Timestamp+Send> Progcaster<T> {
3635
pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Rc<[usize]>, identifier: usize, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger<T>>) -> Progcaster<T> {
3736

3837
let channel_identifier = worker.new_identifier();
39-
let (pushers, puller) = worker.allocate(channel_identifier, addr);
38+
let (pusher, puller) = worker.broadcast(channel_identifier, addr);
4039
logging.as_mut().map(|l| l.log(crate::logging::CommChannelsEvent {
4140
identifier: channel_identifier,
4241
kind: crate::logging::CommChannelKind::Progress,
4342
}));
4443
let worker_index = worker.index();
4544
Progcaster {
46-
to_push: None,
47-
pushers,
45+
pusher,
4846
puller,
4947
source: worker_index,
5048
counter: 0,
@@ -90,31 +88,17 @@ impl<T:Timestamp+Send> Progcaster<T> {
9088
});
9189
});
9290

93-
for pusher in self.pushers.iter_mut() {
94-
95-
// Attempt to reuse allocations, if possible.
96-
if let Some(tuple) = &mut self.to_push {
97-
tuple.payload.0 = self.source;
98-
tuple.payload.1 = self.counter;
99-
tuple.payload.2.clear();
100-
tuple.payload.2.extend(changes.iter().cloned());
101-
}
102-
// If we don't have an allocation ...
103-
if self.to_push.is_none() {
104-
self.to_push = Some(Bincode::from((
105-
self.source,
106-
self.counter,
107-
changes.clone().into_inner().into_vec(),
108-
)));
109-
}
91+
let payload = (self.source, self.counter, std::mem::take(changes));
92+
let mut to_push = Some(Bincode { payload });
93+
self.pusher.push(&mut to_push);
94+
self.pusher.done();
11095

111-
// TODO: This should probably use a broadcast channel.
112-
pusher.push(&mut self.to_push);
113-
pusher.done();
96+
if let Some(pushed) = to_push {
97+
*changes = pushed.payload.2;
98+
changes.clear();
11499
}
115100

116101
self.counter += 1;
117-
changes.clear();
118102
}
119103
}
120104

@@ -125,7 +109,7 @@ impl<T:Timestamp+Send> Progcaster<T> {
125109

126110
let source = message.0;
127111
let counter = message.1;
128-
let recv_changes = &message.2;
112+
let recv_changes = &mut message.2;
129113

130114
let channel = self.channel_identifier;
131115

timely/src/worker.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,9 @@ pub trait AsWorker : Scheduler {
193193
/// that this behavior will be overridden to be more efficient.
194194
fn pipeline<T: 'static>(&mut self, identifier: usize, address: Rc<[usize]>) -> (ThreadPusher<T>, ThreadPuller<T>);
195195

196+
/// Allocates a broadcast channel, where each pushed message is received by all.
197+
fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>);
198+
196199
/// Allocates a new worker-unique identifier.
197200
fn new_identifier(&mut self) -> usize;
198201
/// The next worker-unique identifier to be allocated.
@@ -242,6 +245,13 @@ impl<A: Allocate> AsWorker for Worker<A> {
242245
self.temp_channel_ids.borrow_mut().push(identifier);
243246
self.allocator.borrow_mut().pipeline(identifier)
244247
}
248+
fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize, address: Rc<[usize]>) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
249+
if address.is_empty() { panic!("Unacceptable address: Length zero"); }
250+
let mut paths = self.paths.borrow_mut();
251+
paths.insert(identifier, address);
252+
self.temp_channel_ids.borrow_mut().push(identifier);
253+
self.allocator.borrow_mut().broadcast(identifier)
254+
}
245255

246256
fn new_identifier(&mut self) -> usize { self.new_identifier() }
247257
fn peek_identifier(&self) -> usize { self.peek_identifier() }

0 commit comments

Comments
 (0)