Skip to content

Commit 839d02d

Browse files
Demonstrate BytesSlab with abstracted bytes storage (#641)
* Demonstrate BytesSlab with abstracted bytes storage * lgalloc example Signed-off-by: Moritz Hoffmann <[email protected]> * Fix lgalloc integration Signed-off-by: Moritz Hoffmann <[email protected]> * Only build lgalloc example on supported os Signed-off-by: Moritz Hoffmann <[email protected]> * Only depend on lgalloc on supported os Signed-off-by: Moritz Hoffmann <[email protected]> --------- Signed-off-by: Moritz Hoffmann <[email protected]> Co-authored-by: Moritz Hoffmann <[email protected]>
1 parent 67d8e05 commit 839d02d

File tree

11 files changed

+186
-28
lines changed

11 files changed

+186
-28
lines changed

communication/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,7 @@ timely_bytes = { path = "../bytes", version = "0.13" }
2525
timely_container = { path = "../container", version = "0.14.0" }
2626
timely_logging = { path = "../logging", version = "0.13" }
2727
crossbeam-channel = "0.5"
28+
29+
# lgalloc only supports Linux and macOS; do not depend on it on any other OS.
30+
[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
31+
lgalloc = "0.4"

communication/examples/lgalloc.rs

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
#![cfg(any(target_os = "linux", target_os = "macos"))]
2+
3+
use std::ops::{Deref, DerefMut};
4+
use std::ptr::NonNull;
5+
use timely_communication::{Allocate, Bytesable};
6+
7+
/// A wrapper that indicates the serialization/deserialization strategy.
8+
pub struct Message {
9+
/// Text contents.
10+
pub payload: String,
11+
}
12+
13+
impl Bytesable for Message {
14+
fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
15+
Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
16+
}
17+
18+
fn length_in_bytes(&self) -> usize {
19+
self.payload.len()
20+
}
21+
22+
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
23+
writer.write_all(self.payload.as_bytes()).unwrap();
24+
}
25+
}
26+
27+
fn lgalloc_refill(size: usize) -> Box<LgallocHandle> {
28+
let (pointer, capacity, handle) = lgalloc::allocate::<u8>(size).unwrap();
29+
let handle = Some(handle);
30+
Box::new(LgallocHandle { handle, pointer, capacity })
31+
}
32+
33+
struct LgallocHandle {
34+
handle: Option<lgalloc::Handle>,
35+
pointer: NonNull<u8>,
36+
capacity: usize,
37+
}
38+
39+
impl Deref for LgallocHandle {
40+
type Target = [u8];
41+
#[inline(always)]
42+
fn deref(&self) -> &Self::Target {
43+
unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) }
44+
}
45+
}
46+
47+
impl DerefMut for LgallocHandle {
48+
#[inline(always)]
49+
fn deref_mut(&mut self) -> &mut Self::Target {
50+
unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) }
51+
}
52+
}
53+
54+
impl Drop for LgallocHandle {
55+
fn drop(&mut self) {
56+
lgalloc::deallocate(self.handle.take().unwrap());
57+
}
58+
}
59+
60+
fn main() {
61+
let mut config = lgalloc::LgAlloc::new();
62+
config.enable().with_path(std::env::temp_dir());
63+
lgalloc::lgalloc_set_config(&config);
64+
65+
let refill = std::sync::Arc::new(|size| lgalloc_refill(size) as Box<dyn DerefMut<Target=[u8]>>);
66+
67+
// use a fixed configuration (`ProcessBinary(4)`) rather than user-supplied arguments, and initialize the computation.
68+
let config = timely_communication::Config::ProcessBinary(4);
69+
let (allocators, others) = config.try_build_with(refill).unwrap();
70+
let guards = timely_communication::initialize_from(allocators, others, |mut allocator| {
71+
72+
println!("worker {} of {} started", allocator.index(), allocator.peers());
73+
74+
// allocates a list of senders and one receiver.
75+
let (mut senders, mut receiver) = allocator.allocate(0);
76+
77+
// send typed data along each channel
78+
for i in 0 .. allocator.peers() {
79+
senders[i].send(Message { payload: format!("hello, {}", i)});
80+
senders[i].done();
81+
}
82+
83+
// no support for termination notification,
84+
// we have to count down ourselves.
85+
let mut received = 0;
86+
while received < allocator.peers() {
87+
88+
allocator.receive();
89+
90+
if let Some(message) = receiver.recv() {
91+
println!("worker {}: received: <{}>", allocator.index(), message.payload);
92+
received += 1;
93+
}
94+
95+
allocator.release();
96+
}
97+
98+
allocator.index()
99+
});
100+
101+
// computation runs until guards are joined or dropped.
102+
if let Ok(guards) = guards {
103+
for guard in guards.join() {
104+
println!("result: {:?}", guard);
105+
}
106+
}
107+
else { println!("error in computation"); }
108+
}

