Skip to content

Commit 5bbf7c2

Browse files
committed
Introduce and use Bytes
1 parent a3fee2c commit 5bbf7c2

File tree

14 files changed

+118
-53
lines changed

14 files changed

+118
-53
lines changed

bytes/src/lib.rs

Lines changed: 76 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub mod arc {
5555
sequestered: Arc<dyn Any>,
5656
}
5757

58-
// Synchronization happens through `self.sequestered`, which mean to ensure that even
58+
// Synchronization happens through `self.sequestered`, which means to ensure that even
5959
// across multiple threads each region of the slice is uniquely "owned", if not in the
6060
// traditional Rust sense.
6161
unsafe impl Send for BytesMut { }
@@ -90,7 +90,7 @@ pub mod arc {
9090
///
9191
/// This method first tests `index` against `self.len`, which should ensure that both
9292
/// the returned `BytesMut` contains valid memory, and that `self` can no longer access it.
93-
pub fn extract_to(&mut self, index: usize) -> BytesMut {
93+
pub fn extract_to(&mut self, index: usize) -> Bytes {
9494

9595
assert!(index <= self.len);
9696

@@ -103,7 +103,7 @@ pub mod arc {
103103
self.ptr = self.ptr.wrapping_add(index);
104104
self.len -= index;
105105

106-
result
106+
result.freeze()
107107
}
108108

109109
/// Regenerates the BytesMut if it is uniquely held.
@@ -142,6 +142,77 @@ pub mod arc {
142142
}
143143
}
144144

145+
/// Converts a writeable byte slice to a shareable byte slice.
146+
pub fn freeze(self) -> Bytes {
147+
Bytes {
148+
ptr: self.ptr,
149+
len: self.len,
150+
sequestered: self.sequestered,
151+
}
152+
}
153+
}
154+
155+
impl Deref for BytesMut {
156+
type Target = [u8];
157+
fn deref(&self) -> &[u8] {
158+
unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
159+
}
160+
}
161+
162+
impl DerefMut for BytesMut {
163+
fn deref_mut(&mut self) -> &mut [u8] {
164+
unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
165+
}
166+
}
167+
168+
169+
/// A thread-safe shared byte buffer backed by a shared allocation.
170+
///
171+
/// An instance of this type contends that `ptr` is valid for `len` bytes,
172+
/// and that no other mutable reference to these bytes exists, other than
173+
/// through the type currently held in `sequestered`.
174+
#[derive(Clone)]
175+
pub struct Bytes {
176+
/// Pointer to the start of this slice (not the allocation).
177+
ptr: *const u8,
178+
/// Length of this slice.
179+
len: usize,
180+
/// Shared access to underlying resources.
181+
///
182+
/// Importantly, this is unavailable for as long as the struct exists, which may
183+
/// prevent shared access to ptr[0 .. len]. I'm not sure I understand Rust's rules
184+
/// enough to make a stronger statement about this.
185+
sequestered: Arc<dyn Any>,
186+
}
187+
188+
// Synchronization happens through `self.sequestered`, which means to ensure that even
189+
// across multiple threads the referenced range of bytes remains valid.
190+
unsafe impl Send for Bytes { }
191+
192+
impl Bytes {
193+
194+
/// Extracts [0, index) into a new `Bytes` which is returned, updating `self`.
195+
///
196+
/// # Safety
197+
///
198+
/// This method first tests `index` against `self.len`, which should ensure that both
199+
/// the returned `Bytes` contains valid memory, and that `self` can no longer access it.
200+
pub fn extract_to(&mut self, index: usize) -> Bytes {
201+
202+
assert!(index <= self.len);
203+
204+
let result = Bytes {
205+
ptr: self.ptr,
206+
len: index,
207+
sequestered: self.sequestered.clone(),
208+
};
209+
210+
self.ptr = self.ptr.wrapping_add(index);
211+
self.len -= index;
212+
213+
result
214+
}
215+
145216
/// Attempts to merge adjacent slices from the same allocation.
146217
///
147218
/// If the merge succeeds then `other.len` is added to `self` and the result is `Ok(())`.
@@ -164,7 +235,7 @@ pub mod arc {
164235
/// shared2.try_merge(shared1).ok().expect("Failed to merge 23 and 1");
165236
/// shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
166237
/// ```
167-
pub fn try_merge(&mut self, other: BytesMut) -> Result<(), BytesMut> {
238+
pub fn try_merge(&mut self, other: Bytes) -> Result<(), Bytes> {
168239
if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(self.ptr.wrapping_add(self.len), other.ptr) {
169240
self.len += other.len;
170241
Ok(())
@@ -175,16 +246,10 @@ pub mod arc {
175246
}
176247
}
177248

178-
impl Deref for BytesMut {
249+
impl Deref for Bytes {
179250
type Target = [u8];
180251
fn deref(&self) -> &[u8] {
181252
unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
182253
}
183254
}
184-
185-
impl DerefMut for BytesMut {
186-
fn deref_mut(&mut self) -> &mut [u8] {
187-
unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
188-
}
189-
}
190255
}

communication/examples/comm_hello.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub struct Message {
77
}
88

99
impl Bytesable for Message {
10-
fn from_bytes(bytes: timely_bytes::arc::BytesMut) -> Self {
10+
fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
1111
Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
1212
}
1313

communication/src/allocator/zero_copy/allocator.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::cell::RefCell;
44
use std::collections::{VecDeque, HashMap, hash_map::Entry};
55
use crossbeam_channel::{Sender, Receiver};
66

7-
use timely_bytes::arc::BytesMut;
7+
use timely_bytes::arc::Bytes;
88

99
use crate::networking::MessageHeader;
1010

@@ -121,15 +121,15 @@ pub struct TcpAllocator<A: Allocate> {
121121
index: usize, // number out of peers
122122
peers: usize, // number of peer allocators (for typed channel allocation).
123123

124-
staged: Vec<BytesMut>, // staging area for incoming Bytes
124+
staged: Vec<Bytes>, // staging area for incoming Bytes
125125
canaries: Rc<RefCell<Vec<usize>>>,
126126

127127
channel_id_bound: Option<usize>,
128128

129129
// sending, receiving, and responding to binary buffers.
130130
sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, // sends[x] -> goes to process x.
131131
recvs: Vec<MergeQueue>, // recvs[x] <- from process x.
132-
to_local: HashMap<usize, Rc<RefCell<VecDeque<BytesMut>>>>,// to worker-local typed pullers.
132+
to_local: HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>, // to worker-local typed pullers.
133133
}
134134

135135
impl<A: Allocate> Allocate for TcpAllocator<A> {
@@ -220,7 +220,7 @@ impl<A: Allocate> Allocate for TcpAllocator<A> {
220220
// No splitting occurs across allocations.
221221
while bytes.len() > 0 {
222222

223-
if let Some(header) = MessageHeader::try_read(&mut bytes[..]) {
223+
if let Some(header) = MessageHeader::try_read(&bytes[..]) {
224224

225225
// Get the header and payload, ditch the header.
226226
let mut peel = bytes.extract_to(header.required_bytes());

communication/src/allocator/zero_copy/allocator_process.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::cell::RefCell;
55
use std::collections::{VecDeque, HashMap, hash_map::Entry};
66
use crossbeam_channel::{Sender, Receiver};
77

8-
use timely_bytes::arc::BytesMut;
8+
use timely_bytes::arc::Bytes;
99

1010
use crate::networking::MessageHeader;
1111

@@ -110,10 +110,10 @@ pub struct ProcessAllocator {
110110
channel_id_bound: Option<usize>,
111111

112112
// sending, receiving, and responding to binary buffers.
113-
staged: Vec<BytesMut>,
113+
staged: Vec<Bytes>,
114114
sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, // sends[x] -> goes to thread x.
115115
recvs: Vec<MergeQueue>, // recvs[x] <- from thread x.
116-
to_local: HashMap<usize, Rc<RefCell<VecDeque<BytesMut>>>>, // to worker-local typed pullers.
116+
to_local: HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>, // to worker-local typed pullers.
117117
}
118118

119119
impl Allocate for ProcessAllocator {
@@ -188,7 +188,7 @@ impl Allocate for ProcessAllocator {
188188
// No splitting occurs across allocations.
189189
while bytes.len() > 0 {
190190

191-
if let Some(header) = MessageHeader::try_read(&mut bytes[..]) {
191+
if let Some(header) = MessageHeader::try_read(&bytes[..]) {
192192

193193
// Get the header and payload, ditch the header.
194194
let mut peel = bytes.extract_to(header.required_bytes());

communication/src/allocator/zero_copy/bytes_exchange.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,22 @@
33
use std::sync::{Arc, Mutex};
44
use std::collections::VecDeque;
55

6-
use timely_bytes::arc::BytesMut;
6+
use timely_bytes::arc::Bytes;
77
use super::bytes_slab::BytesSlab;
88

99
/// A target for `Bytes`.
1010
pub trait BytesPush {
1111
// /// Pushes bytes at the instance.
1212
// fn push(&mut self, bytes: Bytes);
1313
/// Pushes many bytes at the instance.
14-
fn extend<I: IntoIterator<Item=BytesMut>>(&mut self, iter: I);
14+
fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iter: I);
1515
}
1616
/// A source for `Bytes`.
1717
pub trait BytesPull {
1818
// /// Pulls bytes from the instance.
1919
// fn pull(&mut self) -> Option<Bytes>;
2020
/// Drains many bytes from the instance.
21-
fn drain_into(&mut self, vec: &mut Vec<BytesMut>);
21+
fn drain_into(&mut self, vec: &mut Vec<Bytes>);
2222
}
2323

2424
use std::sync::atomic::{AtomicBool, Ordering};
@@ -28,7 +28,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
2828
/// TODO: explain "extend"
2929
#[derive(Clone)]
3030
pub struct MergeQueue {
31-
queue: Arc<Mutex<VecDeque<BytesMut>>>, // queue of bytes.
31+
queue: Arc<Mutex<VecDeque<Bytes>>>, // queue of bytes.
3232
buzzer: crate::buzzer::Buzzer, // awakens receiver thread.
3333
panic: Arc<AtomicBool>,
3434
}
@@ -50,7 +50,7 @@ impl MergeQueue {
5050
}
5151

5252
impl BytesPush for MergeQueue {
53-
fn extend<I: IntoIterator<Item=BytesMut>>(&mut self, iterator: I) {
53+
fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iterator: I) {
5454

5555
if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
5656

@@ -92,7 +92,7 @@ impl BytesPush for MergeQueue {
9292
}
9393

9494
impl BytesPull for MergeQueue {
95-
fn drain_into(&mut self, vec: &mut Vec<BytesMut>) {
95+
fn drain_into(&mut self, vec: &mut Vec<Bytes>) {
9696
if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
9797

9898
// try to acquire lock without going to sleep (Rust's lock() might yield)

communication/src/allocator/zero_copy/bytes_slab.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
//! A large binary allocation for writing and sharing.
22
3-
use timely_bytes::arc::BytesMut;
3+
use timely_bytes::arc::{Bytes, BytesMut};
44

55
/// A large binary allocation for writing and sharing.
66
///
@@ -39,7 +39,7 @@ impl BytesSlab {
3939
self.valid += bytes;
4040
}
4141
/// Extracts the first `bytes` valid bytes.
42-
pub fn extract(&mut self, bytes: usize) -> BytesMut {
42+
pub fn extract(&mut self, bytes: usize) -> Bytes {
4343
debug_assert!(bytes <= self.valid);
4444
self.valid -= bytes;
4545
self.buffer.extract_to(bytes)

communication/src/allocator/zero_copy/push_pull.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::rc::Rc;
44
use std::cell::RefCell;
55
use std::collections::VecDeque;
66

7-
use timely_bytes::arc::BytesMut;
7+
use timely_bytes::arc::Bytes;
88

99
use crate::allocator::canary::Canary;
1010
use crate::networking::MessageHeader;
@@ -67,12 +67,12 @@ impl<T: Bytesable, P: BytesPush> Push<T> for Pusher<T, P> {
6767
pub struct Puller<T> {
6868
_canary: Canary,
6969
current: Option<T>,
70-
receiver: Rc<RefCell<VecDeque<BytesMut>>>, // source of serialized buffers
70+
receiver: Rc<RefCell<VecDeque<Bytes>>>, // source of serialized buffers
7171
}
7272

7373
impl<T: Bytesable> Puller<T> {
7474
/// Creates a new `Puller` instance from a shared queue.
75-
pub fn new(receiver: Rc<RefCell<VecDeque<BytesMut>>>, _canary: Canary) -> Puller<T> {
75+
pub fn new(receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Puller<T> {
7676
Puller {
7777
_canary,
7878
current: None,
@@ -104,12 +104,12 @@ pub struct PullerInner<T> {
104104
inner: Box<dyn Pull<T>>, // inner pullable (e.g. intra-process typed queue)
105105
_canary: Canary,
106106
current: Option<T>,
107-
receiver: Rc<RefCell<VecDeque<BytesMut>>>, // source of serialized buffers
107+
receiver: Rc<RefCell<VecDeque<Bytes>>>, // source of serialized buffers
108108
}
109109

110110
impl<T: Bytesable> PullerInner<T> {
111111
/// Creates a new `PullerInner` instance from a shared queue.
112-
pub fn new(inner: Box<dyn Pull<T>>, receiver: Rc<RefCell<VecDeque<BytesMut>>>, _canary: Canary) -> Self {
112+
pub fn new(inner: Box<dyn Pull<T>>, receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
113113
PullerInner {
114114
inner,
115115
_canary,

communication/src/allocator/zero_copy/tcp.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,12 +174,12 @@ pub fn send_loop<S: Stream>(
174174
}
175175
else {
176176
// TODO: Could do scatter/gather write here.
177-
for mut bytes in stash.drain(..) {
177+
for bytes in stash.drain(..) {
178178

179179
// Record message sends.
180180
logger.as_mut().map(|logger| {
181181
let mut offset = 0;
182-
while let Some(header) = MessageHeader::try_read(&mut bytes[offset..]) {
182+
while let Some(header) = MessageHeader::try_read(&bytes[offset..]) {
183183
logger.log(MessageEvent { is_send: true, header, });
184184
offset += header.required_bytes();
185185
}

communication/src/initialize.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ impl Config {
189189
/// }
190190
///
191191
/// impl Bytesable for Message {
192-
/// fn from_bytes(bytes: timely_bytes::arc::BytesMut) -> Self {
192+
/// fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
193193
/// Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
194194
/// }
195195
///
@@ -285,7 +285,7 @@ pub fn initialize<T:Send+'static, F: Fn(Generic)->T+Send+Sync+'static>(
285285
/// }
286286
///
287287
/// impl Bytesable for Message {
288-
/// fn from_bytes(bytes: timely_bytes::arc::BytesMut) -> Self {
288+
/// fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
289289
/// Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
290290
/// }
291291
///

communication/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
//! }
2525
//!
2626
//! impl Bytesable for Message {
27-
//! fn from_bytes(bytes: timely_bytes::arc::BytesMut) -> Self {
27+
//! fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
2828
//! Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
2929
//! }
3030
//!
@@ -107,12 +107,12 @@ pub use allocator::Generic as Allocator;
107107
pub use allocator::{Allocate, Exchangeable};
108108
pub use initialize::{initialize, initialize_from, Config, WorkerGuards};
109109

110-
use timely_bytes::arc::BytesMut;
110+
use timely_bytes::arc::Bytes;
111111

112-
/// A type that can be serialized and deserialized through `BytesMut`.
112+
/// A type that can be serialized and deserialized through `Bytes`.
113113
pub trait Bytesable {
114114
/// Wrap bytes as `Self`.
115-
fn from_bytes(bytes: BytesMut) -> Self;
115+
fn from_bytes(bytes: Bytes) -> Self;
116116

117117
/// The number of bytes required to serialize the data.
118118
fn length_in_bytes(&self) -> usize;

0 commit comments

Comments
 (0)