Skip to content

Commit 7d30357

Browse files
committed
Merge branch 'permessage-deflate' into feat/taosdata/permessage-deflate
* permessage-deflate: (78 commits) Fix clippy warnings and errors when disabling handshake feature Apply most suggestions by @erebe about permessage deflate Improve performance of processing continue frames add more details for utf8 errors for debugging Implement `From<Bytes>` for `Message` Don't copy data before decompression Apply suggestions from code review fix(Utf8Bytes): hash consistency for Borrow + Hash traits Bump version Implement `PartialOrd` and `Ord` for `Utf8Bytes` Derive `Hash` for `Utf8Bytes` Implement `Borrow<str>`, `AsRef<[u8]>`, `AsRef<str>` and `AsRef<Bytes>` for `Utf8Bytes Fix formatting of srv_accept_unmasked_frames example Decompress single-frame messages as well Add WebSocketConfig::read_buffer_size docs explaining performance/memory tradeoff (snapview#482) Update rand requirement from 0.8.0 to 0.9.0 (snapview#481) Remove byteorder, use bytes::Buf::get_uint instead (snapview#477) feat: add unsafe Utf8Bytes::from_bytes_unchecked (snapview#476) Keep the set-len refactor but use safe resize Revert "Revert "Use set-len when reading into buffer"" ...
2 parents e4d146b + f322da9 commit 7d30357

37 files changed

+960
-405
lines changed

.github/workflows/ci.yml

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,22 @@ jobs:
77
name: Format
88
runs-on: ubuntu-latest
99
steps:
10-
- uses: actions/checkout@v3
10+
- uses: actions/checkout@v4
1111
- uses: dtolnay/rust-toolchain@nightly
1212
with:
1313
components: rustfmt
1414
- run: cargo fmt --all --check
1515

16+
clippy:
17+
name: Clippy
18+
runs-on: ubuntu-latest
19+
steps:
20+
- uses: actions/checkout@v4
21+
- uses: dtolnay/rust-toolchain@stable
22+
with:
23+
components: clippy
24+
- run: cargo clippy --all --tests --all-features -- -D warnings
25+
1626
test:
1727
name: Test
1828
runs-on: ubuntu-latest
@@ -24,7 +34,7 @@ jobs:
2434

2535
steps:
2636
- name: Checkout sources
27-
uses: actions/checkout@v3
37+
uses: actions/checkout@v4
2838

2939
- name: Install toolchain
3040
uses: dtolnay/rust-toolchain@master
@@ -50,11 +60,11 @@ jobs:
5060
strategy:
5161
matrix:
5262
rust:
53-
- 1.60.0
63+
- 1.63.0
5464

5565
steps:
5666
- name: Checkout sources
57-
uses: actions/checkout@v3
67+
uses: actions/checkout@v4
5868

5969
- name: Install toolchain
6070
uses: dtolnay/rust-toolchain@master
@@ -64,6 +74,9 @@ jobs:
6474
- name: Install dependencies
6575
run: sudo apt-get install libssl-dev
6676

77+
- name: Disable env_logger dev-dependency (cargo 1.63 workaround)
78+
run: sed -i 's/env_logger/#env_logger/' Cargo.toml
79+
6780
- name: Check
6881
run: cargo check
6982

@@ -80,7 +93,7 @@ jobs:
8093

8194
steps:
8295
- name: Checkout sources
83-
uses: actions/checkout@v3
96+
uses: actions/checkout@v4
8497

8598
- name: Install toolchain
8699
uses: dtolnay/rust-toolchain@master

