Skip to content

Commit 771df11

Browse files
committed
Broadcast channels for remote processes
1 parent 38ef688 commit 771df11

File tree

4 files changed

+67
-11
lines changed

4 files changed

+67
-11
lines changed

communication/src/allocator/zero_copy/allocator.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
163163
let header = MessageHeader {
164164
channel: identifier,
165165
source: self.index,
166-
target: target_index,
166+
target_lower: target_index,
167+
target_upper: target_index + 1,
167168
length: 0,
168169
seqno: 0,
169170
};
@@ -187,6 +188,50 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
187188
(pushes, puller, )
188189
}
189190

191+
fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
192+
193+
// Assume and enforce in-order identifier allocation.
194+
if let Some(bound) = self.channel_id_bound {
195+
assert!(bound < identifier);
196+
}
197+
self.channel_id_bound = Some(identifier);
198+
199+
// Result list of boxed pushers.
200+
// One entry for each process.
201+
let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.sends.len() + 1);
202+
203+
// Inner exchange allocations.
204+
let inner_peers = self.inner.peers();
205+
let (inner_send, inner_recv) = self.inner.broadcast(identifier);
206+
207+
pushes.push(inner_send);
208+
for (mut index, send) in self.sends.iter().enumerate() {
209+
if index >= self.index/inner_peers { index += 1; }
210+
let header = MessageHeader {
211+
channel: identifier,
212+
source: self.index,
213+
target_lower: index * inner_peers,
214+
target_upper: index * inner_peers + inner_peers,
215+
length: 0,
216+
seqno: 0,
217+
};
218+
pushes.push(Box::new(Pusher::new(header, send.clone())))
219+
}
220+
221+
let channel =
222+
self.to_local
223+
.entry(identifier)
224+
.or_insert_with(|| Rc::new(RefCell::new(VecDeque::new())))
225+
.clone();
226+
227+
use crate::allocator::counters::Puller as CountPuller;
228+
let canary = Canary::new(identifier, self.canaries.clone());
229+
let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, self.events().clone()));
230+
231+
let pushes = Box::new(crate::allocator::Broadcaster { spare: None, pushers: pushes });
232+
(pushes, puller, )
233+
}
234+
190235
// Perform preparatory work, most likely reading binary buffers from self.recv.
191236
#[inline(never)]
192237
fn receive(&mut self) {

communication/src/allocator/zero_copy/allocator_process.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ impl Allocate for ProcessAllocator {
135135
let header = MessageHeader {
136136
channel: identifier,
137137
source: self.index,
138-
target: target_index,
138+
target_lower: target_index,
139+
target_upper: target_index+1,
139140
length: 0,
140141
seqno: 0,
141142
};

communication/src/allocator/zero_copy/tcp.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,9 @@ where
9494
});
9595

9696
if header.length > 0 {
97-
stageds[header.target - worker_offset].push(bytes);
97+
for target in header.target_lower .. header.target_upper {
98+
stageds[target - worker_offset].push(bytes.clone());
99+
}
98100
}
99101
else {
100102
// Shutting down; confirm absence of subsequent data.
@@ -196,7 +198,8 @@ pub fn send_loop<S: Stream>(
196198
let header = MessageHeader {
197199
channel: 0,
198200
source: 0,
199-
target: 0,
201+
target_lower: 0,
202+
target_upper: 0,
200203
length: 0,
201204
seqno: 0,
202205
};

communication/src/networking.rs

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,13 @@ pub struct MessageHeader {
2929
pub channel: usize,
3030
/// index of worker sending message.
3131
pub source: usize,
32-
/// index of worker receiving message.
33-
pub target: usize,
32+
/// lower bound of index of worker receiving message.
33+
pub target_lower: usize,
34+
/// upper bound of index of worker receiving message.
35+
///
36+
/// This is often `self.target_lower + 1` for point to point messages,
37+
/// but can be larger for broadcast messages.
38+
pub target_upper: usize,
3439
/// number of bytes in message.
3540
pub length: usize,
3641
/// sequence number.
@@ -40,7 +45,7 @@ pub struct MessageHeader {
4045
impl MessageHeader {
4146

4247
/// The number of `usize` fields in [MessageHeader].
43-
const FIELDS: usize = 5;
48+
const FIELDS: usize = 6;
4449

4550
/// Returns a header when there is enough supporting data
4651
#[inline]
@@ -52,9 +57,10 @@ impl MessageHeader {
5257
// Order must match writing order.
5358
channel: buffer[0] as usize,
5459
source: buffer[1] as usize,
55-
target: buffer[2] as usize,
56-
length: buffer[3] as usize,
57-
seqno: buffer[4] as usize,
60+
target_lower: buffer[2] as usize,
61+
target_upper: buffer[3] as usize,
62+
length: buffer[4] as usize,
63+
seqno: buffer[5] as usize,
5864
};
5965

6066
if bytes.len() >= header.required_bytes() {
@@ -72,7 +78,8 @@ impl MessageHeader {
7278
// Order must match reading order.
7379
cursor.write_u64::<ByteOrder>(self.channel as u64)?;
7480
cursor.write_u64::<ByteOrder>(self.source as u64)?;
75-
cursor.write_u64::<ByteOrder>(self.target as u64)?;
81+
cursor.write_u64::<ByteOrder>(self.target_lower as u64)?;
82+
cursor.write_u64::<ByteOrder>(self.target_upper as u64)?;
7683
cursor.write_u64::<ByteOrder>(self.length as u64)?;
7784
cursor.write_u64::<ByteOrder>(self.seqno as u64)?;
7885

0 commit comments

Comments
 (0)