Skip to content

Commit a3fee2c

Browse files
committed
Rename Bytes to BytesMut
1 parent f8a8cd0 commit a3fee2c

File tree

12 files changed

+68
-64
lines changed

12 files changed

+68
-64
lines changed

bytes/src/lib.rs

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,10 @@
33
//! # Examples
44
//!
55
//! ```
6-
//! use timely_bytes::arc::Bytes;
6+
//! use timely_bytes::arc::BytesMut;
77
//!
88
//! let bytes = vec![0u8; 1024];
9-
//! let mut shared1 = Bytes::from(bytes);
9+
//! let mut shared1 = BytesMut::from(bytes);
1010
//! let mut shared2 = shared1.extract_to(100);
1111
//! let mut shared3 = shared1.extract_to(100);
1212
//! let mut shared4 = shared2.extract_to(60);
@@ -38,7 +38,11 @@ pub mod arc {
3838
use std::any::Any;
3939

4040
/// A thread-safe byte buffer backed by a shared allocation.
41-
pub struct Bytes {
41+
///
42+
/// An instance of this type contends that `ptr` is valid for `len` bytes,
43+
/// and that no other reference to these bytes exists, other than through
44+
/// the type currently held in `sequestered`.
45+
pub struct BytesMut {
4246
/// Pointer to the start of this slice (not the allocation).
4347
ptr: *mut u8,
4448
/// Length of this slice.
@@ -54,12 +58,12 @@ pub mod arc {
5458
// Synchronization happens through `self.sequestered`, which mean to ensure that even
5559
// across multiple threads each region of the slice is uniquely "owned", if not in the
5660
// traditional Rust sense.
57-
unsafe impl Send for Bytes { }
61+
unsafe impl Send for BytesMut { }
5862

59-
impl Bytes {
63+
impl BytesMut {
6064

6165
/// Create a new instance from a byte allocation.
62-
pub fn from<B>(bytes: B) -> Bytes where B : DerefMut<Target=[u8]>+'static {
66+
pub fn from<B>(bytes: B) -> BytesMut where B : DerefMut<Target=[u8]>+'static {
6367

6468
// Sequester allocation behind an `Arc`, which *should* keep the address
6569
// stable for the lifetime of `sequestered`. The `Arc` also serves as our
@@ -73,24 +77,24 @@ pub mod arc {
7377
.map(|a| (a.as_mut_ptr(), a.len()))
7478
.unwrap();
7579

76-
Bytes {
80+
BytesMut {
7781
ptr,
7882
len,
7983
sequestered,
8084
}
8185
}
8286

83-
/// Extracts [0, index) into a new `Bytes` which is returned, updating `self`.
87+
/// Extracts [0, index) into a new `BytesMut` which is returned, updating `self`.
8488
///
8589
/// # Safety
8690
///
8791
/// This method first tests `index` against `self.len`, which should ensure that both
88-
/// the returned `Bytes` contains valid memory, and that `self` can no longer access it.
89-
pub fn extract_to(&mut self, index: usize) -> Bytes {
92+
/// the returned `BytesMut` contains valid memory, and that `self` can no longer access it.
93+
pub fn extract_to(&mut self, index: usize) -> BytesMut {
9094

9195
assert!(index <= self.len);
9296

93-
let result = Bytes {
97+
let result = BytesMut {
9498
ptr: self.ptr,
9599
len: index,
96100
sequestered: self.sequestered.clone(),
@@ -102,19 +106,19 @@ pub mod arc {
102106
result
103107
}
104108

105-
/// Regenerates the Bytes if it is uniquely held.
109+
/// Regenerates the BytesMut if it is uniquely held.
106110
///
107111
/// If uniquely held, this method recovers the initial pointer and length
108-
/// of the sequestered allocation and re-initializes the Bytes. The return
112+
/// of the sequestered allocation and re-initializes the BytesMut. The return
109113
/// value indicates whether this occurred.
110114
///
111115
/// # Examples
112116
///
113117
/// ```
114-
/// use timely_bytes::arc::Bytes;
118+
/// use timely_bytes::arc::BytesMut;
115119
///
116120
/// let bytes = vec![0u8; 1024];
117-
/// let mut shared1 = Bytes::from(bytes);
121+
/// let mut shared1 = BytesMut::from(bytes);
118122
/// let mut shared2 = shared1.extract_to(100);
119123
/// let mut shared3 = shared1.extract_to(100);
120124
/// let mut shared4 = shared2.extract_to(60);
@@ -147,10 +151,10 @@ pub mod arc {
147151
/// # Examples
148152
///
149153
/// ```
150-
/// use timely_bytes::arc::Bytes;
154+
/// use timely_bytes::arc::BytesMut;
151155
///
152156
/// let bytes = vec![0u8; 1024];
153-
/// let mut shared1 = Bytes::from(bytes);
157+
/// let mut shared1 = BytesMut::from(bytes);
154158
/// let mut shared2 = shared1.extract_to(100);
155159
/// let mut shared3 = shared1.extract_to(100);
156160
/// let mut shared4 = shared2.extract_to(60);
@@ -160,7 +164,7 @@ pub mod arc {
160164
/// shared2.try_merge(shared1).ok().expect("Failed to merge 23 and 1");
161165
/// shared4.try_merge(shared2).ok().expect("Failed to merge 4 and 231");
162166
/// ```
163-
pub fn try_merge(&mut self, other: Bytes) -> Result<(), Bytes> {
167+
pub fn try_merge(&mut self, other: BytesMut) -> Result<(), BytesMut> {
164168
if Arc::ptr_eq(&self.sequestered, &other.sequestered) && ::std::ptr::eq(self.ptr.wrapping_add(self.len), other.ptr) {
165169
self.len += other.len;
166170
Ok(())
@@ -171,14 +175,14 @@ pub mod arc {
171175
}
172176
}
173177

174-
impl Deref for Bytes {
178+
impl Deref for BytesMut {
175179
type Target = [u8];
176180
fn deref(&self) -> &[u8] {
177181
unsafe { ::std::slice::from_raw_parts(self.ptr, self.len) }
178182
}
179183
}
180184

181-
impl DerefMut for Bytes {
185+
impl DerefMut for BytesMut {
182186
fn deref_mut(&mut self) -> &mut [u8] {
183187
unsafe { ::std::slice::from_raw_parts_mut(self.ptr, self.len) }
184188
}

communication/examples/comm_hello.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub struct Message {
77
}
88

99
impl Bytesable for Message {
10-
fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
10+
fn from_bytes(bytes: timely_bytes::arc::BytesMut) -> Self {
1111
Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
1212
}
1313

communication/src/allocator/zero_copy/allocator.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::cell::RefCell;
44
use std::collections::{VecDeque, HashMap, hash_map::Entry};
55
use crossbeam_channel::{Sender, Receiver};
66

7-
use timely_bytes::arc::Bytes;
7+
use timely_bytes::arc::BytesMut;
88

99
use crate::networking::MessageHeader;
1010

@@ -121,15 +121,15 @@ pub struct TcpAllocator<A: Allocate> {
121121
index: usize, // number out of peers
122122
peers: usize, // number of peer allocators (for typed channel allocation).
123123

124-
staged: Vec<Bytes>, // staging area for incoming Bytes
124+
staged: Vec<BytesMut>, // staging area for incoming Bytes
125125
canaries: Rc<RefCell<Vec<usize>>>,
126126

127127
channel_id_bound: Option<usize>,
128128

129129
// sending, receiving, and responding to binary buffers.
130130
sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, // sends[x] -> goes to process x.
131131
recvs: Vec<MergeQueue>, // recvs[x] <- from process x.
132-
to_local: HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>, // to worker-local typed pullers.
132+
to_local: HashMap<usize, Rc<RefCell<VecDeque<BytesMut>>>>,// to worker-local typed pullers.
133133
}
134134

135135
impl<A: Allocate> Allocate for TcpAllocator<A> {

communication/src/allocator/zero_copy/allocator_process.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::cell::RefCell;
55
use std::collections::{VecDeque, HashMap, hash_map::Entry};
66
use crossbeam_channel::{Sender, Receiver};
77

8-
use timely_bytes::arc::Bytes;
8+
use timely_bytes::arc::BytesMut;
99

1010
use crate::networking::MessageHeader;
1111

@@ -110,10 +110,10 @@ pub struct ProcessAllocator {
110110
channel_id_bound: Option<usize>,
111111

112112
// sending, receiving, and responding to binary buffers.
113-
staged: Vec<Bytes>,
113+
staged: Vec<BytesMut>,
114114
sends: Vec<Rc<RefCell<SendEndpoint<MergeQueue>>>>, // sends[x] -> goes to thread x.
115115
recvs: Vec<MergeQueue>, // recvs[x] <- from thread x.
116-
to_local: HashMap<usize, Rc<RefCell<VecDeque<Bytes>>>>, // to worker-local typed pullers.
116+
to_local: HashMap<usize, Rc<RefCell<VecDeque<BytesMut>>>>, // to worker-local typed pullers.
117117
}
118118

119119
impl Allocate for ProcessAllocator {

communication/src/allocator/zero_copy/bytes_exchange.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,22 @@
33
use std::sync::{Arc, Mutex};
44
use std::collections::VecDeque;
55

6-
use timely_bytes::arc::Bytes;
6+
use timely_bytes::arc::BytesMut;
77
use super::bytes_slab::BytesSlab;
88

99
/// A target for `Bytes`.
1010
pub trait BytesPush {
1111
// /// Pushes bytes at the instance.
1212
// fn push(&mut self, bytes: Bytes);
1313
/// Pushes many bytes at the instance.
14-
fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iter: I);
14+
fn extend<I: IntoIterator<Item=BytesMut>>(&mut self, iter: I);
1515
}
1616
/// A source for `Bytes`.
1717
pub trait BytesPull {
1818
// /// Pulls bytes from the instance.
1919
// fn pull(&mut self) -> Option<Bytes>;
2020
/// Drains many bytes from the instance.
21-
fn drain_into(&mut self, vec: &mut Vec<Bytes>);
21+
fn drain_into(&mut self, vec: &mut Vec<BytesMut>);
2222
}
2323

2424
use std::sync::atomic::{AtomicBool, Ordering};
@@ -28,7 +28,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
2828
/// TODO: explain "extend"
2929
#[derive(Clone)]
3030
pub struct MergeQueue {
31-
queue: Arc<Mutex<VecDeque<Bytes>>>, // queue of bytes.
31+
queue: Arc<Mutex<VecDeque<BytesMut>>>, // queue of bytes.
3232
buzzer: crate::buzzer::Buzzer, // awakens receiver thread.
3333
panic: Arc<AtomicBool>,
3434
}
@@ -50,7 +50,7 @@ impl MergeQueue {
5050
}
5151

5252
impl BytesPush for MergeQueue {
53-
fn extend<I: IntoIterator<Item=Bytes>>(&mut self, iterator: I) {
53+
fn extend<I: IntoIterator<Item=BytesMut>>(&mut self, iterator: I) {
5454

5555
if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
5656

@@ -92,7 +92,7 @@ impl BytesPush for MergeQueue {
9292
}
9393

9494
impl BytesPull for MergeQueue {
95-
fn drain_into(&mut self, vec: &mut Vec<Bytes>) {
95+
fn drain_into(&mut self, vec: &mut Vec<BytesMut>) {
9696
if self.panic.load(Ordering::SeqCst) { panic!("MergeQueue poisoned."); }
9797

9898
// try to acquire lock without going to sleep (Rust's lock() might yield)

communication/src/allocator/zero_copy/bytes_slab.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
//! A large binary allocation for writing and sharing.
22
3-
use timely_bytes::arc::Bytes;
3+
use timely_bytes::arc::BytesMut;
44

55
/// A large binary allocation for writing and sharing.
66
///
7-
/// A bytes slab wraps a `Bytes` and maintains a valid (written) length, and supports writing after
8-
/// this valid length, and extracting `Bytes` up to this valid length. Extracted bytes are enqueued
7+
/// A bytes slab wraps a `BytesMut` and maintains a valid (written) length, and supports writing after
8+
/// this valid length, and extracting `BytesMut` up to this valid length. Extracted bytes are enqueued
99
/// and checked for uniqueness in order to recycle them (once all shared references are dropped).
1010
pub struct BytesSlab {
11-
buffer: Bytes, // current working buffer.
12-
in_progress: Vec<Option<Bytes>>, // buffers shared with workers.
13-
stash: Vec<Bytes>, // reclaimed and reusable buffers.
11+
buffer: BytesMut, // current working buffer.
12+
in_progress: Vec<Option<BytesMut>>, // buffers shared with workers.
13+
stash: Vec<BytesMut>, // reclaimed and reusable buffers.
1414
shift: usize, // current buffer allocation size.
1515
valid: usize, // buffer[..valid] are valid bytes.
1616
}
@@ -19,7 +19,7 @@ impl BytesSlab {
1919
/// Allocates a new `BytesSlab` with an initial size determined by a shift.
2020
pub fn new(shift: usize) -> Self {
2121
BytesSlab {
22-
buffer: Bytes::from(vec![0u8; 1 << shift].into_boxed_slice()),
22+
buffer: BytesMut::from(vec![0u8; 1 << shift].into_boxed_slice()),
2323
in_progress: Vec::new(),
2424
stash: Vec::new(),
2525
shift,
@@ -39,7 +39,7 @@ impl BytesSlab {
3939
self.valid += bytes;
4040
}
4141
/// Extracts the first `bytes` valid bytes.
42-
pub fn extract(&mut self, bytes: usize) -> Bytes {
42+
pub fn extract(&mut self, bytes: usize) -> BytesMut {
4343
debug_assert!(bytes <= self.valid);
4444
self.valid -= bytes;
4545
self.buffer.extract_to(bytes)
@@ -82,7 +82,7 @@ impl BytesSlab {
8282
self.in_progress.retain(|x| x.is_some());
8383
}
8484

85-
let new_buffer = self.stash.pop().unwrap_or_else(|| Bytes::from(vec![0; 1 << self.shift].into_boxed_slice()));
85+
let new_buffer = self.stash.pop().unwrap_or_else(|| BytesMut::from(vec![0; 1 << self.shift].into_boxed_slice()));
8686
let old_buffer = ::std::mem::replace(&mut self.buffer, new_buffer);
8787

8888
self.buffer[.. self.valid].copy_from_slice(&old_buffer[.. self.valid]);

communication/src/allocator/zero_copy/push_pull.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::rc::Rc;
44
use std::cell::RefCell;
55
use std::collections::VecDeque;
66

7-
use timely_bytes::arc::Bytes;
7+
use timely_bytes::arc::BytesMut;
88

99
use crate::allocator::canary::Canary;
1010
use crate::networking::MessageHeader;
@@ -67,12 +67,12 @@ impl<T: Bytesable, P: BytesPush> Push<T> for Pusher<T, P> {
6767
pub struct Puller<T> {
6868
_canary: Canary,
6969
current: Option<T>,
70-
receiver: Rc<RefCell<VecDeque<Bytes>>>, // source of serialized buffers
70+
receiver: Rc<RefCell<VecDeque<BytesMut>>>, // source of serialized buffers
7171
}
7272

7373
impl<T: Bytesable> Puller<T> {
7474
/// Creates a new `Puller` instance from a shared queue.
75-
pub fn new(receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Puller<T> {
75+
pub fn new(receiver: Rc<RefCell<VecDeque<BytesMut>>>, _canary: Canary) -> Puller<T> {
7676
Puller {
7777
_canary,
7878
current: None,
@@ -104,12 +104,12 @@ pub struct PullerInner<T> {
104104
inner: Box<dyn Pull<T>>, // inner pullable (e.g. intra-process typed queue)
105105
_canary: Canary,
106106
current: Option<T>,
107-
receiver: Rc<RefCell<VecDeque<Bytes>>>, // source of serialized buffers
107+
receiver: Rc<RefCell<VecDeque<BytesMut>>>, // source of serialized buffers
108108
}
109109

110110
impl<T: Bytesable> PullerInner<T> {
111111
/// Creates a new `PullerInner` instance from a shared queue.
112-
pub fn new(inner: Box<dyn Pull<T>>, receiver: Rc<RefCell<VecDeque<Bytes>>>, _canary: Canary) -> Self {
112+
pub fn new(inner: Box<dyn Pull<T>>, receiver: Rc<RefCell<VecDeque<BytesMut>>>, _canary: Canary) -> Self {
113113
PullerInner {
114114
inner,
115115
_canary,

communication/src/initialize.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ impl Config {
189189
/// }
190190
///
191191
/// impl Bytesable for Message {
192-
/// fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
192+
/// fn from_bytes(bytes: timely_bytes::arc::BytesMut) -> Self {
193193
/// Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
194194
/// }
195195
///
@@ -285,7 +285,7 @@ pub fn initialize<T:Send+'static, F: Fn(Generic)->T+Send+Sync+'static>(
285285
/// }
286286
///
287287
/// impl Bytesable for Message {
288-
/// fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
288+
/// fn from_bytes(bytes: timely_bytes::arc::BytesMut) -> Self {
289289
/// Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
290290
/// }
291291
///

communication/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
//! }
2525
//!
2626
//! impl Bytesable for Message {
27-
//! fn from_bytes(bytes: timely_bytes::arc::Bytes) -> Self {
27+
//! fn from_bytes(bytes: timely_bytes::arc::BytesMut) -> Self {
2828
//! Message { payload: std::str::from_utf8(&bytes[..]).unwrap().to_string() }
2929
//! }
3030
//!
@@ -107,12 +107,12 @@ pub use allocator::Generic as Allocator;
107107
pub use allocator::{Allocate, Exchangeable};
108108
pub use initialize::{initialize, initialize_from, Config, WorkerGuards};
109109

110-
use timely_bytes::arc::Bytes;
110+
use timely_bytes::arc::BytesMut;
111111

112-
/// A type that can be serialized and deserialized through `Bytes`.
112+
/// A type that can be serialized and deserialized through `BytesMut`.
113113
pub trait Bytesable {
114114
/// Wrap bytes as `Self`.
115-
fn from_bytes(bytes: Bytes) -> Self;
115+
fn from_bytes(bytes: BytesMut) -> Self;
116116

117117
/// The number of bytes required to serialize the data.
118118
fn length_in_bytes(&self) -> usize;

0 commit comments

Comments
 (0)