Skip to content

Commit 0467da7

Browse files
authored
Use integer_encoding crate for varint encoding in tcp header (#62)
* use integer_encoding crate for varint encoding * avoid heap allocation * fix doc comment * max u32 packet size to support 32-bits machine and size decoding fix * use usize for max, not u32, because 32 bits machines can't decode it anyway
1 parent 642af96 commit 0467da7

File tree

4 files changed

+85
-50
lines changed

4 files changed

+85
-50
lines changed

Cargo.lock

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ net2 = "0.2.34"
3232
strum = { version = "0.20", features = ["derive"] }
3333
url = "2.2.0"
3434
tungstenite = { version = "0.13.0", optional = true }
35+
integer-encoding = "3.0.2"
3536

3637
[dev-dependencies]
3738
bincode = "1.3.1"

src/adapters/framed_tcp.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::adapter::{
33
ListeningInfo,
44
};
55
use crate::remote_addr::{RemoteAddr};
6-
use crate::encoding::{self, Decoder};
6+
use crate::encoding::{self, Decoder, MAX_ENCODED_SIZE};
77

88
use mio::net::{TcpListener, TcpStream};
99
use mio::event::{Source};
@@ -19,7 +19,7 @@ const INPUT_BUFFER_SIZE: usize = 65535; // 2^16 - 1
1919
/// The max packet value for tcp.
2020
/// Although this size is very high, it is preferred send data in smaller chunks with a rate
2121
/// to not saturate the receiver thread in the endpoint.
22-
pub const MAX_TCP_PAYLOAD_LEN: usize = encoding::Padding::MAX as usize;
22+
pub const MAX_TCP_PAYLOAD_LEN: usize = usize::MAX;
2323

2424
pub struct FramedTcpAdapter;
2525
impl Adapter for FramedTcpAdapter {
@@ -89,14 +89,15 @@ impl Remote for RemoteResource {
8989
}
9090

9191
fn send(&self, data: &[u8]) -> SendStatus {
92-
let encoded_size = encoding::encode_size(data);
92+
let mut buf = [0; MAX_ENCODED_SIZE]; // used to avoid a heap allocation
93+
let encoded_size = encoding::encode_size(data, &mut buf);
9394

9495
let mut total_bytes_sent = 0;
95-
let total_bytes = encoding::PADDING + data.len();
96+
let total_bytes = encoded_size.len() + data.len();
9697
loop {
97-
let data_to_send = match total_bytes_sent < encoding::PADDING {
98+
let data_to_send = match total_bytes_sent < encoded_size.len() {
9899
true => &encoded_size[total_bytes_sent..],
99-
false => &data[total_bytes_sent - encoding::PADDING..],
100+
false => &data[total_bytes_sent - encoded_size.len()..],
100101
};
101102

102103
let stream = &self.stream;

src/encoding.rs

Lines changed: 70 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
1-
use std::convert::{TryInto};
1+
use integer_encoding::VarInt;
22

3-
pub type Padding = u32;
4-
pub const PADDING: usize = std::mem::size_of::<Padding>();
3+
/// This is the max required bytes to encode a u64 using the varint encoding scheme.
4+
/// It is size 10=ceil(64/7)
5+
pub const MAX_ENCODED_SIZE: usize = 10;
56

67
/// Encode a message, returning the bytes that must be sent before the message.
7-
pub fn encode_size(message: &[u8]) -> [u8; PADDING] {
8-
(message.len() as Padding).to_le_bytes()
8+
/// A buffer is used to avoid heap allocation.
9+
pub fn encode_size<'a>(message: &[u8], buf: &'a mut [u8; MAX_ENCODED_SIZE]) -> &'a [u8] {
10+
let varint_size = message.len().encode_var(buf);
11+
&buf[..varint_size]
912
}
1013

1114
/// Decodes an encoded value in a buffer.
12-
/// The function returns the message size or none if the buffer is less than [`PADDING`].
13-
pub fn decode_size(data: &[u8]) -> Option<usize> {
14-
if data.len() < PADDING {
15-
return None
16-
}
17-
data[..PADDING].try_into().map(|encoded| Padding::from_le_bytes(encoded) as usize).ok()
15+
/// The function returns the message size and the consumed bytes or none if the buffer is too small.
16+
pub fn decode_size(data: &[u8]) -> Option<(usize, usize)> {
17+
usize::decode_var(data)
1818
}
1919

2020
/// Used to decoded one message from several/partial data chunks
@@ -34,8 +34,8 @@ impl Decoder {
3434
fn try_decode(&mut self, data: &[u8], mut decoded_callback: impl FnMut(&[u8])) {
3535
let mut next_data = data;
3636
loop {
37-
if let Some(expected_size) = decode_size(&next_data) {
38-
let remaining = &next_data[PADDING..];
37+
if let Some((expected_size, used_bytes)) = decode_size(&next_data) {
38+
let remaining = &next_data[used_bytes..];
3939
if remaining.len() >= expected_size {
4040
let (decoded, not_decoded) = remaining.split_at(expected_size);
4141
decoded_callback(decoded);
@@ -55,25 +55,25 @@ impl Decoder {
5555

5656
fn store_and_decoded_data<'a>(&mut self, data: &'a [u8]) -> Option<(&[u8], &'a [u8])> {
5757
// Process frame header
58-
let (expected_size, data) = match decode_size(&self.stored) {
59-
Some(size) => (size, data),
58+
let ((expected_size, used_bytes), data) = match decode_size(&self.stored) {
59+
Some(size_info) => (size_info, data),
6060
None => {
61-
let remaining = PADDING - self.stored.len();
62-
if data.len() >= remaining {
63-
// Now, we can now the size
64-
self.stored.extend_from_slice(&data[..remaining]);
65-
(decode_size(&self.stored).unwrap(), &data[remaining..])
66-
}
67-
else {
68-
// We need more data to know the size
69-
self.stored.extend_from_slice(data);
61+
// we append at most the potential data needed to decode the size
62+
let max_remaining = (MAX_ENCODED_SIZE - self.stored.len()).min(data.len());
63+
self.stored.extend_from_slice(&data[..max_remaining]);
64+
65+
if let Some(x) = decode_size(&self.stored) {
66+
// Now we know the size
67+
(x, &data[max_remaining..])
68+
} else {
69+
// We still don't know the size (data was too small)
7070
return None
7171
}
7272
}
7373
};
7474

7575
// At this point we know at least the expected size of the frame.
76-
let remaining = expected_size - (self.stored.len() - PADDING);
76+
let remaining = expected_size - (self.stored.len() - used_bytes);
7777
if data.len() < remaining {
7878
// We need more data to decoder
7979
self.stored.extend_from_slice(data);
@@ -83,7 +83,7 @@ impl Decoder {
8383
// We can complete a message here
8484
let (to_store, remaining) = data.split_at(remaining);
8585
self.stored.extend_from_slice(to_store);
86-
Some((&self.stored[PADDING..], remaining))
86+
Some((&self.stored[used_bytes..], remaining))
8787
}
8888
}
8989

@@ -118,14 +118,15 @@ mod tests {
118118
use super::*;
119119

120120
const MESSAGE_SIZE: usize = 20; // only works if (X + PADDING ) % 6 == 0
121-
const ENCODED_MESSAGE_SIZE: usize = PADDING + MESSAGE_SIZE;
121+
const ENCODED_MESSAGE_SIZE: usize = 1 + MESSAGE_SIZE; // 1 = log_2(20)/7
122122
const MESSAGE: [u8; MESSAGE_SIZE] = [42; MESSAGE_SIZE];
123123
const MESSAGE_A: [u8; MESSAGE_SIZE] = ['A' as u8; MESSAGE_SIZE];
124124
const MESSAGE_B: [u8; MESSAGE_SIZE] = ['B' as u8; MESSAGE_SIZE];
125125
const MESSAGE_C: [u8; MESSAGE_SIZE] = ['C' as u8; MESSAGE_SIZE];
126126

127127
fn encode_message(buffer: &mut Vec<u8>, message: &[u8]) {
128-
buffer.extend_from_slice(&encode_size(message));
128+
let mut buf = [0; MAX_ENCODED_SIZE];
129+
buffer.extend_from_slice(&*encode_size(message, &mut buf));
129130
buffer.extend_from_slice(message);
130131
}
131132

@@ -135,9 +136,22 @@ mod tests {
135136
encode_message(&mut buffer, &MESSAGE);
136137

137138
assert_eq!(ENCODED_MESSAGE_SIZE, buffer.len());
138-
let expected_size = decode_size(&buffer).unwrap();
139+
let (expected_size, used_bytes) = decode_size(&buffer).unwrap();
139140
assert_eq!(MESSAGE_SIZE, expected_size);
140-
assert_eq!(&MESSAGE, &buffer[PADDING..]);
141+
assert_eq!(used_bytes, 1);
142+
assert_eq!(&MESSAGE, &buffer[used_bytes..]);
143+
}
144+
145+
#[test]
146+
fn encode_one_big_message() {
147+
let mut buffer = Vec::new();
148+
encode_message(&mut buffer, &vec![0; 1000]);
149+
150+
assert_eq!(1002, buffer.len());
151+
let (expected_size, used_bytes) = decode_size(&buffer).unwrap();
152+
assert_eq!(1000, expected_size);
153+
assert_eq!(used_bytes, 2);
154+
assert_eq!(&vec![0; 1000], &buffer[used_bytes..]);
141155
}
142156

143157
#[test]
@@ -307,60 +321,72 @@ mod tests {
307321
}
308322

309323
#[test]
310-
// [ 3B ][ remaining ]
324+
// [ 1B ][ remaining ]
311325
// [ message ]
312326
fn decode_message_after_non_enough_padding() {
327+
let msg = [0; 1000];
313328
let mut buffer = Vec::new();
314-
encode_message(&mut buffer, &MESSAGE);
329+
encode_message(&mut buffer, &msg);
315330

316-
let (start_3b, remaining) = buffer.split_at(3);
331+
let (start_1b, remaining) = buffer.split_at(2);
317332

318333
let mut decoder = Decoder::default();
319334

320335
let mut times_called = 0;
321-
decoder.decode(&start_3b, |_decoded| {
336+
decoder.decode(&start_1b, |_decoded| {
322337
// Should not be called
323338
times_called += 1;
324339
});
325340

326341
assert_eq!(0, times_called);
327-
assert_eq!(3, decoder.stored.len());
342+
assert_eq!(2, decoder.stored.len());
328343

329344
decoder.decode(&remaining, |decoded| {
330345
times_called += 1;
331-
assert_eq!(MESSAGE, decoded);
346+
assert_eq!(msg, decoded);
332347
});
333348

334349
assert_eq!(1, times_called);
335350
assert_eq!(0, decoder.stored.len());
336351
}
337352

338353
#[test]
339-
// [ 3B ][ 1B ]
340-
// [ message ]
341-
fn decode_message_no_size_after_non_enough_padding() {
354+
// [ 1B ][ 1B ][ remaining ]
355+
// [ message ]
356+
fn decode_message_var_size_in_two_data() {
357+
let msg = [0; 1000];
342358
let mut buffer = Vec::new();
343-
encode_message(&mut buffer, &[]);
359+
encode_message(&mut buffer, &msg);
344360

345-
let (start_3b, remaining) = buffer.split_at(3);
361+
let (start_1b, remaining) = buffer.split_at(1);
346362

347363
let mut decoder = Decoder::default();
348364

349365
let mut times_called = 0;
350-
decoder.decode(&start_3b, |_decoded| {
366+
decoder.decode(&start_1b, |_decoded| {
351367
// Should not be called
352368
times_called += 1;
353369
});
354370

355371
assert_eq!(0, times_called);
356-
assert_eq!(3, decoder.stored.len());
372+
assert_eq!(1, decoder.stored.len());
373+
374+
let (next_1b, remaining) = remaining.split_at(1);
357375

358376
let mut times_called = 0;
359-
decoder.decode(&remaining, |_decoded| {
377+
decoder.decode(&next_1b, |_decoded| {
360378
// Should not be called
361379
times_called += 1;
362380
});
363381

382+
assert_eq!(0, times_called);
383+
assert_eq!(2, decoder.stored.len());
384+
385+
decoder.decode(&remaining, |decoded| {
386+
times_called += 1;
387+
assert_eq!(msg, decoded);
388+
});
389+
364390
assert_eq!(1, times_called);
365391
assert_eq!(0, decoder.stored.len());
366392
}

0 commit comments

Comments
 (0)