Skip to content

Commit 8e0f19d

Browse files
committed
Optimise read buf initialization performance
Particularly improves large read_buffer_size performance
1 parent 2d4abe8 commit 8e0f19d

File tree

3 files changed

+232
-7
lines changed

3 files changed

+232
-7
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
# Unreleased
2+
* Optimise read performance, particularly for larger-than-default `read_buffer_size`s.
3+
14
# 0.28.0
25
* Reduce `Error` size 136 -> **32** by boxing internals of `Error::Http`, `Error::WriteBufferFull`,
36
`ProtocolError::InvalidHeader`, `TlsError::Native`, `TlsError::Rustls`.
Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
use bytes::{Buf, BytesMut};
2+
use std::{
3+
ops::{Deref, DerefMut},
4+
ptr,
5+
};
6+
7+
/// [`BytesMut`] wrapper that tracks initialization state of its spare capacity.
8+
///
9+
/// Supports safe & efficient repeated calls to [`Self::resize`] + [`Self::truncate`].
10+
///
11+
/// This optimisation is useful for [`std::io::Read`] to safely provide spare
12+
/// capacity as an initialized slice.
13+
///
14+
/// Related, may be obsoleted by: <https://github.com/rust-lang/rust/issues/78485>
15+
#[derive(Debug, Default)]
16+
pub struct InitAwareBuf {
17+
/// Underlying byte storage; its length is the readable part of the buffer.
bytes: BytesMut,
18+
/// Number of bytes, counted from the start of `bytes`, that have been
/// initialized. `resize` relies on this never being less than `bytes.len()`;
/// the range between the length and this mark is spare capacity that can be
/// exposed again without zeroing.
19+
init_cap: usize,
20+
}
21+
22+
impl InitAwareBuf {
23+
#[inline]
24+
pub fn with_capacity(capacity: usize) -> Self {
25+
Self { bytes: BytesMut::with_capacity(capacity), init_cap: 0 }
26+
}
27+
28+
#[inline]
29+
pub fn len(&self) -> usize {
30+
self.bytes.len()
31+
}
32+
33+
#[inline]
34+
pub fn capacity(&self) -> usize {
35+
self.bytes.capacity()
36+
}
37+
38+
#[inline]
39+
pub fn split_to(&mut self, at: usize) -> BytesMut {
40+
let split = self.bytes.split_to(at);
41+
self.init_cap -= at;
42+
split
43+
}
44+
45+
#[inline]
46+
pub fn reserve(&mut self, additional: usize) {
47+
// Increasing capacity doesn't change `init_cap`
48+
self.bytes.reserve(additional);
49+
}
50+
51+
/// Sets the length of the buffer to `len`. If above the current
52+
/// initialized capacity any uninitialized bytes will be zeroed.
53+
///
54+
/// This is more efficient that [`BytesMut::resize`] as spare capacity
55+
/// is only initialized **once** past the initialized_capacity. This
56+
/// allow the method to be efficiently called after truncating.
57+
///
58+
/// # Panics
59+
/// Panics if `len > capacity`.
60+
#[inline]
61+
pub fn resize(&mut self, len: usize) {
62+
if len <= self.init_cap {
63+
// SAFETY: init_cap tracks initialised bytes.
64+
unsafe {
65+
self.bytes.set_len(len);
66+
}
67+
} else {
68+
assert!(len <= self.capacity());
69+
let cur_len = self.bytes.len();
70+
let spare = self.bytes.spare_capacity_mut();
71+
let already_init = self.init_cap - cur_len;
72+
let zeroes = len - self.init_cap;
73+
debug_assert!(already_init + zeroes <= spare.len());
74+
unsafe {
75+
// SAFETY: spare capacity is sufficient for `zeroes` extra bytes
76+
ptr::write_bytes(spare[already_init..].as_mut_ptr().cast::<u8>(), 0, zeroes);
77+
// SAFETY: len has been initialized
78+
self.bytes.set_len(len);
79+
}
80+
self.init_cap = len;
81+
}
82+
}
83+
84+
#[inline]
85+
pub fn truncate(&mut self, len: usize) {
86+
// truncating doesn't change `init_cap`
87+
self.bytes.truncate(len);
88+
}
89+
90+
#[inline]
91+
pub fn advance(&mut self, cnt: usize) {
92+
self.bytes.advance(cnt);
93+
self.init_cap -= cnt;
94+
}
95+
}
96+
97+
impl From<BytesMut> for InitAwareBuf {
    /// Wraps an existing [`BytesMut`].
    ///
    /// Only the bytes within the current length are treated as initialized;
    /// nothing is assumed about the spare capacity.
    #[inline]
    fn from(bytes: BytesMut) -> Self {
        Self { init_cap: bytes.len(), bytes }
    }
}
104+
105+
impl From<InitAwareBuf> for BytesMut {
106+
#[inline]
107+
fn from(value: InitAwareBuf) -> Self {
108+
value.bytes
109+
}
110+
}
111+
112+
impl AsRef<[u8]> for InitAwareBuf {
113+
#[inline]
114+
fn as_ref(&self) -> &[u8] {
115+
&self.bytes
116+
}
117+
}
118+
119+
impl Deref for InitAwareBuf {
120+
type Target = [u8];
121+
122+
#[inline]
123+
fn deref(&self) -> &[u8] {
124+
&self.bytes
125+
}
126+
}
127+
128+
impl AsMut<[u8]> for InitAwareBuf {
129+
#[inline]
130+
fn as_mut(&mut self) -> &mut [u8] {
131+
&mut self.bytes
132+
}
133+
}
134+
135+
impl DerefMut for InitAwareBuf {
136+
#[inline]
137+
fn deref_mut(&mut self) -> &mut [u8] {
138+
&mut self.bytes
139+
}
140+
}
141+
142+
#[cfg(test)]
mod test {
    use super::*;

