Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion communication/src/allocator/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ impl Generic {
Generic::ZeroCopy(z) => z.allocate(identifier),
}
}
/// Constructs several send endpoints and one receive endpoint.
fn broadcast<T: Exchangeable+Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
match self {
Generic::Thread(t) => t.broadcast(identifier),
Generic::Process(p) => p.broadcast(identifier),
Generic::ProcessBinary(pb) => pb.broadcast(identifier),
Generic::ZeroCopy(z) => z.broadcast(identifier),
}
}
/// Perform work before scheduling operators.
fn receive(&mut self) {
match self {
Expand Down Expand Up @@ -89,7 +98,9 @@ impl Allocate for Generic {
fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
self.allocate(identifier)
}

fn broadcast<T: Exchangeable+Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
self.broadcast(identifier)
}
fn receive(&mut self) { self.receive(); }
fn release(&mut self) { self.release(); }
fn events(&self) -> &Rc<RefCell<Vec<usize>>> { self.events() }
Expand Down
49 changes: 48 additions & 1 deletion communication/src/allocator/zero_copy/allocator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
let header = MessageHeader {
channel: identifier,
source: self.index,
target: target_index,
target_lower: target_index,
target_upper: target_index + 1,
length: 0,
seqno: 0,
};
Expand All @@ -187,6 +188,52 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
(pushes, puller, )
}

fn broadcast<T: Exchangeable + Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {

// Assume and enforce in-order identifier allocation.
if let Some(bound) = self.channel_id_bound {
assert!(bound < identifier);
}
self.channel_id_bound = Some(identifier);

// Result list of boxed pushers.
// One entry for each process.
let mut pushes = Vec::<Box<dyn Push<T>>>::with_capacity(self.sends.len() + 1);

// Inner exchange allocations.
let inner_peers = self.inner.peers();
let (inner_send, inner_recv) = self.inner.broadcast(identifier);

pushes.push(inner_send);
for (mut index, send) in self.sends.iter().enumerate() {
// The span of worker indexes jumps by `inner_peers` as we skip our own process.
// We bump `index` by one as we pass `self.index/inner_peers` to effect this.
if index >= self.index/inner_peers { index += 1; }
let header = MessageHeader {
channel: identifier,
source: self.index,
target_lower: index * inner_peers,
target_upper: index * inner_peers + inner_peers,
length: 0,
seqno: 0,
};
pushes.push(Box::new(Pusher::new(header, send.clone())))
}

let channel =
self.to_local
.entry(identifier)
.or_insert_with(|| Rc::new(RefCell::new(VecDeque::new())))
.clone();

use crate::allocator::counters::Puller as CountPuller;
let canary = Canary::new(identifier, self.canaries.clone());
let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, self.events().clone()));

let pushes = Box::new(crate::allocator::Broadcaster { spare: None, pushers: pushes });
(pushes, puller, )
}

// Perform preparatory work, most likely reading binary buffers from self.recv.
#[inline(never)]
fn receive(&mut self) {
Expand Down
5 changes: 3 additions & 2 deletions communication/src/allocator/zero_copy/allocator_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ impl Allocate for ProcessAllocator {
let header = MessageHeader {
channel: identifier,
source: self.index,
target: target_index,
target_lower: target_index,
target_upper: target_index+1,
length: 0,
seqno: 0,
};
Expand Down Expand Up @@ -192,7 +193,7 @@ impl Allocate for ProcessAllocator {

// Get the header and payload, ditch the header.
let mut peel = bytes.extract_to(header.required_bytes());
let _ = peel.extract_to(40);
let _ = peel.extract_to(header.header_bytes());

// Increment message count for channel.
// Safe to do this even if the channel has been dropped.
Expand Down
7 changes: 5 additions & 2 deletions communication/src/allocator/zero_copy/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ where
});

if header.length > 0 {
stageds[header.target - worker_offset].push(bytes);
for target in header.target_lower .. header.target_upper {
stageds[target - worker_offset].push(bytes.clone());
}
}
else {
// Shutting down; confirm absence of subsequent data.
Expand Down Expand Up @@ -196,7 +198,8 @@ pub fn send_loop<S: Stream>(
let header = MessageHeader {
channel: 0,
source: 0,
target: 0,
target_lower: 0,
target_upper: 0,
length: 0,
seqno: 0,
};
Expand Down
29 changes: 21 additions & 8 deletions communication/src/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,13 @@ pub struct MessageHeader {
pub channel: usize,
/// index of worker sending message.
pub source: usize,
/// index of worker receiving message.
pub target: usize,
/// lower bound of index of worker receiving message.
pub target_lower: usize,
/// upper bound of index of worker receiving message.
///
/// This is often `self.target_lower + 1` for point to point messages,
/// but can be larger for broadcast messages.
pub target_upper: usize,
/// number of bytes in message.
pub length: usize,
/// sequence number.
Expand All @@ -40,7 +45,7 @@ pub struct MessageHeader {
impl MessageHeader {

/// The number of `usize` fields in [MessageHeader].
const FIELDS: usize = 5;
const FIELDS: usize = 6;

/// Returns a header when there is enough supporting data
#[inline]
Expand All @@ -52,9 +57,10 @@ impl MessageHeader {
// Order must match writing order.
channel: buffer[0] as usize,
source: buffer[1] as usize,
target: buffer[2] as usize,
length: buffer[3] as usize,
seqno: buffer[4] as usize,
target_lower: buffer[2] as usize,
target_upper: buffer[3] as usize,
length: buffer[4] as usize,
seqno: buffer[5] as usize,
};

if bytes.len() >= header.required_bytes() {
Expand All @@ -72,7 +78,8 @@ impl MessageHeader {
// Order must match reading order.
cursor.write_u64::<ByteOrder>(self.channel as u64)?;
cursor.write_u64::<ByteOrder>(self.source as u64)?;
cursor.write_u64::<ByteOrder>(self.target as u64)?;
cursor.write_u64::<ByteOrder>(self.target_lower as u64)?;
cursor.write_u64::<ByteOrder>(self.target_upper as u64)?;
cursor.write_u64::<ByteOrder>(self.length as u64)?;
cursor.write_u64::<ByteOrder>(self.seqno as u64)?;

Expand All @@ -82,7 +89,13 @@ impl MessageHeader {
/// The number of bytes required for the header and data.
#[inline]
pub fn required_bytes(&self) -> usize {
std::mem::size_of::<u64>() * Self::FIELDS + self.length
self.header_bytes() + self.length
}

/// The number of bytes required for the header.
#[inline(always)]
pub fn header_bytes(&self) -> usize {
std::mem::size_of::<u64>() * Self::FIELDS
}
}

Expand Down