diff --git a/bytes/src/lib.rs b/bytes/src/lib.rs index 7cb803274..8dd23f403 100644 --- a/bytes/src/lib.rs +++ b/bytes/src/lib.rs @@ -1,12 +1,17 @@ //! A simplified implementation of the `bytes` crate, with different features, less safety. //! +//! The crate is currently minimalist rather than maximalist, and for example does not support +//! methods on `BytesMut` that seem like they should be safe, because they are not yet needed. +//! For example, `BytesMut` should be able to implement `Send`, and `BytesMut::extract_to` could +//! return a `BytesMut` rather than a `Bytes`. +//! //! # Examples //! //! ``` -//! use timely_bytes::arc::Bytes; +//! use timely_bytes::arc::BytesMut; //! //! let bytes = vec![0u8; 1024]; -//! let mut shared1 = Bytes::from(bytes); +//! let mut shared1 = BytesMut::from(bytes); //! let mut shared2 = shared1.extract_to(100); //! let mut shared3 = shared1.extract_to(100); //! let mut shared4 = shared2.extract_to(60); @@ -17,13 +22,10 @@ //! assert_eq!(shared4.len(), 60); //! //! for byte in shared1.iter_mut() { *byte = 1u8; } -//! for byte in shared2.iter_mut() { *byte = 2u8; } -//! for byte in shared3.iter_mut() { *byte = 3u8; } -//! for byte in shared4.iter_mut() { *byte = 4u8; } //! //! // memory in slabs [4, 2, 3, 1]: merge back in arbitrary order. //! shared2.try_merge(shared3).ok().expect("Failed to merge 2 and 3"); -//! shared2.try_merge(shared1).ok().expect("Failed to merge 23 and 1"); +//! shared2.try_merge(shared1.freeze()).ok().expect("Failed to merge 23 and 1"); //! shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231"); //! //! assert_eq!(shared4.len(), 1024); @@ -38,7 +40,11 @@ pub mod arc { use std::any::Any; /// A thread-safe byte buffer backed by a shared allocation. - pub struct Bytes { + /// + /// An instance of this type contends that `ptr` is valid for `len` bytes, + /// and that no other reference to these bytes exists, other than through + /// the type currently held in `sequestered`. 
+ pub struct BytesMut { /// Pointer to the start of this slice (not the allocation). ptr: *mut u8, /// Length of this slice. @@ -51,15 +57,10 @@ pub mod arc { sequestered: Arc, } - // Synchronization happens through `self.sequestered`, which mean to ensure that even - // across multiple threads each region of the slice is uniquely "owned", if not in the - // traditional Rust sense. - unsafe impl Send for Bytes { } - - impl Bytes { + impl BytesMut { /// Create a new instance from a byte allocation. - pub fn from(bytes: B) -> Bytes where B : DerefMut+'static { + pub fn from(bytes: B) -> BytesMut where B : DerefMut+'static { // Sequester allocation behind an `Arc`, which *should* keep the address // stable for the lifetime of `sequestered`. The `Arc` also serves as our @@ -73,7 +74,7 @@ pub mod arc { .map(|a| (a.as_mut_ptr(), a.len())) .unwrap(); - Bytes { + BytesMut { ptr, len, sequestered, @@ -90,40 +91,40 @@ pub mod arc { assert!(index <= self.len); - let result = Bytes { + let result = BytesMut { ptr: self.ptr, len: index, sequestered: self.sequestered.clone(), }; - unsafe { self.ptr = self.ptr.add(index); } + self.ptr = self.ptr.wrapping_add(index); self.len -= index; - result + result.freeze() } - /// Regenerates the Bytes if it is uniquely held. + /// Regenerates the BytesMut if it is uniquely held. /// /// If uniquely held, this method recovers the initial pointer and length - /// of the sequestered allocation and re-initializes the Bytes. The return + /// of the sequestered allocation and re-initializes the BytesMut. The return /// value indicates whether this occurred. 
/// /// # Examples /// /// ``` - /// use timely_bytes::arc::Bytes; + /// use timely_bytes::arc::BytesMut; /// /// let bytes = vec![0u8; 1024]; - /// let mut shared1 = Bytes::from(bytes); + /// let mut shared1 = BytesMut::from(bytes); /// let mut shared2 = shared1.extract_to(100); /// let mut shared3 = shared1.extract_to(100); /// let mut shared4 = shared2.extract_to(60); /// - /// drop(shared1); + /// drop(shared3); /// drop(shared2); /// drop(shared4); - /// assert!(shared3.try_regenerate::>()); - /// assert!(shared3.len() == 1024); + /// assert!(shared1.try_regenerate::>()); + /// assert!(shared1.len() == 1024); /// ``` pub fn try_regenerate(&mut self) -> bool where B: DerefMut+'static { // Only possible if this is the only reference to the sequestered allocation. @@ -138,6 +139,80 @@ pub mod arc { } } + /// Converts a writeable byte slice to a shareable byte slice. + #[inline(always)] + pub fn freeze(self) -> Bytes { + Bytes { + ptr: self.ptr, + len: self.len, + sequestered: self.sequestered, + } + } + } + + impl Deref for BytesMut { + type Target = [u8]; + #[inline(always)] + fn deref(&self) -> &[u8] { + unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) } + } + } + + impl DerefMut for BytesMut { + #[inline(always)] + fn deref_mut(&mut self) -> &mut [u8] { + unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) } + } + } + + + /// A thread-safe shared byte buffer backed by a shared allocation. + /// + /// An instance of this type contends that `ptr` is valid for `len` bytes, + /// and that no other mutable reference to these bytes exists, other than + /// through the type currently held in `sequestered`. + #[derive(Clone)] + pub struct Bytes { + /// Pointer to the start of this slice (not the allocation). + ptr: *const u8, + /// Length of this slice. + len: usize, + /// Shared access to underlying resources. + /// + /// Importantly, this is unavailable for as long as the struct exists, which may + /// prevent shared access to ptr[0 .. len]. 
I'm not sure I understand Rust's rules + /// enough to make a stronger statement about this. + sequestered: Arc<dyn Any>, + } + + // Synchronization happens through `self.sequestered`, which means to ensure that even + // across multiple threads the referenced range of bytes remains valid. + unsafe impl Send for Bytes { } + + impl Bytes { + + /// Extracts [0, index) into a new `Bytes` which is returned, updating `self`. + /// + /// # Safety + /// + /// This method first tests `index` against `self.len`, which should ensure that both + /// the returned `Bytes` contains valid memory, and that `self` can no longer access it. + pub fn extract_to(&mut self, index: usize) -> Bytes { + + assert!(index <= self.len); + + let result = Bytes { + ptr: self.ptr, + len: index, + sequestered: self.sequestered.clone(), + }; + + self.ptr = self.ptr.wrapping_add(index); + self.len -= index; + + result + } + /// Attempts to merge adjacent slices from the same allocation. /// /// If the merge succeeds then `other.len` is added to `self` and the result is `Ok(())`. 
@@ -147,10 +222,10 @@ pub mod arc { /// # Examples /// /// ``` - /// use timely_bytes::arc::Bytes; + /// use timely_bytes::arc::BytesMut; /// /// let bytes = vec![0u8; 1024]; - /// let mut shared1 = Bytes::from(bytes); + /// let mut shared1 = BytesMut::from(bytes).freeze(); /// let mut shared2 = shared1.extract_to(100); /// let mut shared3 = shared1.extract_to(100); /// let mut shared4 = shared2.extract_to(60); @@ -161,7 +236,7 @@ pub mod arc { /// shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231"); /// ``` pub fn try_merge(&mut self, other: Bytes) -> Result<(), Bytes> { - if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(unsafe { self.ptr.add(self.len) }, other.ptr) { + if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(self.ptr.wrapping_add(self.len), other.ptr) { self.len += other.len; Ok(()) } @@ -173,14 +248,9 @@ pub mod arc { impl Deref for Bytes { type Target = [u8]; + #[inline(always)] fn deref(&self) -> &[u8] { unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) } } } - - impl DerefMut for Bytes { - fn deref_mut(&mut self) -> &mut [u8] { - unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) } - } - } } diff --git a/communication/src/allocator/zero_copy/allocator.rs b/communication/src/allocator/zero_copy/allocator.rs index f1dda0fb5..7944abe30 100644 --- a/communication/src/allocator/zero_copy/allocator.rs +++ b/communication/src/allocator/zero_copy/allocator.rs @@ -220,7 +220,7 @@ impl Allocate for TcpAllocator { // No splitting occurs across allocations. while bytes.len() > 0 { - if let Some(header) = MessageHeader::try_read(&mut bytes[..]) { + if let Some(header) = MessageHeader::try_read(&bytes[..]) { // Get the header and payload, ditch the header. 
let mut peel = bytes.extract_to(header.required_bytes()); diff --git a/communication/src/allocator/zero_copy/allocator_process.rs b/communication/src/allocator/zero_copy/allocator_process.rs index d4eed9b43..9f2f93384 100644 --- a/communication/src/allocator/zero_copy/allocator_process.rs +++ b/communication/src/allocator/zero_copy/allocator_process.rs @@ -188,7 +188,7 @@ impl Allocate for ProcessAllocator { // No splitting occurs across allocations. while bytes.len() > 0 { - if let Some(header) = MessageHeader::try_read(&mut bytes[..]) { + if let Some(header) = MessageHeader::try_read(&bytes[..]) { // Get the header and payload, ditch the header. let mut peel = bytes.extract_to(header.required_bytes()); diff --git a/communication/src/allocator/zero_copy/bytes_slab.rs b/communication/src/allocator/zero_copy/bytes_slab.rs index 1fb2de2c3..7278d3885 100644 --- a/communication/src/allocator/zero_copy/bytes_slab.rs +++ b/communication/src/allocator/zero_copy/bytes_slab.rs @@ -1,16 +1,16 @@ //! A large binary allocation for writing and sharing. -use timely_bytes::arc::Bytes; +use timely_bytes::arc::{Bytes, BytesMut}; /// A large binary allocation for writing and sharing. /// -/// A bytes slab wraps a `Bytes` and maintains a valid (written) length, and supports writing after +/// A bytes slab wraps a `BytesMut` and maintains a valid (written) length, and supports writing after /// this valid length, and extracting `Bytes` up to this valid length. Extracted bytes are enqueued /// and checked for uniqueness in order to recycle them (once all shared references are dropped). pub struct BytesSlab { - buffer: Bytes, // current working buffer. - in_progress: Vec>, // buffers shared with workers. - stash: Vec, // reclaimed and reusable buffers. + buffer: BytesMut, // current working buffer. + in_progress: Vec>, // buffers shared with workers. + stash: Vec, // reclaimed and reusable buffers. shift: usize, // current buffer allocation size. 
valid: usize, // buffer[..valid] are valid bytes. } @@ -19,7 +19,7 @@ impl BytesSlab { /// Allocates a new `BytesSlab` with an initial size determined by a shift. pub fn new(shift: usize) -> Self { BytesSlab { - buffer: Bytes::from(vec![0u8; 1 << shift].into_boxed_slice()), + buffer: BytesMut::from(vec![0u8; 1 << shift].into_boxed_slice()), in_progress: Vec::new(), stash: Vec::new(), shift, @@ -82,7 +82,7 @@ impl BytesSlab { self.in_progress.retain(|x| x.is_some()); } - let new_buffer = self.stash.pop().unwrap_or_else(|| Bytes::from(vec![0; 1 << self.shift].into_boxed_slice())); + let new_buffer = self.stash.pop().unwrap_or_else(|| BytesMut::from(vec![0; 1 << self.shift].into_boxed_slice())); let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer); self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]); diff --git a/communication/src/allocator/zero_copy/tcp.rs b/communication/src/allocator/zero_copy/tcp.rs index ecf9bddc4..0bb89a0ea 100644 --- a/communication/src/allocator/zero_copy/tcp.rs +++ b/communication/src/allocator/zero_copy/tcp.rs @@ -174,12 +174,12 @@ pub fn send_loop( } else { // TODO: Could do scatter/gather write here. - for mut bytes in stash.drain(..) { + for bytes in stash.drain(..) { // Record message sends. 
logger.as_mut().map(|logger| { let mut offset = 0; - while let Some(header) = MessageHeader::try_read(&mut bytes[offset..]) { + while let Some(header) = MessageHeader::try_read(&bytes[offset..]) { logger.log(MessageEvent { is_send: true, header, }); offset += header.required_bytes(); } diff --git a/communication/src/networking.rs b/communication/src/networking.rs index dbd8933cd..754424955 100644 --- a/communication/src/networking.rs +++ b/communication/src/networking.rs @@ -44,7 +44,7 @@ impl MessageHeader { /// Returns a header when there is enough supporting data #[inline] - pub fn try_read(bytes: &mut [u8]) -> Option<MessageHeader> { + pub fn try_read(bytes: &[u8]) -> Option<MessageHeader> { let mut cursor = io::Cursor::new(&bytes[..]); let mut buffer = [0; Self::FIELDS]; cursor.read_u64_into::<LittleEndian>(&mut buffer).ok()?;