communication/src/allocator/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,10 +126,12 @@ impl<T: Clone> Push<T> for Broadcaster<T> {
126126
}
127127
}
128128

129+
use crate::allocator::zero_copy::bytes_slab::BytesRefill;
130+
129131
/// A builder for vectors of peers.
130132
pub trait PeerBuilder {
131133
/// The peer type.
132134
type Peer: AllocateBuilder + Sized;
133135
/// Allocate a list of `Self::Peer` of length `peers`.
134-
fn new_vector(peers: usize) -> Vec<Self::Peer>;
136+
fn new_vector(peers: usize, refill: BytesRefill) -> Vec<Self::Peer>;
135137
}

communication/src/allocator/process.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ impl Process {
7575
impl PeerBuilder for Process {
7676
type Peer = ProcessBuilder;
7777
/// Allocate a list of connected intra-process allocators.
78-
fn new_vector(peers: usize) -> Vec<ProcessBuilder> {
78+
fn new_vector(peers: usize, _refill: crate::allocator::BytesRefill) -> Vec<ProcessBuilder> {
7979

8080
let mut counters_send = Vec::with_capacity(peers);
8181
let mut counters_recv = Vec::with_capacity(peers);

communication/src/allocator/zero_copy/allocator.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::networking::MessageHeader;
1111
use crate::{Allocate, Push, Pull};
1212
use crate::allocator::{AllocateBuilder, Exchangeable};
1313
use crate::allocator::canary::Canary;
14-
14+
use crate::allocator::zero_copy::bytes_slab::BytesRefill;
1515
use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
1616
use super::push_pull::{Pusher, PullerInner};
1717

@@ -27,6 +27,8 @@ pub struct TcpBuilder<A: AllocateBuilder> {
2727
peers: usize, // number of peer allocators.
2828
futures: Vec<Receiver<MergeQueue>>, // to receive queues to each network thread.
2929
promises: Vec<Sender<MergeQueue>>, // to send queues from each network thread.
30+
/// Byte slab refill function.
31+
refill: BytesRefill,
3032
}
3133

3234
/// Creates a vector of builders, sharing appropriate state.
@@ -44,8 +46,9 @@ pub struct TcpBuilder<A: AllocateBuilder> {
4446
pub fn new_vector<A: AllocateBuilder>(
4547
allocators: Vec<A>,
4648
my_process: usize,
47-
processes: usize)
48-
-> (Vec<TcpBuilder<A>>,
49+
processes: usize,
50+
refill: BytesRefill,
51+
) -> (Vec<TcpBuilder<A>>,
4952
Vec<Vec<Sender<MergeQueue>>>,
5053
Vec<Vec<Receiver<MergeQueue>>>)
5154
{
@@ -68,6 +71,7 @@ pub fn new_vector<A: AllocateBuilder>(
6871
peers: threads * processes,
6972
promises,
7073
futures,
74+
refill: refill.clone(),
7175
}})
7276
.collect();
7377

@@ -92,7 +96,7 @@ impl<A: AllocateBuilder> TcpBuilder<A> {
9296
let mut sends = Vec::with_capacity(self.peers);
9397
for pusher in self.futures.into_iter() {
9498
let queue = pusher.recv().expect("Failed to receive push queue");
95-
let sendpoint = SendEndpoint::new(queue);
99+
let sendpoint = SendEndpoint::new(queue, self.refill.clone());
96100
sends.push(Rc::new(RefCell::new(sendpoint)));
97101
}
98102

communication/src/allocator/zero_copy/allocator_process.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::networking::MessageHeader;
1212
use crate::{Allocate, Push, Pull};
1313
use crate::allocator::{AllocateBuilder, Exchangeable, PeerBuilder};
1414
use crate::allocator::canary::Canary;
15-
15+
use crate::allocator::zero_copy::bytes_slab::BytesRefill;
1616
use super::bytes_exchange::{BytesPull, SendEndpoint, MergeQueue};
1717

1818
use super::push_pull::{Pusher, Puller};
@@ -28,14 +28,15 @@ pub struct ProcessBuilder {
2828
peers: usize, // number of peer allocators.
2929
pushers: Vec<Receiver<MergeQueue>>, // for pushing bytes at other workers.
3030
pullers: Vec<Sender<MergeQueue>>, // for pulling bytes from other workers.
31+
refill: BytesRefill,
3132
}
3233

3334
impl PeerBuilder for ProcessBuilder {
3435
type Peer = ProcessBuilder;
3536
/// Creates a vector of builders, sharing appropriate state.
3637
///
3738
/// This method requires access to a byte exchanger, from which it mints channels.
38-
fn new_vector(count: usize) -> Vec<ProcessBuilder> {
39+
fn new_vector(count: usize, refill: BytesRefill) -> Vec<ProcessBuilder> {
3940

4041
// Channels for the exchange of `MergeQueue` endpoints.
4142
let (pullers_vec, pushers_vec) = crate::promise_futures(count, count);
@@ -50,6 +51,7 @@ impl PeerBuilder for ProcessBuilder {
5051
peers: count,
5152
pushers,
5253
pullers,
54+
refill: refill.clone(),
5355
}
5456
)
5557
.collect()
@@ -73,7 +75,7 @@ impl ProcessBuilder {
7375
let mut sends = Vec::with_capacity(self.peers);
7476
for pusher in self.pushers.into_iter() {
7577
let queue = pusher.recv().expect("Failed to receive MergeQueue");
76-
let sendpoint = SendEndpoint::new(queue);
78+
let sendpoint = SendEndpoint::new(queue, self.refill.clone());
7779
sends.push(Rc::new(RefCell::new(sendpoint)));
7880
}
7981

communication/src/allocator/zero_copy/bytes_exchange.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::sync::{Arc, Mutex};
44
use std::collections::VecDeque;
55

66
use timely_bytes::arc::Bytes;
7-
use super::bytes_slab::BytesSlab;
7+
use super::bytes_slab::{BytesRefill, BytesSlab};
88

99
/// A target for `Bytes`.
1010
pub trait BytesPush {
@@ -142,10 +142,10 @@ impl<P: BytesPush> SendEndpoint<P> {
142142
}
143143

144144
/// Allocates a new `SendEndpoint` from a shared queue.
145-
pub fn new(queue: P) -> Self {
145+
pub fn new(queue: P, refill: BytesRefill) -> Self {
146146
SendEndpoint {
147147
send: queue,
148-
buffer: BytesSlab::new(20),
148+
buffer: BytesSlab::new(20, refill),
149149
}
150150
}
151151
/// Makes the next `bytes` bytes valid.

communication/src/allocator/zero_copy/bytes_slab.rs

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
//! A large binary allocation for writing and sharing.
22
3+
use std::ops::{Deref, DerefMut};
34
use timely_bytes::arc::{Bytes, BytesMut};
45

56
/// A large binary allocation for writing and sharing.
@@ -13,17 +14,22 @@ pub struct BytesSlab {
1314
stash: Vec<BytesMut>, // reclaimed and reusable buffers.
1415
shift: usize, // current buffer allocation size.
1516
valid: usize, // buffer[..valid] are valid bytes.
17+
new_bytes: BytesRefill, // function to allocate new buffers.
1618
}
1719

20+
/// A function that, given a size in bytes, allocates a new buffer of at least that many bytes.
21+
pub type BytesRefill = std::sync::Arc<dyn Fn(usize) -> Box<dyn DerefMut<Target=[u8]>>+Send+Sync>;
22+
1823
impl BytesSlab {
1924
/// Allocates a new `BytesSlab` with an initial size determined by a shift.
20-
pub fn new(shift: usize) -> Self {
25+
pub fn new(shift: usize, new_bytes: BytesRefill) -> Self {
2126
BytesSlab {
22-
buffer: BytesMut::from(vec![0u8; 1 << shift].into_boxed_slice()),
27+
buffer: BytesMut::from(BoxDerefMut { boxed: new_bytes(1 << shift) }),
2328
in_progress: Vec::new(),
2429
stash: Vec::new(),
2530
shift,
2631
valid: 0,
32+
new_bytes,
2733
}
2834
}
2935
/// The empty region of the slab.
@@ -68,7 +74,7 @@ impl BytesSlab {
6874
if self.stash.is_empty() {
6975
for shared in self.in_progress.iter_mut() {
7076
if let Some(mut bytes) = shared.take() {
71-
if bytes.try_regenerate::<Box<[u8]>>() {
77+
if bytes.try_regenerate::<BoxDerefMut>() {
7278
// NOTE: Test should be redundant, but better safe...
7379
if bytes.len() == (1 << self.shift) {
7480
self.stash.push(bytes);
@@ -82,7 +88,7 @@ impl BytesSlab {
8288
self.in_progress.retain(|x| x.is_some());
8389
}
8490

85-
let new_buffer = self.stash.pop().unwrap_or_else(|| BytesMut::from(vec![0; 1 << self.shift].into_boxed_slice()));
91+
let new_buffer = self.stash.pop().unwrap_or_else(|| BytesMut::from(BoxDerefMut { boxed: (self.new_bytes)(1 << self.shift) }));
8692
let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer);
8793

8894
self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]);
@@ -92,3 +98,21 @@ impl BytesSlab {
9298
}
9399
}
94100
}
101+
102+
/// A wrapper for `Box<dyn DerefMut<Target=[u8]>>` that dereferences to `[u8]` rather than `dyn DerefMut<Target=[u8]>`.
103+
struct BoxDerefMut {
104+
boxed: Box<dyn DerefMut<Target=[u8]>+'static>,
105+
}
106+
107+
impl Deref for BoxDerefMut {
108+
type Target = [u8];
109+
fn deref(&self) -> &Self::Target {
110+
&self.boxed[..]
111+
}
112+
}
113+
114+
impl DerefMut for BoxDerefMut {
115+
fn deref_mut(&mut self) -> &mut Self::Target {
116+
&mut self.boxed[..]
117+
}
118+
}

communication/src/allocator/zero_copy/initialize.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use std::sync::Arc;
44
use timely_logging::Logger;
55
use crate::allocator::PeerBuilder;
6+
use crate::allocator::zero_copy::bytes_slab::BytesRefill;
67
use crate::logging::CommunicationEventBuilder;
78
use crate::networking::create_sockets;
89
use super::tcp::{send_loop, recv_loop};
@@ -39,12 +40,13 @@ pub fn initialize_networking<P: PeerBuilder>(
3940
my_index: usize,
4041
threads: usize,
4142
noisy: bool,
43+
refill: BytesRefill,
4244
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
4345
)
4446
-> ::std::io::Result<(Vec<TcpBuilder<P::Peer>>, CommsGuard)>
4547
{
4648
let sockets = create_sockets(addresses, my_index, noisy)?;
47-
initialize_networking_from_sockets::<_, P>(sockets, my_index, threads, log_sender)
49+
initialize_networking_from_sockets::<_, P>(sockets, my_index, threads, refill, log_sender)
4850
}
4951

5052
/// Initialize send and recv threads from sockets.
@@ -58,6 +60,7 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static, P: PeerBuilder>(
5860
mut sockets: Vec<Option<S>>,
5961
my_index: usize,
6062
threads: usize,
63+
refill: BytesRefill,
6164
log_sender: Arc<dyn Fn(CommunicationSetup)->Option<Logger<CommunicationEventBuilder>>+Send+Sync>,
6265
)
6366
-> ::std::io::Result<(Vec<TcpBuilder<P::Peer>>, CommsGuard)>
@@ -69,14 +72,15 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static, P: PeerBuilder>(
6972

7073
let processes = sockets.len();
7174

72-
let process_allocators = P::new_vector(threads);
73-
let (builders, promises, futures) = new_vector(process_allocators, my_index, processes);
75+
let process_allocators = P::new_vector(threads, refill.clone());
76+
let (builders, promises, futures) = new_vector(process_allocators, my_index, processes, refill.clone());
7477

7578
let mut promises_iter = promises.into_iter();
7679
let mut futures_iter = futures.into_iter();
7780

7881
let mut send_guards = Vec::with_capacity(sockets.len());
7982
let mut recv_guards = Vec::with_capacity(sockets.len());
83+
let refill = refill.clone();
8084

8185
// for each process, if a stream exists (i.e. not local) ...
8286
for (index, stream) in sockets.into_iter().enumerate().filter_map(|(i, s)| s.map(|s| (i, s))) {
@@ -108,6 +112,7 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static, P: PeerBuilder>(
108112
// let remote_sends = remote_sends.clone();
109113
let log_sender = log_sender.clone();
110114
let stream = stream.try_clone()?;
115+
let refill = refill.clone();
111116
let join_guard =
112117
::std::thread::Builder::new()
113118
.name(format!("timely:recv-{}", index))
@@ -117,7 +122,7 @@ pub fn initialize_networking_from_sockets<S: Stream + 'static, P: PeerBuilder>(
117122
sender: false,
118123
remote: Some(index),
119124
});
120-
recv_loop(stream, remote_send, threads * my_index, my_index, index, logger);
125+
recv_loop(stream, remote_send, threads * my_index, my_index, index, refill, logger);
121126
})?;
122127

123128
recv_guards.push(join_guard);

0 commit comments

Comments
 (0)