Skip to content

Commit 30f7efe

Browse files
committed
Merge branch 'master' into feat/taosdata/permessage-deflate
* master: Prepare 0.27.0 release When reading avoid over-reserving the in the case WouldBlock causes multiple read_frame calls (snapview#501) Add end to end "send+recv" benchmarks (snapview#497) Update src/protocol/frame/frame.rs Don't allow zero `in_buf_max_read` Fix large message read performance by enforcing max `read_buffer_size` read chunks
2 parents 7d30357 + 3ffeb33 commit 30f7efe

File tree

7 files changed

+142
-20
lines changed

7 files changed

+142
-20
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# 0.27.0
2+
- Fix large message read performance by enforcing max `read_buffer_size` read chunks.
3+
- Make `Hash` implementation consistent for `Utf8Bytes` payloads.
4+
15
# 0.26.2
26
- Add `WebSocketConfig::read_buffer_size` docs explaining performance/memory tradeoff.
37
- Implement traits and add helper methods for the UTF8 payloads making them comparable and more ergonomic.

Cargo.toml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ authors = ["Alexey Galakhov", "Daniel Abramov"]
77
license = "MIT OR Apache-2.0"
88
readme = "README.md"
99
homepage = "https://github.com/snapview/tungstenite-rs"
10-
documentation = "https://docs.rs/tungstenite/0.26.2"
10+
documentation = "https://docs.rs/tungstenite/0.27.0"
1111
repository = "https://github.com/snapview/tungstenite-rs"
12-
version = "0.26.2"
12+
version = "0.27.0"
1313
edition = "2021"
1414
rust-version = "1.63"
1515
include = ["benches/**/*", "src/**/*", "examples/**/*", "LICENSE-*", "README.md", "CHANGELOG.md"]
@@ -69,7 +69,7 @@ optional = true
6969
version = "0.26"
7070

7171
[dev-dependencies]
72-
criterion = "0.5.0"
72+
criterion = "0.6"
7373
env_logger = "0.11"
7474
input_buffer = "0.5.0"
7575
rand = "0.9.0"
@@ -90,6 +90,11 @@ harness = false
9090
name = "read"
9191
harness = false
9292

93+
[[bench]]
94+
name = "e2e"
95+
harness = false
96+
required-features = ["handshake"]
97+
9398
[[example]]
9499
name = "client"
95100
required-features = ["handshake"]

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,12 @@ Testing
7878
Tungstenite is thoroughly tested and passes the [Autobahn Test Suite](https://github.com/crossbario/autobahn-testsuite) for
7979
WebSockets. It is also covered by internal unit tests as well as possible.
8080

81+
Benchmark
82+
---------
83+
Benches are in [./benches](./benches/).
84+
* Run all with `cargo bench --bench \* -- --quick --noplot`
85+
* Run a particular set with, say "e2e", with `cargo bench --bench e2e -- --quick --noplot`
86+
8187
Contributing
8288
------------
8389

benches/buffer.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1-
use std::io::{Cursor, Read, Result as IoResult};
1+
#![allow(clippy::incompatible_msrv)] // msrv doesn't apply to benches
22

33
use bytes::Buf;
4-
use criterion::*;
4+
use criterion::{criterion_group, criterion_main, Criterion, Throughput};
55
use input_buffer::InputBuffer;
6-
6+
use std::{
7+
hint::black_box,
8+
io::{Cursor, Read, Result as IoResult},
9+
};
710
use tungstenite::buffer::ReadBuffer;
811

912
const CHUNK_SIZE: usize = 4096;

benches/e2e.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
//! Benchmarks for end to end performance including real `Read` & `Write` impls.
2+
use bytes::Bytes;
3+
use criterion::{BatchSize, Criterion, Throughput};
4+
use rand::{
5+
distr::{Alphanumeric, SampleString},
6+
rngs::SmallRng,
7+
SeedableRng,
8+
};
9+
use std::net::TcpListener;
10+
use tungstenite::{accept_hdr_with_config, protocol::WebSocketConfig, Message};
11+
12+
/// Binary message meaning "stop".
13+
const B_STOP: Bytes = Bytes::from_static(b"stop");
14+
15+
fn benchmark(c: &mut Criterion) {
16+
/// Benchmark that starts a simple server and client then sends (writes+flush) a
17+
/// single text message client->server and reads a single response text message
18+
/// server->client. Both message will be of the given `msg_len` size.
19+
fn send_and_recv(msg_len: usize, b: &mut criterion::Bencher<'_>) {
20+
let socket = TcpListener::bind("127.0.0.1:0").unwrap();
21+
let port = socket.local_addr().unwrap().port();
22+
let conf = WebSocketConfig::default().max_message_size(None).max_frame_size(None);
23+
24+
let server_thread = std::thread::spawn(move || {
25+
// single thread / single client server
26+
let (stream, _) = socket.accept().unwrap();
27+
let mut websocket =
28+
accept_hdr_with_config(stream, |_: &_, res| Ok(res), Some(conf)).unwrap();
29+
loop {
30+
let uppercase_txt = match websocket.read().unwrap() {
31+
Message::Text(msg) => msg.to_ascii_uppercase(),
32+
Message::Binary(msg) if msg == B_STOP => return,
33+
msg => panic!("Unexpected msg: {msg:?}"),
34+
};
35+
websocket.send(Message::text(uppercase_txt)).unwrap();
36+
}
37+
});
38+
39+
let (mut client, _) = tungstenite::client::connect_with_config(
40+
format!("ws://127.0.0.1:{port}"),
41+
Some(conf),
42+
3,
43+
)
44+
.unwrap();
45+
let mut rng = SmallRng::seed_from_u64(123);
46+
47+
b.iter_batched(
48+
|| {
49+
let msg = Alphanumeric.sample_string(&mut rng, msg_len);
50+
let expected_response = msg.to_ascii_uppercase();
51+
(msg, expected_response)
52+
},
53+
|(txt, expected_response)| {
54+
client.send(Message::text(txt)).unwrap();
55+
let response = client.read().unwrap();
56+
match response {
57+
Message::Text(v) => assert_eq!(v, expected_response),
58+
msg => panic!("Unexpected response msg: {msg:?}"),
59+
};
60+
},
61+
BatchSize::PerIteration,
62+
);
63+
64+
// cleanup
65+
client.send(Message::binary(B_STOP)).unwrap();
66+
server_thread.join().unwrap();
67+
}
68+
69+
// bench sending & receiving various sizes 512B to 1GiB.
70+
for len in (0..8).map(|n| 512 * 8_usize.pow(n)) {
71+
let mut group = c.benchmark_group("send+recv");
72+
group
73+
.throughput(Throughput::Bytes(len as u64 * 2)) // *2 as we send and then recv it
74+
.bench_function(HumanLen(len).to_string(), |b| send_and_recv(len, b));
75+
}
76+
}
77+
78+
struct HumanLen(usize);
79+
80+
impl std::fmt::Display for HumanLen {
81+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82+
match self.0 {
83+
n if n < 1024 => write!(f, "{n} B"),
84+
n if n < 1024 * 1024 => write!(f, "{} KiB", n / 1024),
85+
n if n < 1024 * 1024 * 1024 => write!(f, "{} MiB", n / (1024 * 1024)),
86+
n => write!(f, "{} GiB", n / (1024 * 1024 * 1024)),
87+
}
88+
}
89+
}
90+
91+
criterion::criterion_group!(read_benches, benchmark);
92+
criterion::criterion_main!(read_benches);

src/protocol/frame/frame.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ impl Default for FrameHeader {
6666
}
6767

6868
impl FrameHeader {
69+
/// > The longest possible header is 14 bytes, which would represent a message sent from
70+
/// > the client to the server with a payload greater than 64KB.
71+
pub(crate) const MAX_SIZE: usize = 14;
72+
6973
/// Parse a header from an input stream.
7074
/// Returns `None` if insufficient data and does not consume anything in this case.
7175
/// Payload size is returned along with the header.

src/protocol/frame/mod.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ where
104104
pub(super) struct FrameCodec {
105105
/// Buffer to read data from the stream.
106106
in_buffer: BytesMut,
107+
in_buf_max_read: usize,
107108
/// Buffer to send packets to the network.
108109
out_buffer: Vec<u8>,
109110
/// Capacity limit for `out_buffer`.
@@ -123,6 +124,7 @@ impl FrameCodec {
123124
pub(super) fn new(in_buf_len: usize) -> Self {
124125
Self {
125126
in_buffer: BytesMut::with_capacity(in_buf_len),
127+
in_buf_max_read: in_buf_len.max(FrameHeader::MAX_SIZE),
126128
out_buffer: <_>::default(),
127129
max_out_buffer_len: usize::MAX,
128130
out_buffer_write_len: 0,
@@ -136,6 +138,7 @@ impl FrameCodec {
136138
in_buffer.reserve(min_in_buf_len.saturating_sub(in_buffer.len()));
137139
Self {
138140
in_buffer,
141+
in_buf_max_read: min_in_buf_len.max(FrameHeader::MAX_SIZE),
139142
out_buffer: <_>::default(),
140143
max_out_buffer_len: usize::MAX,
141144
out_buffer_write_len: 0,
@@ -165,34 +168,39 @@ impl FrameCodec {
165168
let max_size = max_size.unwrap_or_else(usize::max_value);
166169

167170
let mut payload = loop {
168-
{
169-
if self.header.is_none() {
170-
let mut cursor = Cursor::new(&mut self.in_buffer);
171-
self.header = FrameHeader::parse(&mut cursor)?;
172-
let advanced = cursor.position();
173-
bytes::Buf::advance(&mut self.in_buffer, advanced as _);
174-
}
171+
if self.header.is_none() {
172+
let mut cursor = Cursor::new(&mut self.in_buffer);
173+
self.header = FrameHeader::parse(&mut cursor)?;
174+
let advanced = cursor.position();
175+
bytes::Buf::advance(&mut self.in_buffer, advanced as _);
175176

176177
if let Some((_, len)) = &self.header {
177178
let len = *len as usize;
178179

179-
// Enforce frame size limit early and make sure `length`
180-
// is not too big (fits into `usize`).
180+
// Enforce frame size limit early
181181
if len > max_size {
182182
return Err(Error::Capacity(CapacityError::MessageTooLong {
183183
size: len,
184184
max_size,
185185
}));
186186
}
187187

188-
if len <= self.in_buffer.len() {
189-
break self.in_buffer.split_to(len);
190-
}
188+
// Reserve full message length only once, even for multiple
189+
// loops or if WouldBlock errors cause multiple fn calls.
190+
self.in_buffer.reserve(len);
191+
} else {
192+
self.in_buffer.reserve(FrameHeader::MAX_SIZE);
193+
}
194+
}
195+
196+
if let Some((_, len)) = &self.header {
197+
let len = *len as usize;
198+
if len <= self.in_buffer.len() {
199+
break self.in_buffer.split_to(len);
191200
}
192201
}
193202

194203
// Not enough data in buffer.
195-
self.in_buffer.reserve(self.header.as_ref().map(|(_, l)| *l as usize).unwrap_or(6));
196204
if self.read_in(stream)? == 0 {
197205
trace!("no frame received");
198206
return Ok(None);
@@ -225,7 +233,7 @@ impl FrameCodec {
225233
fn read_in(&mut self, stream: &mut impl Read) -> io::Result<usize> {
226234
let len = self.in_buffer.len();
227235
debug_assert!(self.in_buffer.capacity() > len);
228-
self.in_buffer.resize(self.in_buffer.capacity(), 0);
236+
self.in_buffer.resize(self.in_buffer.capacity().min(len + self.in_buf_max_read), 0);
229237
let size = stream.read(&mut self.in_buffer[len..]);
230238
self.in_buffer.truncate(len + size.as_ref().copied().unwrap_or(0));
231239
size

0 commit comments

Comments
 (0)