Skip to content

Commit 67d8e05

Browse files
authored
Zero-copy for networked processes (#642)
* Zero-copy for networked processes Introduce a intra- and inter-process serializing configuration variant. Previously, it wasn't possible to construct a networked configuration that would use zero-copy for process-local communication. Signed-off-by: Moritz Hoffmann <[email protected]> * Oversight Signed-off-by: Moritz Hoffmann <[email protected]> --------- Signed-off-by: Moritz Hoffmann <[email protected]>
1 parent 3380c92 commit 67d8e05

File tree

6 files changed

+58
-19
lines changed

6 files changed

+58
-19
lines changed

communication/src/allocator/generic.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ pub enum Generic {
2525
ProcessBinary(ProcessAllocator),
2626
/// Inter-process allocator.
2727
ZeroCopy(TcpAllocator<Process>),
28+
/// Inter-process allocator, intra-process serializing allocator.
29+
ZeroCopyBinary(TcpAllocator<ProcessAllocator>),
2830
}
2931

3032
impl Generic {
@@ -35,6 +37,7 @@ impl Generic {
3537
Generic::Process(p) => p.index(),
3638
Generic::ProcessBinary(pb) => pb.index(),
3739
Generic::ZeroCopy(z) => z.index(),
40+
Generic::ZeroCopyBinary(z) => z.index(),
3841
}
3942
}
4043
/// The number of workers.
@@ -44,6 +47,7 @@ impl Generic {
4447
Generic::Process(p) => p.peers(),
4548
Generic::ProcessBinary(pb) => pb.peers(),
4649
Generic::ZeroCopy(z) => z.peers(),
50+
Generic::ZeroCopyBinary(z) => z.peers(),
4751
}
4852
}
4953
/// Constructs several send endpoints and one receive endpoint.
@@ -53,6 +57,7 @@ impl Generic {
5357
Generic::Process(p) => p.allocate(identifier),
5458
Generic::ProcessBinary(pb) => pb.allocate(identifier),
5559
Generic::ZeroCopy(z) => z.allocate(identifier),
60+
Generic::ZeroCopyBinary(z) => z.allocate(identifier),
5661
}
5762
}
5863
/// Constructs several send endpoints and one receive endpoint.
@@ -62,6 +67,7 @@ impl Generic {
6267
Generic::Process(p) => p.broadcast(identifier),
6368
Generic::ProcessBinary(pb) => pb.broadcast(identifier),
6469
Generic::ZeroCopy(z) => z.broadcast(identifier),
70+
Generic::ZeroCopyBinary(z) => z.broadcast(identifier),
6571
}
6672
}
6773
/// Perform work before scheduling operators.
@@ -71,6 +77,7 @@ impl Generic {
7177
Generic::Process(p) => p.receive(),
7278
Generic::ProcessBinary(pb) => pb.receive(),
7379
Generic::ZeroCopy(z) => z.receive(),
80+
Generic::ZeroCopyBinary(z) => z.receive(),
7481
}
7582
}
7683
/// Perform work after scheduling operators.
@@ -80,6 +87,7 @@ impl Generic {
8087
Generic::Process(p) => p.release(),
8188
Generic::ProcessBinary(pb) => pb.release(),
8289
Generic::ZeroCopy(z) => z.release(),
90+
Generic::ZeroCopyBinary(z) => z.release(),
8391
}
8492
}
8593
fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
@@ -88,6 +96,7 @@ impl Generic {
8896
Generic::Process(ref p) => p.events(),
8997
Generic::ProcessBinary(ref pb) => pb.events(),
9098
Generic::ZeroCopy(ref z) => z.events(),
99+
Generic::ZeroCopyBinary(ref z) => z.events(),
91100
}
92101
}
93102
}
@@ -110,6 +119,7 @@ impl Allocate for Generic {
110119
Generic::Process(p) => p.await_events(_duration),
111120
Generic::ProcessBinary(pb) => pb.await_events(_duration),
112121
Generic::ZeroCopy(z) => z.await_events(_duration),
122+
Generic::ZeroCopyBinary(z) => z.await_events(_duration),
113123
}
114124
}
115125
}
@@ -129,6 +139,8 @@ pub enum GenericBuilder {
129139
ProcessBinary(ProcessBuilder),
130140
/// Builder for `ZeroCopy` allocator.
131141
ZeroCopy(TcpBuilder<TypedProcessBuilder>),
142+
/// Builder for `ZeroCopyBinary` allocator.
143+
ZeroCopyBinary(TcpBuilder<ProcessBuilder>),
132144
}
133145

134146
impl AllocateBuilder for GenericBuilder {
@@ -139,6 +151,7 @@ impl AllocateBuilder for GenericBuilder {
139151
GenericBuilder::Process(p) => Generic::Process(p.build()),
140152
GenericBuilder::ProcessBinary(pb) => Generic::ProcessBinary(pb.build()),
141153
GenericBuilder::ZeroCopy(z) => Generic::ZeroCopy(z.build()),
154+
GenericBuilder::ZeroCopyBinary(z) => Generic::ZeroCopyBinary(z.build()),
142155
}
143156
}
144157
}

