Skip to content

Commit 440537c

Browse files
committed
RRD footers: everything framing
1 parent 95e5494 commit 440537c

File tree

19 files changed

+750
-120
lines changed

19 files changed

+750
-120
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9070,6 +9070,7 @@ dependencies = [
90709070
"bytes",
90719071
"criterion",
90729072
"ehttp",
9073+
"itertools 0.14.0",
90739074
"js-sys",
90749075
"lz4_flex 0.12.0",
90759076
"mimalloc",
@@ -9092,6 +9093,7 @@ dependencies = [
90929093
"wasm-bindgen-futures",
90939094
"web-sys",
90949095
"web-time",
9096+
"xxhash-rust",
90959097
]
90969098

90979099
[[package]]

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ wgpu = { version = "27.0.1", default-features = false, features = [
427427
"fragile-send-sync-non-atomic-wasm",
428428
] }
429429
xshell = "0.2.7"
430+
xxhash-rust = { version = "0.8", features = ["xxh32"] }
430431

431432
# ---------------------------------------------------------------------------------
432433
[profile]

crates/store/re_log_encoding/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,12 @@ re_tracing.workspace = true
5353

5454
# External:
5555
arrow = { workspace = true, features = ["ipc"] }
56+
itertools.workspace = true
5657
lz4_flex = { workspace = true }
5758
parking_lot.workspace = true
5859
thiserror.workspace = true
5960
tracing.workspace = true
61+
xxhash-rust.workspace = true
6062

6163
# Optional external dependencies:
6264
bytes = { workspace = true, optional = true }

crates/store/re_log_encoding/src/rrd/decoder/iterator.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ impl<T: DecoderEntrypoint, R: std::io::BufRead> std::iter::Iterator for DecoderI
139139

140140
// …and the underlying decoder already considers that it's done (i.e. it's
141141
// waiting for a whole new stream to begin): time to stop.
142-
Ok(None) if self.decoder.state == DecoderState::StreamHeader => {
142+
Ok(None) if self.decoder.state == DecoderState::WaitingForStreamHeader => {
143143
return None;
144144
}
145145

@@ -570,3 +570,4 @@ mod tests {
570570
}
571571
}
572572
}
573+

crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs

Lines changed: 188 additions & 62 deletions
Large diffs are not rendered by default.

crates/store/re_log_encoding/src/rrd/decoder/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ impl<T: DecoderEntrypoint + Unpin, R: AsyncBufRead + Unpin> Stream for DecoderSt
151151