    /// Asserts the length and the initialized mark of `buf` in one go.
    #[track_caller]
    fn assert_len_and_init(buf: &InitAwareBuf, len: usize, init_cap: usize) {
        assert_eq!(buf.len(), len);
        assert_eq!(buf.init_cap, init_cap);
    }

    /// Walks a buffer through reserve, resize and truncate and checks that
    /// bytes initialized once are kept instead of being zeroed again.
    #[test]
    fn reserve_resize_truncate() {
        let mut buf = InitAwareBuf::default();
        assert_len_and_init(&buf, 0, 0);
        assert_eq!(buf.capacity(), 0);

        buf.reserve(64);
        assert_len_and_init(&buf, 0, 0);
        let reserved_cap = buf.capacity();
        assert!(reserved_cap >= 64);

        buf.resize(10);
        assert_len_and_init(&buf, 10, 10);
        assert_eq!(buf.capacity(), reserved_cap);
        assert_eq!(&*buf, &[0; 10]);

        // the first 3 bytes are "real" data (=8), the remainder is marked =44
        buf[..3].fill(8);
        buf[3..10].fill(44);
        buf.truncate(3);
        assert_len_and_init(&buf, 3, 10);
        assert_eq!(buf.capacity(), reserved_cap);
        assert_eq!(&*buf, &[8; 3]);

        // growing back needs no work: these bytes have already been initialized once
        buf.resize(10);
        assert_len_and_init(&buf, 10, 10);
        assert_eq!(buf.capacity(), reserved_cap);
        assert_eq!(&*buf, &[8, 8, 8, 44, 44, 44, 44, 44, 44, 44]);

        buf.truncate(3);
        assert_eq!(&*buf, &[8; 3]);

        // only the 3 bytes that were never initialized before get zeroed
        buf.resize(13);
        assert_len_and_init(&buf, 13, 13);
        assert_eq!(buf.capacity(), reserved_cap);
        assert_eq!(&*buf, &[8, 8, 8, 44, 44, 44, 44, 44, 44, 44, 0, 0, 0]);
    }

    /// Advancing drops leading bytes and shrinks the initialized mark with them.
    #[test]
    fn advance() {
        let source = BytesMut::from(&[0, 1, 2, 3, 4][..]);
        let mut buf = InitAwareBuf::from(source);
        assert_len_and_init(&buf, 5, 5);

        buf.advance(2);
        assert_len_and_init(&buf, 3, 3);
        assert_eq!(&*buf, &[2, 3, 4]);
    }