CHANGELOG.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,33 @@
1+
# 0.26.2
2+
- Add `WebSocketConfig::read_buffer_size` docs explaining performance/memory tradeoff.
3+
- Implement traits and add helper methods for the UTF8 payloads making them comparable and more ergonomic.
4+
5+
# 0.26.1
6+
- Fix/revert unsoundness that could lead to UB with dodgy `Read` stream implementations.
7+
8+
# 0.26.0
9+
- Simplify `Message` to use `Bytes` payload directly with simpler `Utf8Bytes` for text.
10+
- Change `CloseFrame` to use `Utf8Bytes` for `reason`.
11+
- Re-export `Bytes`.
12+
13+
# 0.25.0
14+
15+
- New `Payload` type for `Message` that allows sending messages with a payload that can be cheaply cloned (`Bytes`).
16+
Long standing [issue](https://github.com/snapview/tungstenite-rs/issues/96) solved!
17+
- Add `WebSocketConfig::read_buffer_size` default 128 KiB. This improves high load read performance.
18+
**Note: This default increases memory usage compared to previous versions particularly for users expecting a high number of connections. Configure 4-8 KiB to get a similar memory usage to 0.24**.
19+
- Make `WebSocketConfig` non-exhaustive & add builder style construction fns.
20+
- Remove deprecated `WebSocketConfig::max_send_queue`.
21+
- Trim spaces on `Sec-WebSocket-Protocol` header.
22+
- Eliminate data copies when reading complete messages & optimise read buffer. Improves performance.
23+
- Update `thiserror` to `2`.
24+
25+
# 0.24.0
26+
27+
- Raised MSRV to 1.63 to match `tokio-tungstenite`.
28+
- Connecting to WSS URL without TLS features specified results in a better error.
29+
- Handshake will now flush after completion to be safe (works better with buffered streams).
30+
131
# 0.23.0
232

333
- Disable default features for `rustls` giving the user more flexibility.

Cargo.toml

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ authors = ["Alexey Galakhov", "Daniel Abramov"]
77
license = "MIT OR Apache-2.0"
88
readme = "README.md"
99
homepage = "https://github.com/snapview/tungstenite-rs"
10-
documentation = "https://docs.rs/tungstenite/0.23.0"
10+
documentation = "https://docs.rs/tungstenite/0.26.2"
1111
repository = "https://github.com/snapview/tungstenite-rs"
12-
version = "0.23.0"
13-
edition = "2018"
14-
rust-version = "1.60"
12+
version = "0.26.2"
13+
edition = "2021"
14+
rust-version = "1.63"
1515
include = ["benches/**/*", "src/**/*", "examples/**/*", "LICENSE-*", "README.md", "CHANGELOG.md"]
1616

1717
[package.metadata.docs.rs]
@@ -30,15 +30,14 @@ deflate = ["flate2"]
3030

3131
[dependencies]
3232
data-encoding = { version = "2", optional = true }
33-
byteorder = "1.3.2"
34-
bytes = "1.0"
33+
bytes = "1.9.0"
3534
headers = { version = "0.4.0", optional = true }
3635
http = { version = "1.0", optional = true }
3736
httparse = { version = "1.3.4", optional = true }
3837
log = "0.4.8"
39-
rand = "0.8.0"
38+
rand = "0.9.0"
4039
sha1 = { version = "0.10", optional = true }
41-
thiserror = "1.0.23"
40+
thiserror = "2.0.7"
4241
url = { version = "2.1.0", optional = true }
4342
utf-8 = "0.7.5"
4443

@@ -63,19 +62,22 @@ version = "1.0"
6362

6463
[dependencies.rustls-native-certs]
6564
optional = true
66-
version = "0.7.0"
65+
version = "0.8.0"
6766

6867
[dependencies.webpki-roots]
6968
optional = true
7069
version = "0.26"
7170

7271
[dev-dependencies]
7372
criterion = "0.5.0"
74-
env_logger = "0.10.0"
73+
env_logger = "0.11"
7574
input_buffer = "0.5.0"
76-
rand = "0.8.4"
75+
rand = "0.9.0"
7776
socket2 = "0.5.5"
7877

78+
[profile.bench]
79+
lto = "thin"
80+
7981
[[bench]]
8082
name = "buffer"
8183
harness = false
@@ -84,6 +86,10 @@ harness = false
8486
name = "write"
8587
harness = false
8688

89+
[[bench]]
90+
name = "read"
91+
harness = false
92+
8793
[[example]]
8894
name = "client"
8995
required-features = ["handshake"]

benches/buffer.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl<const CHUNK_SIZE: usize> Buf for StackReadBuffer<CHUNK_SIZE> {
7777
}
7878

7979
fn advance(&mut self, cnt: usize) {
80-
Buf::advance(self.as_cursor_mut(), cnt)
80+
Buf::advance(self.as_cursor_mut(), cnt);
8181
}
8282
}
8383

