
Commit 38ef688

bytes: Separate Bytes and BytesMut (#637)
* Remove some unsafe
* Rename Bytes to BytesMut
* Introduce and use Bytes
* Update comments and doctests
* Comments, inlines, and pruning
1 parent 6826f06 commit 38ef688
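
The change in a nutshell: `BytesMut` is the writeable handle, `extract_to` now hands back read-only `Bytes` views, and `freeze` converts the remaining writeable tail into a `Bytes` as well so adjacent pieces can be merged. A minimal sketch distilled from the doctests updated below, assuming only the API visible in this diff:

use timely_bytes::arc::BytesMut;

fn main() {
    // The writeable handle; it dereferences to `&mut [u8]`.
    let mut writable = BytesMut::from(vec![0u8; 1024]);
    for byte in writable.iter_mut() { *byte = 1u8; }

    // Splitting the front off now yields a read-only `Bytes` view...
    let mut head = writable.extract_to(512);
    assert_eq!(head.len(), 512);

    // ...and `freeze` converts the remaining writeable tail into one too,
    // so adjacent pieces can be merged back together.
    head.try_merge(writable.freeze()).ok().expect("pieces should be adjacent");
    assert_eq!(head.len(), 1024);
}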

6 files changed: 116 additions & 46 deletions

bytes/src/lib.rs

Lines changed: 104 additions & 34 deletions

@@ -1,12 +1,17 @@
 //! A simplified implementation of the `bytes` crate, with different features, less safety.
 //!
+//! The crate is currently minimalist rather than maximalist, and for example does not support
+//! methods on `BytesMut` that seem like they should be safe, because they are not yet needed.
+//! For example, `BytesMut` should be able to implement `Send`, and `BytesMut::extract_to` could
+//! return a `BytesMut` rather than a `Bytes`.
+//!
 //! # Examples
 //!
 //! ```
-//! use timely_bytes::arc::Bytes;
+//! use timely_bytes::arc::BytesMut;
 //!
 //! let bytes = vec![0u8; 1024];
-//! let mut shared1 = Bytes::from(bytes);
+//! let mut shared1 = BytesMut::from(bytes);
 //! let mut shared2 = shared1.extract_to(100);
 //! let mut shared3 = shared1.extract_to(100);
 //! let mut shared4 = shared2.extract_to(60);
@@ -17,13 +22,10 @@
 //! assert_eq!(shared4.len(), 60);
 //!
 //! for byte in shared1.iter_mut() { *byte = 1u8; }
-//! for byte in shared2.iter_mut() { *byte = 2u8; }
-//! for byte in shared3.iter_mut() { *byte = 3u8; }
-//! for byte in shared4.iter_mut() { *byte = 4u8; }
 //!
 //! // memory in slabs [4, 2, 3, 1]: merge back in arbitrary order.
 //! shared2.try_merge(shared3).ok().expect("Failed to merge 2 and 3");
-//! shared2.try_merge(shared1).ok().expect("Failed to merge 23 and 1");
+//! shared2.try_merge(shared1.freeze()).ok().expect("Failed to merge 23 and 1");
 //! shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
 //!
 //! assert_eq!(shared4.len(), 1024);
@@ -38,7 +40,11 @@ pub mod arc {
     use std::any::Any;

     /// A thread-safe byte buffer backed by a shared allocation.
-    pub struct Bytes {
+    ///
+    /// An instance of this type contends that `ptr` is valid for `len` bytes,
+    /// and that no other reference to these bytes exists, other than through
+    /// the type currently held in `sequestered`.
+    pub struct BytesMut {
         /// Pointer to the start of this slice (not the allocation).
         ptr: *mut u8,
         /// Length of this slice.
@@ -51,15 +57,10 @@ pub mod arc {
         sequestered: Arc<dyn Any>,
     }

-    // Synchronization happens through `self.sequestered`, which mean to ensure that even
-    // across multiple threads each region of the slice is uniquely "owned", if not in the
-    // traditional Rust sense.
-    unsafe impl Send for Bytes { }
-
-    impl Bytes {
+    impl BytesMut {

         /// Create a new instance from a byte allocation.
-        pub fn from<B>(bytes: B) -> Bytes where B : DerefMut<Target=[u8]>+'static {
+        pub fn from<B>(bytes: B) -> BytesMut where B : DerefMut<Target=[u8]>+'static {

             // Sequester allocation behind an `Arc`, which *should* keep the address
             // stable for the lifetime of `sequestered`. The `Arc` also serves as our
@@ -73,7 +74,7 @@ pub mod arc {
                 .map(|a| (a.as_mut_ptr(), a.len()))
                 .unwrap();

-            Bytes {
+            BytesMut {
                 ptr,
                 len,
                 sequestered,
@@ -90,40 +91,40 @@ pub mod arc {

             assert!(index <= self.len);

-            let result = Bytes {
+            let result = BytesMut {
                 ptr: self.ptr,
                 len: index,
                 sequestered: self.sequestered.clone(),
             };

-            unsafe { self.ptr = self.ptr.add(index); }
+            self.ptr = self.ptr.wrapping_add(index);
             self.len -= index;

-            result
+            result.freeze()
         }

-        /// Regenerates the Bytes if it is uniquely held.
+        /// Regenerates the BytesMut if it is uniquely held.
         ///
         /// If uniquely held, this method recovers the initial pointer and length
-        /// of the sequestered allocation and re-initializes the Bytes. The return
+        /// of the sequestered allocation and re-initializes the BytesMut. The return
         /// value indicates whether this occurred.
         ///
         /// # Examples
         ///
         /// ```
-        /// use timely_bytes::arc::Bytes;
+        /// use timely_bytes::arc::BytesMut;
         ///
         /// let bytes = vec![0u8; 1024];
-        /// let mut shared1 = Bytes::from(bytes);
+        /// let mut shared1 = BytesMut::from(bytes);
         /// let mut shared2 = shared1.extract_to(100);
         /// let mut shared3 = shared1.extract_to(100);
         /// let mut shared4 = shared2.extract_to(60);
         ///
-        /// drop(shared1);
+        /// drop(shared3);
         /// drop(shared2);
         /// drop(shared4);
-        /// assert!(shared3.try_regenerate::<Vec<u8>>());
-        /// assert!(shared3.len() == 1024);
+        /// assert!(shared1.try_regenerate::<Vec<u8>>());
+        /// assert!(shared1.len() == 1024);
         /// ```
         pub fn try_regenerate<B>(&mut self) -> bool where B: DerefMut<Target=[u8]>+'static {
             // Only possible if this is the only reference to the sequestered allocation.
@@ -138,6 +139,80 @@ pub mod arc {
             }
         }

+        /// Converts a writeable byte slice to a shareable byte slice.
+        #[inline(always)]
+        pub fn freeze(self) -> Bytes {
+            Bytes {
+                ptr: self.ptr,
+                len: self.len,
+                sequestered: self.sequestered,
+            }
+        }
+    }
+
+    impl Deref for BytesMut {
+        type Target = [u8];
+        #[inline(always)]
+        fn deref(&self) -> &[u8] {
+            unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
+        }
+    }
+
+    impl DerefMut for BytesMut {
+        #[inline(always)]
+        fn deref_mut(&mut self) -> &mut [u8] {
+            unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
+        }
+    }
+
+
+    /// A thread-safe shared byte buffer backed by a shared allocation.
+    ///
+    /// An instance of this type contends that `ptr` is valid for `len` bytes,
+    /// and that no other mutable reference to these bytes exists, other than
+    /// through the type currently held in `sequestered`.
+    #[derive(Clone)]
+    pub struct Bytes {
+        /// Pointer to the start of this slice (not the allocation).
+        ptr: *const u8,
+        /// Length of this slice.
+        len: usize,
+        /// Shared access to underlying resources.
+        ///
+        /// Importantly, this is unavailable for as long as the struct exists, which may
+        /// prevent shared access to ptr[0 .. len]. I'm not sure I understand Rust's rules
+        /// enough to make a stronger statement about this.
+        sequestered: Arc<dyn Any>,
+    }
+
+    // Synchronization happens through `self.sequestered`, which means to ensure that even
+    // across multiple threads the referenced range of bytes remain valid.
+    unsafe impl Send for Bytes { }
+
+    impl Bytes {
+
+        /// Extracts [0, index) into a new `Bytes` which is returned, updating `self`.
+        ///
+        /// # Safety
+        ///
+        /// This method first tests `index` against `self.len`, which should ensure that both
+        /// the returned `Bytes` contains valid memory, and that `self` can no longer access it.
+        pub fn extract_to(&mut self, index: usize) -> Bytes {
+
+            assert!(index <= self.len);
+
+            let result = Bytes {
+                ptr: self.ptr,
+                len: index,
+                sequestered: self.sequestered.clone(),
+            };
+
+            self.ptr = self.ptr.wrapping_add(index);
+            self.len -= index;
+
+            result
+        }
+
         /// Attempts to merge adjacent slices from the same allocation.
         ///
         /// If the merge succeeds then `other.len` is added to `self` and the result is `Ok(())`.
@@ -147,10 +222,10 @@ pub mod arc {
         /// # Examples
         ///
         /// ```
-        /// use timely_bytes::arc::Bytes;
+        /// use timely_bytes::arc::BytesMut;
         ///
         /// let bytes = vec![0u8; 1024];
-        /// let mut shared1 = Bytes::from(bytes);
+        /// let mut shared1 = BytesMut::from(bytes).freeze();
         /// let mut shared2 = shared1.extract_to(100);
         /// let mut shared3 = shared1.extract_to(100);
         /// let mut shared4 = shared2.extract_to(60);
@@ -161,7 +236,7 @@ pub mod arc {
         /// shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
         /// ```
         pub fn try_merge(&mut self, other: Bytes) -> Result<(), Bytes> {
-            if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(unsafe { self.ptr.add(self.len) }, other.ptr) {
+            if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(self.ptr.wrapping_add(self.len), other.ptr) {
                 self.len += other.len;
                 Ok(())
             }
@@ -173,14 +248,9 @@ pub mod arc {

     impl Deref for Bytes {
         type Target = [u8];
+        #[inline(always)]
         fn deref(&self) -> &[u8] {
             unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
         }
     }
-
-    impl DerefMut for Bytes {
-        fn deref_mut(&mut self) -> &mut [u8] {
-            unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
-        }
-    }
 }
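
The `try_regenerate` doctest above is the hook that later lets whole allocations be recycled: once every extracted `Bytes` is dropped, the sequestered allocation is uniquely held again. A minimal sketch of that round trip, assuming the API exactly as shown in this diff:

use timely_bytes::arc::BytesMut;

fn main() {
    let mut buffer = BytesMut::from(vec![0u8; 1024]);

    // Hand out (and here immediately drop) a read-only view of the bytes.
    let shared = buffer.extract_to(1024);
    assert_eq!(buffer.len(), 0);
    drop(shared);

    // With no `Bytes` views left, the sequestered `Vec<u8>` is uniquely held
    // again and the original allocation can be recovered for reuse.
    assert!(buffer.try_regenerate::<Vec<u8>>());
    assert_eq!(buffer.len(), 1024);
}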

communication/src/allocator/zero_copy/allocator.rs

Lines changed: 1 addition & 1 deletion

@@ -220,7 +220,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
             // No splitting occurs across allocations.
             while bytes.len() > 0 {

-                if let Some(header) = MessageHeader::try_read(&mut bytes[..]) {
+                if let Some(header) = MessageHeader::try_read(&bytes[..]) {

                     // Get the header and payload, ditch the header.
                     let mut peel = bytes.extract_to(header.required_bytes());
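
The changed call sits inside the message-framing loop shown above: a header is peeked from the front of the buffer and the complete message is then peeled off with `extract_to`. A self-contained sketch of that pattern follows; `DemoHeader` is a hypothetical stand-in for `MessageHeader`, whose real layout lives in communication/src/networking.rs and is not part of this illustration:

use timely_bytes::arc::{Bytes, BytesMut};

/// Hypothetical stand-in for `MessageHeader`, used only for illustration.
struct DemoHeader { payload_len: usize }

impl DemoHeader {
    /// Peeking at a length prefix needs only a shared slice, hence `&[u8]`.
    fn try_read(bytes: &[u8]) -> Option<DemoHeader> {
        if bytes.len() < 8 { return None; }
        let mut prefix = [0u8; 8];
        prefix.copy_from_slice(&bytes[..8]);
        Some(DemoHeader { payload_len: u64::from_le_bytes(prefix) as usize })
    }
    /// Header plus payload, analogous to `MessageHeader::required_bytes`.
    fn required_bytes(&self) -> usize { 8 + self.payload_len }
}

fn main() {
    // One frame: an 8-byte little-endian length prefix followed by 16 payload bytes.
    let mut frame = vec![0u8; 24];
    frame[..8].copy_from_slice(&16u64.to_le_bytes());
    let mut bytes: Bytes = BytesMut::from(frame).freeze();

    // No splitting occurs across allocations: peel complete messages off the front.
    let mut messages: Vec<Bytes> = Vec::new();
    while bytes.len() > 0 {
        match DemoHeader::try_read(&bytes[..]) {
            Some(header) if header.required_bytes() <= bytes.len() => {
                messages.push(bytes.extract_to(header.required_bytes()));
            }
            _ => break, // incomplete header or payload; wait for more bytes.
        }
    }
    assert_eq!(messages.len(), 1);
    assert_eq!(messages[0].len(), 24);
}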

communication/src/allocator/zero_copy/allocator_process.rs

Lines changed: 1 addition & 1 deletion

@@ -188,7 +188,7 @@ impl Allocate for ProcessAllocator {
             // No splitting occurs across allocations.
             while bytes.len() > 0 {

-                if let Some(header) = MessageHeader::try_read(&mut bytes[..]) {
+                if let Some(header) = MessageHeader::try_read(&bytes[..]) {

                     // Get the header and payload, ditch the header.
                     let mut peel = bytes.extract_to(header.required_bytes());

communication/src/allocator/zero_copy/bytes_slab.rs

Lines changed: 7 additions & 7 deletions

@@ -1,16 +1,16 @@
 //! A large binary allocation for writing and sharing.

-use timely_bytes::arc::Bytes;
+use timely_bytes::arc::{Bytes, BytesMut};

 /// A large binary allocation for writing and sharing.
 ///
-/// A bytes slab wraps a `Bytes` and maintains a valid (written) length, and supports writing after
+/// A bytes slab wraps a `BytesMut` and maintains a valid (written) length, and supports writing after
 /// this valid length, and extracting `Bytes` up to this valid length. Extracted bytes are enqueued
 /// and checked for uniqueness in order to recycle them (once all shared references are dropped).
 pub struct BytesSlab {
-    buffer: Bytes, // current working buffer.
-    in_progress: Vec<Option<Bytes>>, // buffers shared with workers.
-    stash: Vec<Bytes>, // reclaimed and reusable buffers.
+    buffer: BytesMut, // current working buffer.
+    in_progress: Vec<Option<BytesMut>>, // buffers shared with workers.
+    stash: Vec<BytesMut>, // reclaimed and reusable buffers.
     shift: usize, // current buffer allocation size.
     valid: usize, // buffer[..valid] are valid bytes.
 }
@@ -19,7 +19,7 @@ impl BytesSlab {
     /// Allocates a new `BytesSlab` with an initial size determined by a shift.
     pub fn new(shift: usize) -> Self {
         BytesSlab {
-            buffer: Bytes::from(vec![0u8; 1 << shift].into_boxed_slice()),
+            buffer: BytesMut::from(vec![0u8; 1 << shift].into_boxed_slice()),
             in_progress: Vec::new(),
             stash: Vec::new(),
             shift,
@@ -82,7 +82,7 @@ impl BytesSlab {
             self.in_progress.retain(|x| x.is_some());
         }

-        let new_buffer = self.stash.pop().unwrap_or_else(|| Bytes::from(vec![0; 1 << self.shift].into_boxed_slice()));
+        let new_buffer = self.stash.pop().unwrap_or_else(|| BytesMut::from(vec![0; 1 << self.shift].into_boxed_slice()));
         let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer);

         self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]);
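
The last hunk above is the refill step: a replacement buffer is popped from the stash of reclaimed `BytesMut`s when one is available, freshly allocated otherwise, and the valid prefix is carried over. A simplified sketch of just that step, using a hypothetical `DemoSlab` in place of the real `BytesSlab` (the reclamation of `in_progress` entries via `try_regenerate` is elided):

use timely_bytes::arc::BytesMut;

/// Hypothetical, simplified stand-in for the refill step of `BytesSlab`.
struct DemoSlab {
    buffer: BytesMut,     // current working buffer.
    stash: Vec<BytesMut>, // reclaimed and reusable buffers.
    shift: usize,         // current buffer allocation size.
    valid: usize,         // buffer[..valid] are valid bytes.
}

impl DemoSlab {
    /// Swap in a replacement buffer, reusing a stashed one when possible,
    /// and carry the valid prefix over into it.
    fn refill(&mut self) {
        let new_buffer = self.stash.pop()
            .unwrap_or_else(|| BytesMut::from(vec![0u8; 1 << self.shift].into_boxed_slice()));
        let old_buffer = std::mem::replace(&mut self.buffer, new_buffer);
        self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]);
    }
}

fn main() {
    let mut slab = DemoSlab {
        buffer: BytesMut::from(vec![0u8; 16].into_boxed_slice()),
        stash: Vec::new(),
        shift: 4,
        valid: 0,
    };
    slab.buffer[..4].copy_from_slice(b"abcd");
    slab.valid = 4;

    slab.refill(); // stash is empty, so this allocates a fresh 1 << 4 buffer.
    assert_eq!(&slab.buffer[..4], &b"abcd"[..]);
}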

communication/src/allocator/zero_copy/tcp.rs

Lines changed: 2 additions & 2 deletions

@@ -174,12 +174,12 @@ pub fn send_loop<S: Stream>(
             }
             else {
                 // TODO: Could do scatter/gather write here.
-                for mut bytes in stash.drain(..) {
+                for bytes in stash.drain(..) {

                     // Record message sends.
                     logger.as_mut().map(|logger| {
                         let mut offset = 0;
-                        while let Some(header) = MessageHeader::try_read(&mut bytes[offset..]) {
+                        while let Some(header) = MessageHeader::try_read(&bytes[offset..]) {
                             logger.log(MessageEvent { is_send: true, header, });
                             offset += header.required_bytes();
                         }

communication/src/networking.rs

Lines changed: 1 addition & 1 deletion

@@ -44,7 +44,7 @@ impl MessageHeader {

     /// Returns a header when there is enough supporting data
     #[inline]
-    pub fn try_read(bytes: &mut [u8]) -> Option<MessageHeader> {
+    pub fn try_read(bytes: &[u8]) -> Option<MessageHeader> {
         let mut cursor = io::Cursor::new(&bytes[..]);
         let mut buffer = [0; Self::FIELDS];
         cursor.read_u64_into::<ByteOrder>(&mut buffer).ok()?;
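
Decoding only copies the leading bytes out through an `io::Cursor`, so a shared `&[u8]` is all the signature needs and the callers above can drop their `&mut` borrows. A sketch of the same shape with a hypothetical `DemoHeader`; `LittleEndian` here merely stands in for the crate's `ByteOrder` alias, which this diff does not show:

use std::io;
use byteorder::{LittleEndian, ReadBytesExt};

/// Hypothetical four-field header, standing in for `MessageHeader`.
struct DemoHeader { fields: [u64; 4] }

impl DemoHeader {
    const FIELDS: usize = 4;

    /// Reading copies the prefix out through an `io::Cursor`;
    /// nothing in the input slice is mutated, so `&[u8]` suffices.
    fn try_read(bytes: &[u8]) -> Option<DemoHeader> {
        let mut cursor = io::Cursor::new(bytes);
        let mut buffer = [0u64; Self::FIELDS];
        cursor.read_u64_into::<LittleEndian>(&mut buffer).ok()?;
        Some(DemoHeader { fields: buffer })
    }
}

fn main() {
    let encoded: Vec<u8> = [7u64, 0, 0, 64].iter().flat_map(|x| x.to_le_bytes()).collect();
    let header = DemoHeader::try_read(&encoded[..]).expect("enough bytes for a header");
    assert_eq!(header.fields[0], 7);
    assert_eq!(header.fields[3], 64);
}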