communication/src/allocator/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,3 +125,11 @@ impl<T: Clone> Push<T> for Broadcaster<T> {
125125
}
126126
}
127127
}
128+
129+
/// A builder for vectors of peers.
130+
pub trait PeerBuilder {
131+
/// The peer type.
132+
type Peer: AllocateBuilder + Sized;
133+
/// Allocate a list of `Self::Peer` of length `peers`.
134+
fn new_vector(peers: usize) -> Vec<Self::Peer>;
135+
}

communication/src/allocator/process.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::collections::{HashMap};
99
use crossbeam_channel::{Sender, Receiver};
1010

1111
use crate::allocator::thread::{ThreadBuilder};
12-
use crate::allocator::{Allocate, AllocateBuilder, Thread};
12+
use crate::allocator::{Allocate, AllocateBuilder, PeerBuilder, Thread};
1313
use crate::{Push, Pull};
1414
use crate::buzzer::Buzzer;
1515

@@ -70,8 +70,12 @@ pub struct Process {
7070
impl Process {
7171
/// Access the wrapped inner allocator.
7272
pub fn inner(&mut self) -> &mut Thread { &mut self.inner }
73+
}
74+
75+
impl PeerBuilder for Process {
76+
type Peer = ProcessBuilder;
7377
/// Allocate a list of connected intra-process allocators.
74-
pub fn new_vector(peers: usize) -> Vec<ProcessBuilder> {
78+
fn new_vector(peers: usize) -> Vec<ProcessBuilder> {
7579

7680
let mut counters_send = Vec::with_capacity(peers);
7781
let mut counters_recv = Vec::with_capacity(peers);

communication/src/allocator/zero_copy/allocator_process.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use timely_bytes::arc::Bytes;
1010
use crate::networking::MessageHeader;
1111

1212
use crate::{Allocate, Push, Pull};
13-
use crate::allocator::{AllocateBuilder, Exchangeable};
13+
use crate::allocator::{AllocateBuilder, Exchangeable, PeerBuilder};
1414
use crate::allocator::canary::Canary;
1515

1616
use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
@@ -30,11 +30,12 @@ pub struct ProcessBuilder {
3030
pullers: Vec<Sender<MergeQueue>>, // for pulling bytes from other workers.
3131
}
3232

33-
impl ProcessBuilder {
33+
impl PeerBuilder for ProcessBuilder {
34+
type Peer = ProcessBuilder;
3435
/// Creates a vector of builders, sharing appropriate state.
3536
///
3637
/// This method requires access to a byte exchanger, from which it mints channels.
37-
pub fn new_vector(count: usize) -> Vec<ProcessBuilder> {
38+
fn new_vector(count: usize) -> Vec<ProcessBuilder> {
3839

3940
// Channels for the exchange of `MergeQueue` endpoints.
4041
let (pullers_vec, pushers_vec) = crate::promise_futures(count, count);
@@ -53,7 +54,9 @@ impl ProcessBuilder {
5354
)
5455
.collect()
5556
}
57+
}
5658

59+
impl ProcessBuilder {
5760
/// Builds a `ProcessAllocator`, instantiating `Rc<RefCell<_>>` elements.
5861
pub fn build(self) -> ProcessAllocator {
5962

communication/src/allocator/zero_copy/initialize.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
33
use std::sync::Arc;
44
use timely_logging::Logger;
5-
use crate::allocator::process::ProcessBuilder;
5+
use crate::allocator::PeerBuilder;
66
use crate::logging::CommunicationEventBuilder;
77
use crate::networking::create_sockets;
88
use super::tcp::{send_loop, recv_loop};
@@ -34,17 +34,17 @@ impl Drop for CommsGuard {
3434
use crate::logging::CommunicationSetup;
3535

3636
/// Initializes network connections
37-
pub fn initialize_networking(
37+
pub fn initialize_networking<P: PeerBuilder>(
3838
addresses: Vec<String>,
3939
my_index: usize,
4040
threads: usize,
4141
noisy: bool,
4242
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
4343
)
44-
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
44+
-> ::std::io::Result<(Vec<TcpBuilder<P::Peer>>, CommsGuard)>
4545
{
4646
let sockets = create_sockets(addresses, my_index, noisy)?;
47-
initialize_networking_from_sockets(sockets, my_index, threads, log_sender)
47+
initialize_networking_from_sockets::<_, P>(sockets, my_index, threads, log_sender)
4848
}
4949

5050
/// Initialize send and recv threads from sockets.
@@ -54,13 +54,13 @@ pub fn initialize_networking(
5454
///
5555
/// It is important that the `sockets` argument contain sockets for each remote process, in order, and
5656
/// with position `my_index` set to `None`.
57-
pub fn initialize_networking_from_sockets<S: Stream + 'static>(
57+
pub fn initialize_networking_from_sockets<S: Stream + 'static, P: PeerBuilder>(
5858
mut sockets: Vec<Option<S>>,
5959
my_index: usize,
6060
threads: usize,
6161
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
6262
)
63-
-> ::std::io::Result<(Vec<TcpBuilder<ProcessBuilder>>, CommsGuard)>
63+
-> ::std::io::Result<(Vec<TcpBuilder<P::Peer>>, CommsGuard)>
6464
{
6565
// Sockets are expected to be blocking,
6666
for socket in sockets.iter_mut().flatten() {
@@ -69,7 +69,7 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static>(
6969

7070
let processes = sockets.len();
7171

72-
let process_allocators = crate::allocator::process::Process::new_vector(threads);
72+
let process_allocators = P::new_vector(threads);
7373
let (builders, promises, futures) = new_vector(process_allocators, my_index, processes);
7474

7575
let mut promises_iter = promises.into_iter();

communication/src/initialize.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use getopts;
1212
use timely_logging::Logger;
1313

1414
use crate::allocator::thread::ThreadBuilder;
15-
use crate::allocator::{AllocateBuilder, Process, Generic, GenericBuilder};
15+
use crate::allocator::{AllocateBuilder, Process, Generic, GenericBuilder, PeerBuilder};
1616
use crate::allocator::zero_copy::allocator_process::ProcessBuilder;
1717
use crate::allocator::zero_copy::initialize::initialize_networking;
1818
use crate::logging::{CommunicationEventBuilder, CommunicationSetup};
@@ -36,6 +36,8 @@ pub enum Config {
3636
addresses: Vec<String>,
3737
/// Verbosely report connection process
3838
report: bool,
39+
/// Enable intra-process zero-copy
40+
zerocopy: bool,
3941
/// Closure to create a new logger for a communication thread
4042
log_fn: Arc<dyn Fn(CommunicationSetup) -> Option<Logger<CommunicationEventBuilder>> + Send + Sync>,
4143
}
@@ -47,14 +49,14 @@ impl Debug for Config {
4749
Config::Thread => write!(f, "Config::Thread()"),
4850
Config::Process(n) => write!(f, "Config::Process({})", n),
4951
Config::ProcessBinary(n) => write!(f, "Config::ProcessBinary({})", n),
50-
Config::Cluster { threads, process, addresses, report, .. } => f
52+
Config::Cluster { threads, process, addresses, report, zerocopy, log_fn: _ } => f
5153
.debug_struct("Config::Cluster")
5254
.field("threads", threads)
5355
.field("process", process)
5456
.field("addresses", addresses)
5557
.field("report", report)
56-
// TODO: Use `.finish_non_exhaustive()` after rust/#67364 lands
57-
.finish()
58+
.field("zerocopy", zerocopy)
59+
.finish_non_exhaustive()
5860
}
5961
}
6062
}
@@ -113,12 +115,13 @@ impl Config {
113115
}
114116
}
115117

116-
assert!(processes == addresses.len());
118+
assert_eq!(processes, addresses.len());
117119
Ok(Config::Cluster {
118120
threads,
119121
process,
120122
addresses,
121123
report,
124+
zerocopy,
122125
log_fn: Arc::new(|_| None),
123126
})
124127
} else if threads > 1 {
@@ -158,14 +161,22 @@ impl Config {
158161
Config::ProcessBinary(threads) => {
159162
Ok((ProcessBuilder::new_vector(threads).into_iter().map(GenericBuilder::ProcessBinary).collect(), Box::new(())))
160163
},
161-
Config::Cluster { threads, process, addresses, report, log_fn } => {
162-
match initialize_networking(addresses, process, threads, report, log_fn) {
164+
Config::Cluster { threads, process, addresses, report, zerocopy: false, log_fn } => {
165+
match initialize_networking::<Process>(addresses, process, threads, report, log_fn) {
163166
Ok((stuff, guard)) => {
164167
Ok((stuff.into_iter().map(GenericBuilder::ZeroCopy).collect(), Box::new(guard)))
165168
},
166169
Err(err) => Err(format!("failed to initialize networking: {}", err))
167170
}
168171
},
172+
Config::Cluster { threads, process, addresses, report, zerocopy: true, log_fn } => {
173+
match initialize_networking::<ProcessBuilder>(addresses, process, threads, report, log_fn) {
174+
Ok((stuff, guard)) => {
175+
Ok((stuff.into_iter().map(GenericBuilder::ZeroCopyBinary).collect(), Box::new(guard)))
176+
},
177+
Err(err) => Err(format!("failed to initialize networking: {}", err))
178+
}
179+
}
169180
}
170181
}
171182
}

0 commit comments

Comments
 (0)