forked from TimelyDataflow/timely-dataflow
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlgalloc.rs
More file actions
126 lines (102 loc) · 4.03 KB
/
lgalloc.rs
File metadata and controls
126 lines (102 loc) · 4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
//! Test using lgalloc as the allocator for zero-copy data transfer.
//! Note that this example only works on Linux and MacOS.
#[cfg(any(target_os = "linux", target_os = "macos"))]
mod example {
use std::ops::{Deref, DerefMut};
use std::ptr::NonNull;
use timely_communication::{Allocate, Bytesable};
use timely_communication::allocator::zero_copy::bytes_slab::BytesRefill;
/// A wrapper that indicates the serialization/deserialization strategy.
pub struct Message {
/// Text contents.
pub payload: String,
}
impl Bytesable for Message {
fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
}
fn length_in_bytes(&self) -> usize {
self.payload.len()
}
fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
writer.write_all(self.payload.as_bytes()).unwrap();
}
}
fn lgalloc_refill(size: usize) -> Box<LgallocHandle> {
let (pointer, capacity, handle) = lgalloc::allocate::<u8>(size).unwrap();
let handle = Some(handle);
Box::new(LgallocHandle { handle, pointer, capacity })
}
struct LgallocHandle {
handle: Option<lgalloc::Handle>,
pointer: NonNull<u8>,
capacity: usize,
}
impl Deref for LgallocHandle {
type Target = [u8];
#[inline(always)]
fn deref(&self) -> &Self::Target {
unsafe { std::slice::from_raw_parts(self.pointer.as_ptr(), self.capacity) }
}
}
impl DerefMut for LgallocHandle {
#[inline(always)]
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe { std::slice::from_raw_parts_mut(self.pointer.as_ptr(), self.capacity) }
}
}
impl Drop for LgallocHandle {
fn drop(&mut self) {
lgalloc::deallocate(self.handle.take().unwrap());
}
}
pub(crate) fn main() {
let mut lgconfig = lgalloc::LgAlloc::new();
lgconfig.enable().with_path(std::env::temp_dir());
lgalloc::lgalloc_set_config(&lgconfig);
let refill = BytesRefill {
logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box<dyn DerefMut<Target=[u8]>>),
limit: None,
};
// extract the configuration from user-supplied arguments, initialize the computation.
let config = timely_communication::Config::ProcessBinary(4);
let (allocators, others) = config.try_build_with(refill).unwrap();
let guards = timely_communication::initialize_from(allocators, others, |mut allocator| {
println!("worker {} of {} started", allocator.index(), allocator.peers());
// allocates a pair of senders list and one receiver.
let (mut senders, mut receiver) = allocator.allocate(0);
// send typed data along each channel
for (i, sender) in senders.iter_mut().enumerate() {
sender.send(Message { payload: format!("hello, {}", i)});
sender.done();
}
// no support for termination notification,
// we have to count down ourselves.
let mut received = 0;
while received < allocator.peers() {
allocator.receive();
if let Some(message) = receiver.recv() {
println!("worker {}: received: <{}>", allocator.index(), message.payload);
received += 1;
}
allocator.release();
}
allocator.index()
});
// computation runs until guards are joined or dropped.
if let Ok(guards) = guards {
for guard in guards.join() {
println!("result: {:?}", guard);
}
}
else { println!("error in computation"); }
}
}
// Disable entirely on non-Linux and non-MacOS platforms.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
mod example {
pub fn main() { }
}
pub fn main() {
example::main();
}