@@ -114,10 +114,10 @@ fn benchmark(c: &mut Criterion) {
114114
group.throughput(Throughput::Bytes(STREAM_SIZE as u64));
115115
group.bench_function("InputBuffer", |b| b.iter(|| input_buffer(black_box(stream.clone()))));
116116
group.bench_function("ReadBuffer (stack)", |b| {
117-
b.iter(|| stack_read_buffer(black_box(stream.clone())))
117+
b.iter(|| stack_read_buffer(black_box(stream.clone())));
118118
});
119119
group.bench_function("ReadBuffer (heap)", |b| {
120-
b.iter(|| heap_read_buffer(black_box(stream.clone())))
120+
b.iter(|| heap_read_buffer(black_box(stream.clone())));
121121
});
122122
group.finish();
123123
}

benches/read.rs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
//! Benchmarks for read performance.
2+
use criterion::{BatchSize, Criterion};
3+
use std::{
4+
io::{self, Read, Write},
5+
sync::{Arc, Mutex},
6+
};
7+
use tungstenite::{protocol::Role, Message, WebSocket};
8+
9+
/// Mock stream with no artificial delays.
10+
#[derive(Default, Clone)]
11+
struct MockIo(Arc<Mutex<Vec<u8>>>);
12+
13+
impl Read for MockIo {
14+
fn read(&mut self, to: &mut [u8]) -> io::Result<usize> {
15+
let mut data = self.0.lock().unwrap();
16+
if data.is_empty() {
17+
return Err(io::Error::new(io::ErrorKind::WouldBlock, "not ready"));
18+
}
19+
let len = data.len().min(to.len());
20+
to[..len].copy_from_slice(data.drain(..len).as_slice());
21+
Ok(len)
22+
}
23+
}
24+
25+
impl Write for MockIo {
26+
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
27+
self.0.lock().unwrap().write(buf)
28+
}
29+
fn flush(&mut self) -> io::Result<()> {
30+
Ok(())
31+
}
32+
}
33+
34+
fn benchmark(c: &mut Criterion) {
35+
/// Benchmark reading 100k mix of binary & text messages.
36+
fn read_100k(role: Role, b: &mut criterion::Bencher<'_>) {
37+
let io = MockIo::default();
38+
let mut writer = WebSocket::from_raw_socket(
39+
io.clone(),
40+
match role {
41+
Role::Client => Role::Server,
42+
Role::Server => Role::Client,
43+
},
44+
None,
45+
);
46+
let mut ws = WebSocket::from_raw_socket(io, role, None);
47+
48+
b.iter_batched(
49+
|| {
50+
let mut sum = 0;
51+
for i in 0_u64..100_000 {
52+
writer
53+
.send(match i {
54+
_ if i % 3 == 0 => Message::binary(i.to_le_bytes().to_vec()),
55+
_ => Message::text(format!("{{\"id\":{i}}}")),
56+
})
57+
.unwrap();
58+
sum += i;
59+
}
60+
sum
61+
},
62+
|expected_sum| {
63+
let mut sum = 0;
64+
while sum != expected_sum {
65+
match ws.read().unwrap() {
66+
Message::Binary(v) => {
67+
let a: &[u8; 8] = v.as_ref().try_into().unwrap();
68+
sum += u64::from_le_bytes(*a);
69+
}
70+
Message::Text(msg) => {
71+
let i: u64 = msg.as_str()[6..msg.len() - 1].parse().unwrap();
72+
sum += i;
73+
}
74+
m => panic!("Unexpected {m}"),
75+
}
76+
}
77+
},
78+
BatchSize::SmallInput,
79+
);
80+
}
81+
82+
c.bench_function("read+unmask 100k small messages (server)", |b| {
83+
read_100k(Role::Server, b);
84+
});
85+
86+
c.bench_function("read 100k small messages (client)", |b| {
87+
read_100k(Role::Client, b);
88+
});
89+
}
90+
91+
criterion::criterion_group!(read_benches, benchmark);
92+
criterion::criterion_main!(read_benches);

benches/write.rs

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
//! Benchmarks for write performance.
2-
use criterion::{BatchSize, Criterion};
2+
use criterion::Criterion;
33
use std::{
4-
hint,
5-
io::{self, Read, Write},
4+
hint, io,
65
time::{Duration, Instant},
76
};
8-
use tungstenite::{Message, WebSocket};
7+
use tungstenite::{protocol::Role, Message, WebSocket};
98