    /// Splitting hands out the head and keeps the tail with an adjusted mark.
    #[test]
    fn split_to() {
        let source = BytesMut::from(&[0, 1, 2, 3, 4][..]);
        let mut buf = InitAwareBuf::from(source);
        assert_len_and_init(&buf, 5, 5);

        let head = buf.split_to(2);
        assert_len_and_init(&buf, 3, 3);
        assert_eq!(&*buf, &[2, 3, 4]);
        assert_eq!(&*head, &[0, 1]);
    }
}

src/protocol/frame/mod.rs

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ pub mod coding;
44

55
#[allow(clippy::module_inception)]
66
mod frame;
7+
mod init_aware_buf;
78
mod mask;
89
mod utf8;
910

@@ -14,7 +15,7 @@ pub use self::{
1415

1516
use crate::{
1617
error::{CapacityError, Error, ProtocolError, Result},
17-
protocol::frame::mask::apply_mask,
18+
protocol::frame::{init_aware_buf::InitAwareBuf, mask::apply_mask},
1819
Message,
1920
};
2021
use bytes::BytesMut;
@@ -46,7 +47,7 @@ impl<Stream> FrameSocket<Stream> {
4647

4748
/// Extract a stream from the socket.
4849
pub fn into_inner(self) -> (Stream, BytesMut) {
49-
(self.stream, self.codec.in_buffer)
50+
(self.stream, self.codec.in_buffer.into())
5051
}
5152

5253
/// Returns a shared reference to the inner stream.
@@ -103,7 +104,7 @@ where
103104
#[derive(Debug)]
104105
pub(super) struct FrameCodec {
105106
/// Buffer to read data from the stream.
106-
in_buffer: BytesMut,
107+
in_buffer: InitAwareBuf,
107108
in_buf_max_read: usize,
108109
/// Buffer to send packets to the network.
109110
out_buffer: Vec<u8>,
@@ -123,7 +124,7 @@ impl FrameCodec {
123124
/// Create a new frame codec.
124125
pub(super) fn new(in_buf_len: usize) -> Self {
125126
Self {
126-
in_buffer: BytesMut::with_capacity(in_buf_len),
127+
in_buffer: InitAwareBuf::with_capacity(in_buf_len),
127128
in_buf_max_read: in_buf_len.max(FrameHeader::MAX_SIZE),
128129
out_buffer: <_>::default(),
129130
max_out_buffer_len: usize::MAX,
@@ -137,7 +138,7 @@ impl FrameCodec {
137138
let mut in_buffer = BytesMut::from_iter(part);
138139
in_buffer.reserve(min_in_buf_len.saturating_sub(in_buffer.len()));
139140
Self {
140-
in_buffer,
141+
in_buffer: in_buffer.into(),
141142
in_buf_max_read: min_in_buf_len.max(FrameHeader::MAX_SIZE),
142143
out_buffer: <_>::default(),
143144
max_out_buffer_len: usize::MAX,
@@ -172,7 +173,7 @@ impl FrameCodec {
172173
let mut cursor = Cursor::new(&mut self.in_buffer);
173174
self.header = FrameHeader::parse(&mut cursor)?;
174175
let advanced = cursor.position();
175-
bytes::Buf::advance(&mut self.in_buffer, advanced as _);
176+
self.in_buffer.advance(advanced as _);
176177

177178
if let Some((_, len)) = &self.header {
178179
let len = *len as usize;
@@ -233,7 +234,7 @@ impl FrameCodec {
233234
fn read_in(&mut self, stream: &mut impl Read) -> io::Result<usize> {
234235
let len = self.in_buffer.len();
235236
debug_assert!(self.in_buffer.capacity() > len);
236-
self.in_buffer.resize(self.in_buffer.capacity().min(len + self.in_buf_max_read), 0);
237+
self.in_buffer.resize(self.in_buffer.capacity().min(len + self.in_buf_max_read));
237238
let size = stream.read(&mut self.in_buffer[len..]);
238239
self.in_buffer.truncate(len + size.as_ref().copied().unwrap_or(0));
239240
size

0 commit comments

Comments
 (0)