diff --git a/communication/src/allocator/generic.rs b/communication/src/allocator/generic.rs index 605ade7a1..da6ca85b1 100644 --- a/communication/src/allocator/generic.rs +++ b/communication/src/allocator/generic.rs @@ -55,6 +55,15 @@ impl Generic { Generic::ZeroCopy(z) => z.allocate(identifier), } } + /// Constructs several send endpoints and one receive endpoint. + fn broadcast(&mut self, identifier: usize) -> (Box>, Box>) { + match self { + Generic::Thread(t) => t.broadcast(identifier), + Generic::Process(p) => p.broadcast(identifier), + Generic::ProcessBinary(pb) => pb.broadcast(identifier), + Generic::ZeroCopy(z) => z.broadcast(identifier), + } + } /// Perform work before scheduling operators. fn receive(&mut self) { match self { @@ -89,7 +98,9 @@ impl Allocate for Generic { fn allocate(&mut self, identifier: usize) -> (Vec>>, Box>) { self.allocate(identifier) } - + fn broadcast(&mut self, identifier: usize) -> (Box>, Box>) { + self.broadcast(identifier) + } fn receive(&mut self) { self.receive(); } fn release(&mut self) { self.release(); } fn events(&self) -> &Rc>> { self.events() } diff --git a/communication/src/allocator/zero_copy/allocator.rs b/communication/src/allocator/zero_copy/allocator.rs index 7944abe30..b9470102a 100644 --- a/communication/src/allocator/zero_copy/allocator.rs +++ b/communication/src/allocator/zero_copy/allocator.rs @@ -163,7 +163,8 @@ impl Allocate for TcpAllocator { let header = MessageHeader { channel: identifier, source: self.index, - target: target_index, + target_lower: target_index, + target_upper: target_index + 1, length: 0, seqno: 0, }; @@ -187,6 +188,52 @@ impl Allocate for TcpAllocator { (pushes, puller, ) } + fn broadcast(&mut self, identifier: usize) -> (Box>, Box>) { + + // Assume and enforce in-order identifier allocation. + if let Some(bound) = self.channel_id_bound { + assert!(bound < identifier); + } + self.channel_id_bound = Some(identifier); + + // Result list of boxed pushers. + // One entry for each process. + let mut pushes = Vec::>>::with_capacity(self.sends.len() + 1); + + // Inner exchange allocations. + let inner_peers = self.inner.peers(); + let (inner_send, inner_recv) = self.inner.broadcast(identifier); + + pushes.push(inner_send); + for (mut index, send) in self.sends.iter().enumerate() { + // The span of worker indexes jumps by `inner_peers` as we skip our own process. + // We bump `index` by one as we pass `self.index/inner_peers` to effect this. + if index >= self.index/inner_peers { index += 1; } + let header = MessageHeader { + channel: identifier, + source: self.index, + target_lower: index * inner_peers, + target_upper: index * inner_peers + inner_peers, + length: 0, + seqno: 0, + }; + pushes.push(Box::new(Pusher::new(header, send.clone()))) + } + + let channel = + self.to_local + .entry(identifier) + .or_insert_with(|| Rc::new(RefCell::new(VecDeque::new()))) + .clone(); + + use crate::allocator::counters::Puller as CountPuller; + let canary = Canary::new(identifier, self.canaries.clone()); + let puller = Box::new(CountPuller::new(PullerInner::new(inner_recv, channel, canary), identifier, self.events().clone())); + + let pushes = Box::new(crate::allocator::Broadcaster { spare: None, pushers: pushes }); + (pushes, puller, ) + } + // Perform preparatory work, most likely reading binary buffers from self.recv. #[inline(never)] fn receive(&mut self) { diff --git a/communication/src/allocator/zero_copy/allocator_process.rs b/communication/src/allocator/zero_copy/allocator_process.rs index 9f2f93384..16468fbef 100644 --- a/communication/src/allocator/zero_copy/allocator_process.rs +++ b/communication/src/allocator/zero_copy/allocator_process.rs @@ -135,7 +135,8 @@ impl Allocate for ProcessAllocator { let header = MessageHeader { channel: identifier, source: self.index, - target: target_index, + target_lower: target_index, + target_upper: target_index+1, length: 0, seqno: 0, }; @@ -192,7 +193,7 @@ impl Allocate for ProcessAllocator { // Get the header and payload, ditch the header. let mut peel = bytes.extract_to(header.required_bytes()); - let _ = peel.extract_to(40); + let _ = peel.extract_to(header.header_bytes()); // Increment message count for channel. // Safe to do this even if the channel has been dropped. diff --git a/communication/src/allocator/zero_copy/tcp.rs b/communication/src/allocator/zero_copy/tcp.rs index 0bb89a0ea..9c0d3e655 100644 --- a/communication/src/allocator/zero_copy/tcp.rs +++ b/communication/src/allocator/zero_copy/tcp.rs @@ -94,7 +94,9 @@ where }); if header.length > 0 { - stageds[header.target - worker_offset].push(bytes); + for target in header.target_lower .. header.target_upper { + stageds[target - worker_offset].push(bytes.clone()); + } } else { // Shutting down; confirm absence of subsequent data. @@ -196,7 +198,8 @@ pub fn send_loop( let header = MessageHeader { channel: 0, source: 0, - target: 0, + target_lower: 0, + target_upper: 0, length: 0, seqno: 0, }; diff --git a/communication/src/networking.rs b/communication/src/networking.rs index 754424955..251e34427 100644 --- a/communication/src/networking.rs +++ b/communication/src/networking.rs @@ -29,8 +29,13 @@ pub struct MessageHeader { pub channel: usize, /// index of worker sending message. pub source: usize, - /// index of worker receiving message. - pub target: usize, + /// lower bound of index of worker receiving message. + pub target_lower: usize, + /// upper bound of index of worker receiving message. + /// + /// This is often `self.target_lower + 1` for point to point messages, + /// but can be larger for broadcast messages. + pub target_upper: usize, /// number of bytes in message. pub length: usize, /// sequence number. @@ -40,7 +45,7 @@ pub struct MessageHeader { impl MessageHeader { /// The number of `usize` fields in [MessageHeader]. - const FIELDS: usize = 5; + const FIELDS: usize = 6; /// Returns a header when there is enough supporting data #[inline] @@ -52,9 +57,10 @@ impl MessageHeader { // Order must match writing order. channel: buffer[0] as usize, source: buffer[1] as usize, - target: buffer[2] as usize, - length: buffer[3] as usize, - seqno: buffer[4] as usize, + target_lower: buffer[2] as usize, + target_upper: buffer[3] as usize, + length: buffer[4] as usize, + seqno: buffer[5] as usize, }; if bytes.len() >= header.required_bytes() { @@ -72,7 +78,8 @@ impl MessageHeader { // Order must match reading order. cursor.write_u64::(self.channel as u64)?; cursor.write_u64::(self.source as u64)?; - cursor.write_u64::(self.target as u64)?; + cursor.write_u64::(self.target_lower as u64)?; + cursor.write_u64::(self.target_upper as u64)?; cursor.write_u64::(self.length as u64)?; cursor.write_u64::(self.seqno as u64)?; @@ -82,7 +89,13 @@ impl MessageHeader { /// The number of bytes required for the header and data. #[inline] pub fn required_bytes(&self) -> usize { - std::mem::size_of::() * Self::FIELDS + self.length + self.header_bytes() + self.length + } + + /// The number of bytes required for the header. + #[inline(always)] + pub fn header_bytes(&self) -> usize { + std::mem::size_of::() * Self::FIELDS } }