109
const MOCK_WRITE_LEN: usize = 8 * 1024 * 1024;
1110

@@ -16,12 +15,12 @@ const MOCK_WRITE_LEN: usize = 8 * 1024 * 1024;
1615
/// Each `flush` takes **~8µs** to simulate flush io.
1716
struct MockWrite(Vec<u8>);
1817

19-
impl Read for MockWrite {
18+
impl io::Read for MockWrite {
2019
fn read(&mut self, _: &mut [u8]) -> io::Result<usize> {
2120
Err(io::Error::new(io::ErrorKind::WouldBlock, "reads not supported"))
2221
}
2322
}
24-
impl Write for MockWrite {
23+
impl io::Write for MockWrite {
2524
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
2625
if self.0.len() + buf.len() > MOCK_WRITE_LEN {
2726
self.flush()?;
@@ -50,24 +49,28 @@ fn spin(duration: Duration) {
5049
}
5150

5251
fn benchmark(c: &mut Criterion) {
53-
// Writes 100k small json text messages then flushes
54-
c.bench_function("write 100k small texts then flush", |b| {
55-
let mut ws = WebSocket::from_raw_socket(
56-
MockWrite(Vec::with_capacity(MOCK_WRITE_LEN)),
57-
tungstenite::protocol::Role::Server,
58-
None,
59-
);
52+
fn write_100k_then_flush(role: Role, b: &mut criterion::Bencher<'_>) {
53+
let mut ws =
54+
WebSocket::from_raw_socket(MockWrite(Vec::with_capacity(MOCK_WRITE_LEN)), role, None);
6055

61-
b.iter_batched(
62-
|| (0..100_000).map(|i| Message::Text(format!("{{\"id\":{i}}}"))),
63-
|batch| {
64-
for msg in batch {
65-
ws.write(msg).unwrap();
66-
}
67-
ws.flush().unwrap();
68-
},
69-
BatchSize::SmallInput,
70-
)
56+
b.iter(|| {
57+
for i in 0_u64..100_000 {
58+
let msg = match i {
59+
_ if i % 3 == 0 => Message::binary(i.to_le_bytes().to_vec()),
60+
_ => Message::text(format!("{{\"id\":{i}}}")),
61+
};
62+
ws.write(msg).unwrap();
63+
}
64+
ws.flush().unwrap();
65+
});
66+
}
67+
68+
c.bench_function("write 100k small messages then flush (server)", |b| {
69+
write_100k_then_flush(Role::Server, b);
70+
});
71+
72+
c.bench_function("write+mask 100k small messages then flush (client)", |b| {
73+
write_100k_then_flush(Role::Client, b);
7174
});
7275
}
7376

examples/autobahn-client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,18 +11,18 @@ fn get_case_count() -> Result<u32> {
1111
let (mut socket, _) = connect("ws://localhost:9001/getCaseCount")?;
1212
let msg = socket.read()?;
1313
socket.close(None)?;
14-
Ok(msg.into_text()?.parse::<u32>().unwrap())
14+
Ok(msg.into_text()?.as_str().parse::<u32>().unwrap())
1515
}
1616

1717
fn update_reports() -> Result<()> {
18-
let (mut socket, _) = connect(&format!("ws://localhost:9001/updateReports?agent={}", AGENT))?;
18+
let (mut socket, _) = connect(format!("ws://localhost:9001/updateReports?agent={AGENT}"))?;
1919
socket.close(None)?;
2020
Ok(())
2121
}
2222

2323
fn run_test(case: u32) -> Result<()> {
2424
info!("Running test case {}", case);
25-
let case_url = &format!("ws://localhost:9001/runCase?case={}&agent={}", case, AGENT);
25+
let case_url = format!("ws://localhost:9001/runCase?case={case}&agent={AGENT}");
2626

2727
let mut config = WebSocketConfig::default();
2828
config.compression = Some(DeflateConfig::default());
@@ -46,7 +46,7 @@ fn main() {
4646
for case in 1..=total {
4747
if let Err(e) = run_test(case) {
4848
match e {
49-
Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => (),
49+
Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8(_) => (),
5050
err => error!("test: {}", err),
5151
}
5252
}

0 commit comments

Comments
 (0)