|
1 | | -#![cfg(any(target_os = "linux", target_os = "macos"))] |
| 1 | +//! Test using lgalloc as the allocator for zero-copy data transfer. |
| 2 | +//! Note that this example only works on Linux and MacOS. |
| 3 | +
|
| 4 | +#[cfg(any(target_os = "linux", target_os = "macos"))] |
| 5 | +mod example { |
| 6 | + use std::ops::{Deref, DerefMut}; |
| 7 | + use std::ptr::NonNull; |
| 8 | + use timely_communication::{Allocate, Bytesable}; |
| 9 | + use timely_communication::allocator::zero_copy::bytes_slab::BytesRefill; |
| 10 | + |
| 11 | + /// A wrapper that indicates the serialization/deserialization strategy. |
| 12 | + pub struct Message { |
| 13 | + /// Text contents. |
| 14 | + pub payload: String, |
| 15 | + } |
2 | 16 |
|
3 | | -use std::ops::{Deref, DerefMut}; |
4 | | -use std::ptr::NonNull; |
5 | | -use timely_communication::{Allocate, Bytesable}; |
6 | | -use timely_communication::allocator::zero_copy::bytes_slab::BytesRefill; |
| 17 | + impl Bytesable for Message { |
| 18 | + fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self { |
| 19 | + Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() } |
| 20 | + } |
7 | 21 |
|
8 | | -/// A wrapper that indicates the serialization/deserialization strategy. |
9 | | -pub struct Message { |
10 | | - /// Text contents. |
11 | | - pub payload: String, |
12 | | -} |
| 22 | + fn length_in_bytes(&self) -> usize { |
| 23 | + self.payload.len() |
| 24 | + } |
13 | 25 |
|
14 | | -impl Bytesable for Message { |
15 | | - fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self { |
16 | | - Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() } |
| 26 | + fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) { |
| 27 | + writer.write_all(self.payload.as_bytes()).unwrap(); |
| 28 | + } |
17 | 29 | } |
18 | 30 |
|
19 | | - fn length_in_bytes(&self) -> usize { |
20 | | - self.payload.len() |
| 31 | + fn lgalloc_refill(size: usize) -> Box<LgallocHandle> { |
| 32 | + let (pointer, capacity, handle) = lgalloc::allocate::<u8>(size).unwrap(); |
| 33 | + let handle = Some(handle); |
| 34 | + Box::new(LgallocHandle { handle, pointer, capacity }) |
21 | 35 | } |
22 | 36 |
|
23 | | - fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) { |
24 | | - writer.write_all(self.payload.as_bytes()).unwrap(); |
| 37 | + struct LgallocHandle { |
| 38 | + handle: Option<lgalloc::Handle>, |
| 39 | + pointer: NonNull<u8>, |
| 40 | + capacity: usize, |
25 | 41 | } |
26 | | -} |
27 | 42 |
|
28 | | -fn lgalloc_refill(size: usize) -> Box<LgallocHandle> { |
29 | | - let (pointer, capacity, handle) = lgalloc::allocate::<u8>(size).unwrap(); |
30 | | - let handle = Some(handle); |
31 | | - Box::new(LgallocHandle { handle, pointer, capacity }) |
32 | | -} |
33 | | - |
34 | | -struct LgallocHandle { |
35 | | - handle: Option<lgalloc::Handle>, |
36 | | - pointer: NonNull<u8>, |
37 | | - capacity: usize, |
38 | | -} |
39 | | - |
40 | | -impl Deref for LgallocHandle { |
41 | | - type Target = [u8]; |
42 | | - #[inline(always)] |
43 | | - fn deref(&self) -> &Self::Target { |
44 | | - unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) } |
| 43 | + impl Deref for LgallocHandle { |
| 44 | + type Target = [u8]; |
| 45 | + #[inline(always)] |
| 46 | + fn deref(&self) -> &Self::Target { |
| 47 | + unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) } |
| 48 | + } |
45 | 49 | } |
46 | | -} |
47 | 50 |
|
48 | | -impl DerefMut for LgallocHandle { |
49 | | - #[inline(always)] |
50 | | - fn deref_mut(&mut self) -> &mut Self::Target { |
51 | | - unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) } |
| 51 | + impl DerefMut for LgallocHandle { |
| 52 | + #[inline(always)] |
| 53 | + fn deref_mut(&mut self) -> &mut Self::Target { |
| 54 | + unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) } |
| 55 | + } |
52 | 56 | } |
53 | | -} |
54 | 57 |
|
55 | | -impl Drop for LgallocHandle { |
56 | | - fn drop(&mut self) { |
57 | | - lgalloc::deallocate(self.handle.take().unwrap()); |
| 58 | + impl Drop for LgallocHandle { |
| 59 | + fn drop(&mut self) { |
| 60 | + lgalloc::deallocate(self.handle.take().unwrap()); |
| 61 | + } |
58 | 62 | } |
59 | | -} |
60 | 63 |
|
61 | | -fn main() { |
62 | | - let mut config = lgalloc::LgAlloc::new(); |
63 | | - config.enable().with_path(std::env::temp_dir()); |
64 | | - lgalloc::lgalloc_set_config(&config); |
| 64 | + pub(crate) fn main() { |
| 65 | + let mut lgconfig = lgalloc::LgAlloc::new(); |
| 66 | + lgconfig.enable().with_path(std::env::temp_dir()); |
| 67 | + lgalloc::lgalloc_set_config(&lgconfig); |
65 | 68 |
|
66 | | - let refill = BytesRefill { |
67 | | - logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box<dyn DerefMut<Target=[u8]>>), |
68 | | - limit: None, |
69 | | - }; |
| 69 | + let refill = BytesRefill { |
| 70 | + logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box<dyn DerefMut<Target=[u8]>>), |
| 71 | + limit: None, |
| 72 | + }; |
70 | 73 |
|
71 | | - // extract the configuration from user-supplied arguments, initialize the computation. |
72 | | - let config = timely_communication::Config::ProcessBinary(4); |
73 | | - let (allocators, others) = config.try_build_with(refill).unwrap(); |
74 | | - let guards = timely_communication::initialize_from(allocators, others, |mut allocator| { |
| 74 | + // extract the configuration from user-supplied arguments, initialize the computation. |
| 75 | + let config = timely_communication::Config::ProcessBinary(4); |
| 76 | + let (allocators, others) = config.try_build_with(refill).unwrap(); |
| 77 | + let guards = timely_communication::initialize_from(allocators, others, |mut allocator| { |
75 | 78 |
|
76 | | - println!("worker {} of {} started", allocator.index(), allocator.peers()); |
| 79 | + println!("worker {} of {} started", allocator.index(), allocator.peers()); |
77 | 80 |
|
78 | | - // allocates a pair of senders list and one receiver. |
79 | | - let (mut senders, mut receiver) = allocator.allocate(0); |
| 81 | + // allocates a pair of senders list and one receiver. |
| 82 | + let (mut senders, mut receiver) = allocator.allocate(0); |
80 | 83 |
|
81 | | - // send typed data along each channel |
82 | | - for i in 0 .. allocator.peers() { |
83 | | - senders[i].send(Message { payload: format!("hello, {}", i)}); |
84 | | - senders[i].done(); |
85 | | - } |
| 84 | + // send typed data along each channel |
| 85 | + for (i, sender) in senders.iter_mut().enumerate() { |
| 86 | + sender.send(Message { payload: format!("hello, {}", i)}); |
| 87 | + sender.done(); |
| 88 | + } |
86 | 89 |
|
87 | | - // no support for termination notification, |
88 | | - // we have to count down ourselves. |
89 | | - let mut received = 0; |
90 | | - while received < allocator.peers() { |
| 90 | + // no support for termination notification, |
| 91 | + // we have to count down ourselves. |
| 92 | + let mut received = 0; |
| 93 | + while received < allocator.peers() { |
91 | 94 |
|
92 | | - allocator.receive(); |
| 95 | + allocator.receive(); |
93 | 96 |
|
94 | | - if let Some(message) = receiver.recv() { |
95 | | - println!("worker {}: received: <{}>", allocator.index(), message.payload); |
96 | | - received += 1; |
97 | | - } |
| 97 | + if let Some(message) = receiver.recv() { |
| 98 | + println!("worker {}: received: <{}>", allocator.index(), message.payload); |
| 99 | + received += 1; |
| 100 | + } |
98 | 101 |
|
99 | | - allocator.release(); |
100 | | - } |
| 102 | + allocator.release(); |
| 103 | + } |
101 | 104 |
|
102 | | - allocator.index() |
103 | | - }); |
| 105 | + allocator.index() |
| 106 | + }); |
104 | 107 |
|
105 | | - // computation runs until guards are joined or dropped. |
106 | | - if let Ok(guards) = guards { |
107 | | - for guard in guards.join() { |
108 | | - println!("result: {:?}", guard); |
| 108 | + // computation runs until guards are joined or dropped. |
| 109 | + if let Ok(guards) = guards { |
| 110 | + for guard in guards.join() { |
| 111 | + println!("result: {:?}", guard); |
| 112 | + } |
109 | 113 | } |
| 114 | + else { println!("error in computation"); } |
110 | 115 | } |
111 | | - else { println!("error in computation"); } |
| 116 | +} |
| 117 | + |
| 118 | +// Disable entirely on non-Linux and non-MacOS platforms. |
| 119 | +#[cfg(not(any(target_os = "linux", target_os = "macos")))] |
| 120 | +mod example { |
| 121 | + pub fn main() { } |
| 122 | +} |
| 123 | + |
| 124 | +pub fn main() { |
| 125 | + example::main(); |
112 | 126 | } |
0 commit comments