152152
// …and the underlying decoder already considers that it's done (i.e. it's
153153
// waiting for a whole new stream to begin): time to stop.
154-
Ok(None) if decoder.state == DecoderState::StreamHeader => {
154+
Ok(None) if decoder.state == DecoderState::WaitingForStreamHeader => {
155155
return std::task::Poll::Ready(None);
156156
}
157157

crates/store/re_log_encoding/src/rrd/encoder.rs

Lines changed: 166 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ use std::borrow::Borrow;
55
use re_build_info::CrateVersion;
66
use re_chunk::{ChunkError, ChunkResult};
77
use re_log_types::LogMsg;
8+
use re_sorbet::SorbetError;
89

9-
use crate::ToTransport as _;
10-
use crate::rrd::{
10+
use crate::{
1111
CodecError, Compression, Encodable as _, EncodingOptions, MessageHeader, MessageKind,
12-
Serializer, StreamHeader,
12+
Serializer, StreamFooter, StreamHeader, ToTransport as _,
1313
};
1414

1515
// ----------------------------------------------------------------------------
@@ -31,6 +31,9 @@ pub enum EncodeError {
3131

3232
#[error("Chunk error: {0}")]
3333
Chunk(Box<ChunkError>),
34+
35+
#[error("Sorbet error: {0}")]
36+
Sorbet(Box<SorbetError>),
3437
}
3538

3639
const _: () = assert!(
@@ -50,6 +53,12 @@ impl From<ChunkError> for EncodeError {
5053
}
5154
}
5255

56+
impl From<SorbetError> for EncodeError {
57+
fn from(err: SorbetError) -> Self {
58+
Self::Sorbet(Box::new(err))
59+
}
60+
}
61+
5362
// ----------------------------------------------------------------------------
5463

5564
/// Encode a stream of [`LogMsg`] into an `.rrd` file.
@@ -64,13 +73,63 @@ pub struct Encoder<W: std::io::Write> {
6473
/// Optional so that we can `take()` it in `into_inner`, while still being allowed to implement `Drop`.
6574
write: Option<W>,
6675

67-
/// So we don't ever successfully write partial messages.
76+
/// How many bytes have been written out so far?
77+
num_written: u64,
78+
79+
/// * So we don't ever successfully write partial messages.
80+
/// * Because `prost` only supports buffers, not IO traits.
6881
scratch: Vec<u8>,
6982

70-
/// Tracks whether the end-of-stream marker has been written out already.
83+
/// Tracks the state required to build the RRD manifest for this stream.
84+
///
85+
/// If set to `None`, the footer will not be computed.
86+
///
87+
/// Calling [`Self::append_transport`] will automatically disable footers.
88+
footer_state: Option<FooterState>,
89+
90+
/// Tracks whether the end-of-stream marker, and optionally the associated footer, have been
91+
/// written out already.
7192
is_finished: bool,
7293
}
7394

95+
/// The accumulated state used to build the footer when closing the [`Encoder`].
96+
///
97+
/// This is automatically updated when calling [`Encoder::append`].
98+
#[derive(Default)]
99+
struct FooterState {
100+
/// What is the currently active recording ID according to the state of the encoder, if any?
101+
///
102+
/// Put another way: was there a `SetStoreInfo` message earlier in the stream? If so, we will
103+
/// want to override the recording ID of each chunk with that one (because that's the existing
104+
/// behavior, certainly not because it's nice).
105+
recording_id_scope: Option<re_log_types::StoreId>,
106+
}
107+
108+
impl FooterState {
109+
#[expect(clippy::unnecessary_wraps)] // won't stay for long
110+
fn append(
111+
&mut self,
112+
_byte_offset: u64,
113+
_byte_size: u64,
114+
msg: &re_log_types::LogMsg,
115+
) -> Result<(), EncodeError> {
116+
match msg {
117+
LogMsg::SetStoreInfo(msg) => {
118+
self.recording_id_scope = Some(msg.info.store_id.clone());
119+
}
120+
121+
LogMsg::ArrowMsg(_, _) | LogMsg::BlueprintActivationCommand(_) => {}
122+
}
123+
124+
Ok(())
125+
}
126+
127+
#[expect(clippy::unnecessary_wraps, clippy::unused_self)] // won't stay for long
128+
fn finish(self) -> Result<crate::RrdFooter, EncodeError> {
129+
Ok(crate::RrdFooter {})
130+
}
131+
}
132+
74133
impl Encoder<Vec<u8>> {
75134
pub fn local() -> Result<Self, EncodeError> {
76135
Self::new_eager(
@@ -126,7 +185,9 @@ impl<W: std::io::Write> Encoder<W> {
126185
serializer: options.serializer,
127186
compression: options.compression,
128187
write: Some(write),
188+
num_written: out.len() as u64,
129189
scratch: Vec::new(),
190+
footer_state: Some(FooterState::default()),
130191
is_finished: false,
131192
})
132193
}
@@ -137,22 +198,54 @@ impl<W: std::io::Write> Encoder<W> {
137198
return Err(EncodeError::AlreadyFinished);
138199
}
139200

140-
if self.write.is_none() {
201+
let Some(w) = self.write.as_mut() else {
141202
return Err(EncodeError::AlreadyUnwrapped);
142-
}
203+
};
143204

144205
re_tracing::profile_function!();
145206

146-
let message = message.to_transport(self.compression)?;
147-
// Safety: the compression settings of this message are consistent with this stream.
148-
#[expect(unsafe_code)]
149-
unsafe {
150-
self.append_transport(&message)
207+
let transport = message.to_transport(self.compression)?;
208+
209+
let byte_offset_excluding_header =
210+
self.num_written + crate::MessageHeader::ENCODED_SIZE_BYTES as u64;
211+
212+
self.scratch.clear();
213+
let n = match self.serializer {
214+
Serializer::Protobuf => {
215+
transport.to_rrd_bytes(&mut self.scratch)?;
216+
let n = w
217+
.write_all(&self.scratch)
218+
.map(|_| self.scratch.len() as u64)
219+
.map_err(EncodeError::Write)?;
220+
self.num_written += n;
221+
n
222+
}
223+
};
224+
225+
let byte_size_excluding_header = n - crate::MessageHeader::ENCODED_SIZE_BYTES as u64;
226+
227+
if let Some(footer_state) = self.footer_state.as_mut() {
228+
footer_state.append(
229+
byte_offset_excluding_header,
230+
byte_size_excluding_header,
231+
message,
232+
)?;
151233
}
234+
235+
Ok(n)
236+
}
237+
238+
/// Instructs the encoder to _not_ emit a footer at the end of the stream.
239+
///
240+
/// This cannot be reverted.
241+
pub fn do_not_emit_footer(&mut self) {
242+
self.footer_state = None;
152243
}
153244

154245
/// Returns the size in bytes of the encoded data.
155246
///
247+
/// ⚠️ This implies [`Self::do_not_emit_footer`]. ⚠️
248+
///
156249
/// ## Safety
157250
///
158251
/// `message` must respect the global settings of the encoder (e.g. the compression used),
@@ -166,19 +259,27 @@ impl<W: std::io::Write> Encoder<W> {
166259
return Err(EncodeError::AlreadyFinished);
167260
}
168261

262+
re_tracing::profile_function!();
263+
264+
// We cannot update the RRD manifest without decoding the message, which would defeat the
265+
// entire purpose of using this method in the first place.
266+
// Therefore, we disable footers if and when this method is used.
267+
self.do_not_emit_footer();
268+
169269
let Some(w) = self.write.as_mut() else {
170270
return Err(EncodeError::AlreadyUnwrapped);
171271
};
172272

173-
re_tracing::profile_function!();
174-
175273
self.scratch.clear();
176274
match self.serializer {
177275
Serializer::Protobuf => {
178276
message.to_rrd_bytes(&mut self.scratch)?;
179-
w.write_all(&self.scratch)
180-
.map(|_| self.scratch.len() as _)
181-
.map_err(EncodeError::Write)
277+
let n = w
278+
.write_all(&self.scratch)
279+
.map(|_| self.scratch.len() as u64)
280+
.map_err(EncodeError::Write)?;
281+
self.num_written += n;
282+
Ok(n)
182283
}
183284
}
184285
}
@@ -200,22 +301,52 @@ impl<W: std::io::Write> Encoder<W> {
200301
return Err(EncodeError::AlreadyUnwrapped);
201302
};
202303

203-
match self.serializer {
204-
Serializer::Protobuf => {
205-
// TODO(cmc): the extra heap-alloc and copy could be easily avoided with the
206-
// introduction of an InMemoryWriter trait or similar. In practice it makes no
207-
// difference and the cognitive overhead of this crate is already through the roof.
208-
let mut header = Vec::new();
209-
MessageHeader {
210-
kind: MessageKind::End,
211-
len: 0,
212-
}
213-
.to_rrd_bytes(&mut header)?;
214-
w.write_all(&header)?;
215-
}
304+
self.is_finished = true;
305+
306+
let Some(footer_state) = self.footer_state.take() else {
307+
return Ok(());
308+
};
309+
310+
// TODO(cmc): the extra heap-allocs and copies could be easily avoided with the
311+
// introduction of an InMemoryWriter trait or similar. In practice it makes no
312+
// difference and the cognitive overhead of this crate is already through the roof.
313+
314+
use re_protos::external::prost::Message as _;
315+
316+
// Message Header (::End)
317+
318+
let rrd_footer = footer_state.finish()?;
319+
let rrd_footer = rrd_footer.to_transport(())?;
320+
321+
let mut out_header = Vec::new();
322+
MessageHeader {
323+
kind: MessageKind::End,
324+
len: rrd_footer.encoded_len() as u64,
216325
}
326+
.to_rrd_bytes(&mut out_header)?;
327+
w.write_all(&out_header).map_err(EncodeError::Write)?;
328+
self.num_written += out_header.len() as u64;
217329

218-
self.is_finished = true;
330+
let end_msg_byte_offset_from_start_excluding_header = self.num_written;
331+
332+
// Message payload (re_protos::RrdFooter)
333+
334+
let mut out_rrd_footer = Vec::new();
335+
rrd_footer.to_rrd_bytes(&mut out_rrd_footer)?;
336+
w.write_all(&out_rrd_footer).map_err(EncodeError::Write)?;
337+
self.num_written += out_rrd_footer.len() as u64;
338+
339+
// StreamFooter
340+
341+
let mut out_stream_footer = Vec::new();
342+
StreamFooter::from_rrd_footer_bytes(
343+
end_msg_byte_offset_from_start_excluding_header,
344+
&out_rrd_footer,
345+
)
346+
.to_rrd_bytes(&mut out_stream_footer)?;
347+
w.write_all(&out_stream_footer)
348+
.map_err(EncodeError::Write)?;
349+
self.num_written += out_stream_footer.len() as u64;
219350

220351
Ok(())
221352
}
@@ -255,8 +386,6 @@ impl<W: std::io::Write> Encoder<W> {
255386
}
256387
}
257388

258-
// TODO(cmc): It seems a bit suspicious to me that we send an EOS marker on drop, but don't flush.
259-
// But I don't want to change any flushing behavior at the moment, so I'll keep it that way for now.
260389
impl<W: std::io::Write> std::ops::Drop for Encoder<W> {
261390
fn drop(&mut self) {
262391
if self.write.is_none() {
@@ -267,5 +396,9 @@ impl<W: std::io::Write> std::ops::Drop for Encoder<W> {
267396
if let Err(err) = self.finish() {
268397
re_log::warn!("encoder couldn't be finished: {err}");
269398
}
399+
400+
if let Err(err) = self.flush_blocking() {
401+
re_log::warn!("encoder couldn't be flushed: {err}");
402+
}
270403
}
271404
}

0 commit comments

Comments
 (0)