Skip to content

Commit f229f8e

Browse files
authored
StreamDecoder: lift naming confusion regarding chunks (#11452)
This is just a pure renaming pass. `StreamDecoder`'s job is to deal with chunks of bytes, and it used to call those `chunks`. This was extremely confusing to me when trying to understand the code, since _chunk_ is such an overloaded term for us. This PR just makes it as clear as possible that this is about _byte chunks_, not _Rerun chunks_. --- This PR is part of an upcoming series of PRs to pay off organic growth debt in our encoding/decoding stack. * DNM: requires #11450
1 parent 54978bf commit f229f8e

File tree

2 files changed

+74
-70
lines changed

2 files changed

+74
-70
lines changed

crates/store/re_log_encoding/src/decoder/stream.rs

Lines changed: 73 additions & 69 deletions
Original file line number | Diff line number | Diff line change
@@ -2,21 +2,22 @@ use std::collections::VecDeque;
22
use std::io::Cursor;
33
use std::io::Read as _;
44

5+
use re_build_info::CrateVersion;
6+
use re_log_types::LogMsg;
7+
58
use crate::EncodingOptions;
69
use crate::FileHeader;
710
use crate::Serializer;
811
use crate::app_id_injector::CachingApplicationIdInjector;
912
use crate::decoder::options_from_bytes;
10-
use re_build_info::CrateVersion;
11-
use re_log_types::LogMsg;
1213

1314
use super::DecodeError;
1415

15-
/// The stream decoder is a state machine which ingests byte chunks
16-
/// and outputs messages once it has enough data to deserialize one.
16+
/// The stream decoder is a state machine which ingests byte chunks and outputs messages once it
17+
/// has enough data to deserialize one.
1718
///
18-
/// Chunks are given to the stream via `StreamDecoder::push_chunk`,
19-
/// and messages are read back via `StreamDecoder::try_read`.
19+
/// Byte chunks are given to the stream via [`StreamDecoder::push_byte_chunk`], and messages are read
20+
/// back via [`StreamDecoder::try_read`].
2021
pub struct StreamDecoder {
2122
/// The Rerun version used to encode the RRD data.
2223
///
@@ -25,10 +26,10 @@ pub struct StreamDecoder {
2526

2627
options: EncodingOptions,
2728

28-
/// Incoming chunks are stored here
29-
chunks: ChunkBuffer,
29+
/// Incoming byte chunks are stored here.
30+
byte_chunks: ByteChunkBuffer,
3031

31-
/// The stream state
32+
/// The stream state.
3233
state: State,
3334

3435
/// The application id cache used for migrating old data.
@@ -49,20 +50,19 @@ pub struct StreamDecoder {
4950
enum State {
5051
/// The beginning of the stream.
5152
///
52-
/// The stream header contains the magic bytes (e.g. `RRF2`),
53-
/// the encoded version, and the encoding options.
53+
/// The stream header contains the magic bytes (e.g. `RRF2`), the encoded version, and the
54+
/// encoding options.
5455
///
55-
/// After the stream header is read once, the state machine
56-
/// will only ever switch between `MessageHeader` and `Message`
56+
/// After the stream header is read once, the state machine will only ever switch between
57+
/// `MessageHeader` and `Message`
5758
StreamHeader,
5859

5960
/// The beginning of a Protobuf message.
6061
MessageHeader,
6162

6263
/// The message content, serialized using `Protobuf`.
6364
///
64-
/// Compression is only applied to individual `ArrowMsg`s, instead of
65-
/// the entire stream.
65+
/// Compression is only applied to individual `ArrowMsg`s, instead of the entire stream.
6666
Message(crate::codec::file::MessageHeader),
6767

6868
/// Stop reading
@@ -74,17 +74,17 @@ impl StreamDecoder {
7474
pub fn new() -> Self {
7575
Self {
7676
version: None,
77-
// Note: `options` are filled in once we read `FileHeader`,
78-
// so this value does not matter.
77+
// Note: `options` are filled in once we read `FileHeader`, so this value does not matter.
7978
options: EncodingOptions::PROTOBUF_UNCOMPRESSED,
80-
chunks: ChunkBuffer::new(),
79+
byte_chunks: ByteChunkBuffer::new(),
8180
state: State::StreamHeader,
8281
app_id_cache: CachingApplicationIdInjector::default(),
8382
}
8483
}
8584

86-
pub fn push_chunk(&mut self, chunk: Vec<u8>) {
87-
self.chunks.push(chunk);
85+
/// Feed a bunch of bytes to the decoding state machine.
86+
pub fn push_byte_chunk(&mut self, byte_chunk: Vec<u8>) {
87+
self.byte_chunks.push(byte_chunk);
8888
}
8989

9090
/// Read the next message in the stream, dropping messages missing application id that cannot
@@ -112,8 +112,8 @@ impl StreamDecoder {
112112
fn try_read_impl(&mut self) -> Result<Option<LogMsg>, DecodeError> {
113113
match self.state {
114114
State::StreamHeader => {
115-
let is_first_header = self.chunks.num_read() == 0;
116-
if let Some(header) = self.chunks.try_read(FileHeader::SIZE) {
115+
let is_first_header = self.byte_chunks.num_read() == 0;
116+
if let Some(header) = self.byte_chunks.try_read(FileHeader::SIZE) {
117117
re_log::trace!(?header, "Decoding StreamHeader");
118118

119119
// header contains version and compression options
@@ -144,29 +144,30 @@ impl StreamDecoder {
144144
Serializer::Protobuf => self.state = State::MessageHeader,
145145
}
146146

147-
// we might have data left in the current chunk,
148-
// immediately try to read length of the next message
147+
// we might have data left in the current byte chunk, immediately try to read
148+
// length of the next message.
149149
return self.try_read();
150150
}
151151
}
152152

153153
State::MessageHeader => {
154154
if let Some(bytes) = self
155-
.chunks
155+
.byte_chunks
156156
.try_read(crate::codec::file::MessageHeader::SIZE_BYTES)
157157
{
158158
let header = crate::codec::file::MessageHeader::from_bytes(bytes)?;
159159

160160
re_log::trace!(?header, "MessageHeader");
161161

162162
self.state = State::Message(header);
163-
// we might have data left in the current chunk,
164-
// immediately try to read the message content
163+
// we might have data left in the current byte chunk, immediately try to read
164+
// the message content.
165165
return self.try_read();
166166
}
167167
}
168+
168169
State::Message(header) => {
169-
if let Some(bytes) = self.chunks.try_read(header.len as usize) {
170+
if let Some(bytes) = self.byte_chunks.try_read(header.len as usize) {
170171
re_log::trace!(?header, "Read message");
171172

172173
let message = crate::codec::file::decoder::decode_bytes_to_app(
@@ -217,14 +218,15 @@ fn propagate_version(message: &mut LogMsg, version: Option<CrateVersion>) {
217218
}
218219
}
219220

220-
type Chunk = Cursor<Vec<u8>>;
221+
/// A bunch of contiguous bytes.
222+
type ByteChunk = Cursor<Vec<u8>>;
221223

222-
struct ChunkBuffer {
223-
/// Any incoming chunks are queued until they are emptied
224-
queue: VecDeque<Chunk>,
224+
struct ByteChunkBuffer {
225+
/// Any incoming byte chunks are queued until they are emptied.
226+
queue: VecDeque<ByteChunk>,
225227

226-
/// This buffer is used as scratch space for any read bytes,
227-
/// so that we can return a contiguous slice from `try_read`.
228+
/// This buffer is used as scratch space for any read bytes, so that we can return a contiguous
229+
/// slice from `try_read`.
228230
buffer: Vec<u8>,
229231

230232
/// How many bytes of valid data are currently in `self.buffer`.
@@ -234,7 +236,7 @@ struct ChunkBuffer {
234236
num_read: usize,
235237
}
236238

237-
impl ChunkBuffer {
239+
impl ByteChunkBuffer {
238240
fn new() -> Self {
239241
Self {
240242
queue: VecDeque::with_capacity(16),
@@ -244,19 +246,19 @@ impl ChunkBuffer {
244246
}
245247
}
246248

247-
fn push(&mut self, chunk: Vec<u8>) {
248-
if chunk.is_empty() {
249+
fn push(&mut self, byte_chunk: Vec<u8>) {
250+
if byte_chunk.is_empty() {
249251
return;
250252
}
251-
self.queue.push_back(Chunk::new(chunk));
253+
self.queue.push_back(ByteChunk::new(byte_chunk));
252254
}
253255

254256
/// How many bytes have been read with [`Self::try_read`] so far?
255257
fn num_read(&self) -> usize {
256258
self.num_read
257259
}
258260

259-
/// Attempt to read exactly `n` bytes out of the queued chunks.
261+
/// Attempt to read exactly `n` bytes out of the queued byte chunks.
260262
///
261263
/// Returns `None` if there is not enough data to return a slice of `n` bytes.
262264
///
@@ -276,13 +278,15 @@ impl ChunkBuffer {
276278
// try to read some bytes from the front of the queue,
277279
// until either:
278280
// - we've read enough to return a slice of `n` bytes
279-
// - we run out of chunks to read
280-
// while also discarding any empty chunks
281+
// - we run out of byte chunks to read
282+
// while also discarding any empty byte chunks
281283
while self.buffer_fill != n {
282-
if let Some(chunk) = self.queue.front_mut() {
284+
if let Some(byte_chunk) = self.queue.front_mut() {
283285
let remainder = &mut self.buffer[self.buffer_fill..];
284-
self.buffer_fill += chunk.read(remainder).expect("failed to read from chunk");
285-
if is_chunk_empty(chunk) {
286+
self.buffer_fill += byte_chunk
287+
.read(remainder)
288+
.expect("failed to read from byte chunk");
289+
if is_byte_chunk_empty(byte_chunk) {
286290
self.queue.pop_front();
287291
}
288292
} else {
@@ -303,8 +307,8 @@ impl ChunkBuffer {
303307
}
304308
}
305309

306-
fn is_chunk_empty(chunk: &Chunk) -> bool {
307-
chunk.position() >= chunk.get_ref().len() as u64
310+
fn is_byte_chunk_empty(byte_chunk: &ByteChunk) -> bool {
311+
byte_chunk.position() >= byte_chunk.get_ref().len() as u64
308312
}
309313

310314
#[cfg(test)]
@@ -381,7 +385,7 @@ mod tests {
381385

382386
assert_message_incomplete!(decoder.try_read());
383387

384-
decoder.push_chunk(data);
388+
decoder.push_byte_chunk(data);
385389

386390
let decoded_messages: Vec<_> = (0..16)
387391
.map(|_| assert_message_ok!(decoder.try_read()))
@@ -398,8 +402,8 @@ mod tests {
398402

399403
assert_message_incomplete!(decoder.try_read());
400404

401-
for chunk in data.chunks(1) {
402-
decoder.push_chunk(chunk.to_vec());
405+
for byte_chunk in data.chunks(1) {
406+
decoder.push_byte_chunk(byte_chunk.to_vec());
403407
}
404408

405409
let decoded_messages: Vec<_> = (0..16)
@@ -419,8 +423,8 @@ mod tests {
419423

420424
assert_message_incomplete!(decoder.try_read());
421425

422-
decoder.push_chunk(data1);
423-
decoder.push_chunk(data2);
426+
decoder.push_byte_chunk(data1);
427+
decoder.push_byte_chunk(data2);
424428

425429
let decoded_messages: Vec<_> = (0..32)
426430
.map(|_| assert_message_ok!(decoder.try_read()))
@@ -437,7 +441,7 @@ mod tests {
437441

438442
assert_message_incomplete!(decoder.try_read());
439443

440-
decoder.push_chunk(data);
444+
decoder.push_byte_chunk(data);
441445

442446
let decoded_messages: Vec<_> = (0..16)
443447
.map(|_| assert_message_ok!(decoder.try_read()))
@@ -454,8 +458,8 @@ mod tests {
454458

455459
assert_message_incomplete!(decoder.try_read());
456460

457-
for chunk in data.chunks(1) {
458-
decoder.push_chunk(chunk.to_vec());
461+
for byte_chunk in data.chunks(1) {
462+
decoder.push_byte_chunk(byte_chunk.to_vec());
459463
}
460464

461465
let decoded_messages: Vec<_> = (0..16)
@@ -474,11 +478,11 @@ mod tests {
474478

475479
// keep pushing 3 chunks of 16 bytes at a time, and attempting to read messages
476480
// until there are no more chunks
477-
let mut chunks = data.chunks(16).peekable();
478-
while chunks.peek().is_some() {
481+
let mut byte_chunks = data.chunks(16).peekable();
482+
while byte_chunks.peek().is_some() {
479483
for _ in 0..3 {
480-
if let Some(chunk) = chunks.next() {
481-
decoder.push_chunk(chunk.to_vec());
484+
if let Some(byte_chunk) = byte_chunks.next() {
485+
decoder.push_byte_chunk(byte_chunk.to_vec());
482486
} else {
483487
break;
484488
}
@@ -494,15 +498,15 @@ mod tests {
494498

495499
#[test]
496500
fn stream_irregular_chunks_protobuf() {
497-
// this attempts to stress-test `try_read` with chunks of various sizes
501+
// this attempts to stress-test `try_read` with byte chunks of various sizes
498502

499503
let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16);
500504
let mut data = Cursor::new(data);
501505

502506
let mut decoder = StreamDecoder::new();
503507
let mut decoded_messages = vec![];
504508

505-
// read chunks 2xN bytes at a time, where `N` comes from a regular pattern
509+
// read byte chunks 2xN bytes at a time, where `N` comes from a regular pattern
506510
// this is slightly closer to using random numbers while still being
507511
// fully deterministic
508512

@@ -514,7 +518,7 @@ mod tests {
514518
for _ in 0..2 {
515519
let n = data.read(&mut temp[..pattern[pattern_index]]).unwrap();
516520
pattern_index = (pattern_index + 1) % pattern.len();
517-
decoder.push_chunk(temp[..n].to_vec());
521+
decoder.push_byte_chunk(temp[..n].to_vec());
518522
}
519523

520524
while let Some(message) = decoder.try_read().unwrap() {
@@ -527,9 +531,9 @@ mod tests {
527531

528532
#[test]
529533
fn chunk_buffer_read_single_chunk() {
530-
// reading smaller `n` from multiple larger chunks
534+
// reading smaller `n` from multiple larger byte chunks
531535

532-
let mut buffer = ChunkBuffer::new();
536+
let mut buffer = ByteChunkBuffer::new();
533537

534538
let data = &[0, 1, 2, 3, 4];
535539
assert_eq!(None, buffer.try_read(1));
@@ -541,16 +545,16 @@ mod tests {
541545

542546
#[test]
543547
fn chunk_buffer_read_multi_chunk() {
544-
// reading a large `n` from multiple smaller chunks
548+
// reading a large `n` from multiple smaller byte chunks
545549

546-
let mut buffer = ChunkBuffer::new();
550+
let mut buffer = ByteChunkBuffer::new();
547551

548-
let chunks: &[&[u8]] = &[&[0, 1, 2], &[3, 4]];
552+
let byte_chunks: &[&[u8]] = &[&[0, 1, 2], &[3, 4]];
549553

550554
assert_eq!(None, buffer.try_read(1));
551-
buffer.push(chunks[0].to_vec());
555+
buffer.push(byte_chunks[0].to_vec());
552556
assert_eq!(None, buffer.try_read(5));
553-
buffer.push(chunks[1].to_vec());
557+
buffer.push(byte_chunks[1].to_vec());
554558
assert_eq!(Some(&[0, 1, 2, 3, 4][..]), buffer.try_read(5));
555559
assert_eq!(None, buffer.try_read(1));
556560
}
@@ -559,7 +563,7 @@ mod tests {
559563
fn chunk_buffer_read_same_n() {
560564
// reading the same `n` multiple times should not return the same bytes
561565

562-
let mut buffer = ChunkBuffer::new();
566+
let mut buffer = ByteChunkBuffer::new();
563567

564568
let data = &[0, 1, 2, 3];
565569
buffer.push(data.to_vec());

crates/store/re_log_encoding/src/stream_rrd_from_http.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -100,7 +100,7 @@ pub fn stream_rrd_from_http(url: String, on_msg: Arc<HttpMessageCallback>) {
100100
}
101101

102102
re_tracing::profile_scope!("decoding_rrd_stream");
103-
decoder.borrow_mut().push_chunk(chunk);
103+
decoder.borrow_mut().push_byte_chunk(chunk);
104104
loop {
105105
match decoder.borrow_mut().try_read() {
106106
Ok(message) => match message {

0 commit comments

Comments (0)