Skip to content

Commit 8f15860

Browse files
Allow limits on spare buffers maintained (#644)
1 parent 839d02d commit 8f15860

File tree

3 files changed

+24
-7
lines changed

3 files changed

+24
-7
lines changed

communication/examples/lgalloc.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use std::ops::{Deref, DerefMut};
44
use std::ptr::NonNull;
55
use timely_communication::{Allocate, Bytesable};
6+
use timely_communication::allocator::zero_copy::bytes_slab::BytesRefill;
67

78
/// A wrapper that indicates the serialization/deserialization strategy.
89
pub struct Message {
@@ -62,7 +63,10 @@ fn main() {
6263
config.enable().with_path(std::env::temp_dir());
6364
lgalloc::lgalloc_set_config(&config);
6465

65-
let refill = std::sync::Arc::new(|size| lgalloc_refill(size) as Box<dyn DerefMut<Target=[u8]>>);
66+
let refill = BytesRefill {
67+
logic: std::sync::Arc::new(|size| lgalloc_refill(size) as Box<dyn DerefMut<Target=[u8]>>),
68+
limit: None,
69+
};
6670

6771
// extract the configuration from user-supplied arguments, initialize the computation.
6872
let config = timely_communication::Config::ProcessBinary(4);

communication/src/allocator/zero_copy/bytes_slab.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,23 @@ pub struct BytesSlab {
1414
stash: Vec<BytesMut>, // reclaimed and reusable buffers.
1515
shift: usize, // current buffer allocation size.
1616
valid: usize, // buffer[..valid] are valid bytes.
17-
new_bytes: BytesRefill, // function to allocate new buffers.
17+
new_bytes: BytesRefill, // function to allocate new buffers.
1818
}
1919

20-
/// A function to allocate a new buffer of at least `usize` bytes.
21-
pub type BytesRefill = std::sync::Arc<dyn Fn(usize) -> Box<dyn DerefMut<Target=[u8]>>+Send+Sync>;
20+
/// Ability to acquire and policy to retain byte buffers.
21+
#[derive(Clone)]
22+
pub struct BytesRefill {
23+
/// Logic to acquire a new buffer of a certain number of bytes.
24+
pub logic: std::sync::Arc<dyn Fn(usize) -> Box<dyn DerefMut<Target=[u8]>>+Send+Sync>,
25+
/// An optional limit on the number of empty buffers retained.
26+
pub limit: Option<usize>,
27+
}
2228

2329
impl BytesSlab {
2430
/// Allocates a new `BytesSlab` with an initial size determined by a shift.
2531
pub fn new(shift: usize, new_bytes: BytesRefill) -> Self {
2632
BytesSlab {
27-
buffer: BytesMut::from(BoxDerefMut { boxed: new_bytes(1 << shift) }),
33+
buffer: BytesMut::from(BoxDerefMut { boxed: (new_bytes.logic)(1 << shift) }),
2834
in_progress: Vec::new(),
2935
stash: Vec::new(),
3036
shift,
@@ -88,9 +94,13 @@ impl BytesSlab {
8894
self.in_progress.retain(|x| x.is_some());
8995
}
9096

91-
let new_buffer = self.stash.pop().unwrap_or_else(|| BytesMut::from(BoxDerefMut { boxed: (self.new_bytes)(1 << self.shift) }));
97+
let new_buffer = self.stash.pop().unwrap_or_else(|| BytesMut::from(BoxDerefMut { boxed: (self.new_bytes.logic)(1 << self.shift) }));
9298
let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer);
9399

100+
if let Some(limit) = self.new_bytes.limit {
101+
self.stash.truncate(limit);
102+
}
103+
94104
self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]);
95105
if !increased_shift {
96106
self.in_progress.push(Some(old_buffer));

communication/src/initialize.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,10 @@ impl Config {
152152

153153
/// Attempts to assemble the described communication infrastructure.
154154
pub fn try_build(self) -> Result<(Vec<GenericBuilder>, Box<dyn Any+Send>), String> {
155-
let refill = Arc::new(|size| Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target=[u8]>>);
155+
let refill = BytesRefill {
156+
logic: Arc::new(|size| Box::new(vec![0_u8; size]) as Box<dyn DerefMut<Target=[u8]>>),
157+
limit: None,
158+
};
156159
self.try_build_with(refill)
157160
}
158161

0 commit comments

Comments
 (0)