Merged
138 changes: 104 additions & 34 deletions bytes/src/lib.rs
@@ -1,12 +1,17 @@
//! A simplified implementation of the `bytes` crate, with different features, less safety.
//!
//! The crate is currently minimalist rather than maximalist, and does not support
//! methods on `BytesMut` that seem like they should be safe, because they are not yet needed.
//! For example, `BytesMut` should be able to implement `Send`, and `BytesMut::extract_to`
//! could return a `BytesMut` rather than a `Bytes`.
//!
//! # Examples
//!
//! ```
//! use timely_bytes::arc::Bytes;
//! use timely_bytes::arc::BytesMut;
//!
//! let bytes = vec![0u8; 1024];
//! let mut shared1 = Bytes::from(bytes);
//! let mut shared1 = BytesMut::from(bytes);
//! let mut shared2 = shared1.extract_to(100);
//! let mut shared3 = shared1.extract_to(100);
//! let mut shared4 = shared2.extract_to(60);
@@ -17,13 +22,10 @@
//! assert_eq!(shared4.len(), 60);
//!
//! for byte in shared1.iter_mut() { *byte = 1u8; }
//! for byte in shared2.iter_mut() { *byte = 2u8; }
//! for byte in shared3.iter_mut() { *byte = 3u8; }
//! for byte in shared4.iter_mut() { *byte = 4u8; }
//!
//! // memory in slabs [4, 2, 3, 1]: merge back in arbitrary order.
//! shared2.try_merge(shared3).ok().expect("Failed to merge 2 and 3");
//! shared2.try_merge(shared1).ok().expect("Failed to merge 23 and 1");
//! shared2.try_merge(shared1.freeze()).ok().expect("Failed to merge 23 and 1");
//! shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
//!
//! assert_eq!(shared4.len(), 1024);
@@ -38,7 +40,11 @@ pub mod arc {
use std::any::Any;

/// A thread-safe byte buffer backed by a shared allocation.
pub struct Bytes {
///
/// An instance of this type contends that `ptr` is valid for `len` bytes,
/// and that no other reference to these bytes exists, other than through
/// the type currently held in `sequestered`.
pub struct BytesMut {
/// Pointer to the start of this slice (not the allocation).
ptr: *mut u8,
/// Length of this slice.
@@ -51,15 +57,10 @@ pub mod arc {
sequestered: Arc<dyn Any>,
}

// Synchronization happens through `self.sequestered`, which mean to ensure that even
// across multiple threads each region of the slice is uniquely "owned", if not in the
// traditional Rust sense.
unsafe impl Send for Bytes { }

impl Bytes {
impl BytesMut {

/// Create a new instance from a byte allocation.
pub fn from<B>(bytes: B) -> Bytes where B : DerefMut<Target=[u8]>+'static {
pub fn from<B>(bytes: B) -> BytesMut where B : DerefMut<Target=[u8]>+'static {

// Sequester allocation behind an `Arc`, which *should* keep the address
// stable for the lifetime of `sequestered`. The `Arc` also serves as our
@@ -73,7 +74,7 @@
.map(|a| (a.as_mut_ptr(), a.len()))
.unwrap();

Bytes {
BytesMut {
ptr,
len,
sequestered,
@@ -90,40 +91,40 @@

assert!(index <= self.len);

let result = Bytes {
let result = BytesMut {
ptr: self.ptr,
len: index,
sequestered: self.sequestered.clone(),
};

unsafe { self.ptr = self.ptr.add(index); }
self.ptr = self.ptr.wrapping_add(index);
self.len -= index;

result
result.freeze()
}

/// Regenerates the Bytes if it is uniquely held.
/// Regenerates the BytesMut if it is uniquely held.
///
/// If uniquely held, this method recovers the initial pointer and length
/// of the sequestered allocation and re-initializes the Bytes. The return
/// of the sequestered allocation and re-initializes the BytesMut. The return
/// value indicates whether this occurred.
///
/// # Examples
///
/// ```
/// use timely_bytes::arc::Bytes;
/// use timely_bytes::arc::BytesMut;
///
/// let bytes = vec![0u8; 1024];
/// let mut shared1 = Bytes::from(bytes);
/// let mut shared1 = BytesMut::from(bytes);
/// let mut shared2 = shared1.extract_to(100);
/// let mut shared3 = shared1.extract_to(100);
/// let mut shared4 = shared2.extract_to(60);
///
/// drop(shared1);
/// drop(shared3);
/// drop(shared2);
/// drop(shared4);
/// assert!(shared3.try_regenerate::<Vec<u8>>());
/// assert!(shared3.len() == 1024);
/// assert!(shared1.try_regenerate::<Vec<u8>>());
/// assert!(shared1.len() == 1024);
/// ```
pub fn try_regenerate<B>(&mut self) -> bool where B: DerefMut<Target=[u8]>+'static {
// Only possible if this is the only reference to the sequestered allocation.
@@ -138,6 +139,80 @@
}
}

/// Converts a writeable byte slice to a shareable byte slice.
#[inline(always)]
pub fn freeze(self) -> Bytes {
Bytes {
ptr: self.ptr,
len: self.len,
sequestered: self.sequestered,
}
}
}

impl Deref for BytesMut {
type Target = [u8];
#[inline(always)]
fn deref(&self) -> &[u8] {
unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
}
}

impl DerefMut for BytesMut {
#[inline(always)]
fn deref_mut(&mut self) -> &mut [u8] {
unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
}
}


/// A thread-safe shared byte buffer backed by a shared allocation.
///
/// An instance of this type contends that `ptr` is valid for `len` bytes,
/// and that no other mutable reference to these bytes exists, other than
/// through the type currently held in `sequestered`.
#[derive(Clone)]
pub struct Bytes {
/// Pointer to the start of this slice (not the allocation).
ptr: *const u8,
/// Length of this slice.
len: usize,
/// Shared access to underlying resources.
///
/// Importantly, this is unavailable for as long as the struct exists, which may
/// prevent shared access to ptr[0 .. len]. I'm not sure I understand Rust's rules
/// enough to make a stronger statement about this.
sequestered: Arc<dyn Any>,
}

// Synchronization happens through `self.sequestered`, which is meant to ensure that even
// across multiple threads the referenced range of bytes remains valid.
unsafe impl Send for Bytes { }

impl Bytes {

/// Extracts [0, index) into a new `Bytes` which is returned, updating `self`.
///
/// # Safety
///
/// This method first tests `index` against `self.len`, which should ensure that both
/// the returned `Bytes` contains valid memory, and that `self` can no longer access it.
pub fn extract_to(&mut self, index: usize) -> Bytes {

assert!(index <= self.len);

let result = Bytes {
ptr: self.ptr,
len: index,
sequestered: self.sequestered.clone(),
};

self.ptr = self.ptr.wrapping_add(index);
self.len -= index;

result
}

/// Attempts to merge adjacent slices from the same allocation.
///
/// If the merge succeeds then `other.len` is added to `self` and the result is `Ok(())`.
@@ -147,10 +222,10 @@ pub mod arc {
/// # Examples
///
/// ```
/// use timely_bytes::arc::Bytes;
/// use timely_bytes::arc::BytesMut;
///
/// let bytes = vec![0u8; 1024];
/// let mut shared1 = Bytes::from(bytes);
/// let mut shared1 = BytesMut::from(bytes).freeze();
/// let mut shared2 = shared1.extract_to(100);
/// let mut shared3 = shared1.extract_to(100);
/// let mut shared4 = shared2.extract_to(60);
@@ -161,7 +236,7 @@
/// shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
/// ```
pub fn try_merge(&mut self, other: Bytes) -> Result<(), Bytes> {
if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(unsafe { self.ptr.add(self.len) }, other.ptr) {
if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(self.ptr.wrapping_add(self.len), other.ptr) {
self.len += other.len;
Ok(())
}
@@ -173,14 +248,9 @@

impl Deref for Bytes {
type Target = [u8];
#[inline(always)]
fn deref(&self) -> &[u8] {
unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
}
}

impl DerefMut for Bytes {
fn deref_mut(&mut self) -> &mut [u8] {
unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
}
}
}
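
Taken together, the new split means writes go through `BytesMut` and sharing goes through `Bytes`. A minimal sketch of that lifecycle, using only the APIs visible in the diff above (`BytesMut::from`, `freeze`, `extract_to`, `try_merge`):

```rust
use timely_bytes::arc::{Bytes, BytesMut};

// Stage writes through the unique, mutable handle.
let mut staging = BytesMut::from(vec![0u8; 16]);
for byte in staging.iter_mut() { *byte = 7u8; }

// Freeze into a shareable, read-only handle, then split it.
let mut rest: Bytes = staging.freeze();
let mut head: Bytes = rest.extract_to(8); // head covers [0, 8), rest covers [8, 16)
assert_eq!((head.len(), rest.len()), (8, 8));

// Adjacent slices from the same allocation can be merged back together.
head.try_merge(rest).ok().expect("slices are adjacent");
assert_eq!(head.len(), 16);
```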
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/allocator.rs
@@ -220,7 +220,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
// No splitting occurs across allocations.
while bytes.len() > 0 {

if let Some(header) = MessageHeader::try_read(&mut bytes[..]) {
if let Some(header) = MessageHeader::try_read(&bytes[..]) {

// Get the header and payload, ditch the header.
let mut peel = bytes.extract_to(header.required_bytes());
2 changes: 1 addition & 1 deletion communication/src/allocator/zero_copy/allocator_process.rs
@@ -188,7 +188,7 @@ impl Allocate for ProcessAllocator {
// No splitting occurs across allocations.
while bytes.len() > 0 {

if let Some(header) = MessageHeader::try_read(&mut bytes[..]) {
if let Some(header) = MessageHeader::try_read(&bytes[..]) {

// Get the header and payload, ditch the header.
let mut peel = bytes.extract_to(header.required_bytes());
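
The change in both `allocator.rs` and `allocator_process.rs` is the same one-line adjustment: `MessageHeader::try_read` no longer requires a mutable borrow of the buffer. The surrounding peeling loop is untouched; a self-contained sketch of its shape, with a fixed message size standing in for the decoded header's `required_bytes()` (a hypothetical simplification):

```rust
use timely_bytes::arc::{Bytes, BytesMut};

// Peel fixed-size messages off the front of a shared buffer.
// In the real loop, `message_len` comes from a decoded `MessageHeader`.
fn peel_all(mut bytes: Bytes, message_len: usize) -> Vec<Bytes> {
    let mut messages = Vec::new();
    while bytes.len() >= message_len {
        messages.push(bytes.extract_to(message_len));
    }
    messages
}

let buffer = BytesMut::from(vec![0u8; 64]).freeze();
assert_eq!(peel_all(buffer, 16).len(), 4);
```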
14 changes: 7 additions & 7 deletions communication/src/allocator/zero_copy/bytes_slab.rs
@@ -1,16 +1,16 @@
//! A large binary allocation for writing and sharing.

use timely_bytes::arc::Bytes;
use timely_bytes::arc::{Bytes, BytesMut};

/// A large binary allocation for writing and sharing.
///
/// A bytes slab wraps a `Bytes` and maintains a valid (written) length, and supports writing after
/// A bytes slab wraps a `BytesMut` and maintains a valid (written) length, and supports writing after
/// this valid length, and extracting `Bytes` up to this valid length. Extracted bytes are enqueued
/// and checked for uniqueness in order to recycle them (once all shared references are dropped).
pub struct BytesSlab {
buffer: Bytes, // current working buffer.
in_progress: Vec<Option<Bytes>>, // buffers shared with workers.
stash: Vec<Bytes>, // reclaimed and reusable buffers.
buffer: BytesMut, // current working buffer.
in_progress: Vec<Option<BytesMut>>, // buffers shared with workers.
stash: Vec<BytesMut>, // reclaimed and reusable buffers.
shift: usize, // current buffer allocation size.
valid: usize, // buffer[..valid] are valid bytes.
}
@@ -19,7 +19,7 @@ impl BytesSlab {
/// Allocates a new `BytesSlab` with an initial size determined by a shift.
pub fn new(shift: usize) -> Self {
BytesSlab {
buffer: Bytes::from(vec![0u8; 1 << shift].into_boxed_slice()),
buffer: BytesMut::from(vec![0u8; 1 << shift].into_boxed_slice()),
in_progress: Vec::new(),
stash: Vec::new(),
shift,
@@ -82,7 +82,7 @@ impl BytesSlab {
self.in_progress.retain(|x| x.is_some());
}

let new_buffer = self.stash.pop().unwrap_or_else(|| Bytes::from(vec![0; 1 << self.shift].into_boxed_slice()));
let new_buffer = self.stash.pop().unwrap_or_else(|| BytesMut::from(vec![0; 1 << self.shift].into_boxed_slice()));
let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer);

self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]);
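
The slab's recycling logic is unchanged; only the types shift so that the working buffer is the writable `BytesMut` while extracted slices become frozen `Bytes`. A small sketch of the sizing policy, assuming nothing beyond the constructor shown above:

```rust
use timely_bytes::arc::BytesMut;

// Backing buffers are always `1 << shift` bytes, built from boxed slices
// so the allocation's length (and address) is fixed while it is shared.
let shift = 20; // illustrative choice: a 1 MiB working buffer
let buffer = BytesMut::from(vec![0u8; 1 << shift].into_boxed_slice());
assert_eq!(buffer.len(), 1 << shift);
```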
4 changes: 2 additions & 2 deletions communication/src/allocator/zero_copy/tcp.rs
@@ -174,12 +174,12 @@ pub fn send_loop<S: Stream>(
}
else {
// TODO: Could do scatter/gather write here.
for mut bytes in stash.drain(..) {
for bytes in stash.drain(..) {

// Record message sends.
logger.as_mut().map(|logger| {
let mut offset = 0;
while let Some(header) = MessageHeader::try_read(&mut bytes[offset..]) {
while let Some(header) = MessageHeader::try_read(&bytes[offset..]) {
logger.log(MessageEvent { is_send: true, header, });
offset += header.required_bytes();
}
2 changes: 1 addition & 1 deletion communication/src/networking.rs
@@ -44,7 +44,7 @@ impl MessageHeader {

/// Returns a header when there is enough supporting data
#[inline]
pub fn try_read(bytes: &mut [u8]) -> Option<MessageHeader> {
pub fn try_read(bytes: &[u8]) -> Option<MessageHeader> {
let mut cursor = io::Cursor::new(&bytes[..]);
let mut buffer = [0; Self::FIELDS];
cursor.read_u64_into::<ByteOrder>(&mut buffer).ok()?;
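
Relaxing the signature to `&[u8]` lets callers such as the send-loop logger in `tcp.rs` scan a buffer without a mutable borrow. A sketch of that scanning pattern, assuming only `try_read` and `required_bytes` as shown in this diff (the `timely_communication::networking` path is an assumption):

```rust
use timely_communication::networking::MessageHeader;

// Count the complete messages at the front of a buffer.
fn count_messages(bytes: &[u8]) -> usize {
    let mut offset = 0;
    let mut count = 0;
    while let Some(header) = MessageHeader::try_read(&bytes[offset..]) {
        offset += header.required_bytes();
        count += 1;
    }
    count
}
```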