From 80ff0a61815376fb56250d3864170539bf7d00a7 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Tue, 2 Dec 2025 12:50:37 +0100 Subject: [PATCH 1/6] RRD footers: everything framing --- Cargo.lock | 2 + Cargo.toml | 1 + crates/store/re_log_encoding/Cargo.toml | 2 + .../src/rrd/decoder/iterator.rs | 2 +- .../src/rrd/decoder/state_machine.rs | 250 +++++++++++++----- .../re_log_encoding/src/rrd/decoder/stream.rs | 2 +- .../store/re_log_encoding/src/rrd/encoder.rs | 199 +++++++++++--- .../store/re_log_encoding/src/rrd/errors.rs | 25 +- .../src/rrd/footer/instances.rs | 14 + .../re_log_encoding/src/rrd/footer/mod.rs | 3 + .../src/rrd/{headers.rs => frames.rs} | 191 ++++++++++++- .../store/re_log_encoding/src/rrd/log_msg.rs | 27 ++ crates/store/re_log_encoding/src/rrd/mod.rs | 10 +- .../re_log_encoding/src/transport_to_app.rs | 38 ++- .../re_log_encoding/tests/footer_roundtrip.rs | 55 ++++ .../proto/rerun/v1alpha1/log_msg.proto | 16 ++ .../src/v1alpha1/rerun.log_msg.v1alpha1.rs | 26 ++ crates/store/re_server/src/store/layer.rs | 5 +- rerun_py/rerun_sdk/rerun/recording_stream.py | 1 + 19 files changed, 749 insertions(+), 120 deletions(-) create mode 100644 crates/store/re_log_encoding/src/rrd/footer/instances.rs create mode 100644 crates/store/re_log_encoding/src/rrd/footer/mod.rs rename crates/store/re_log_encoding/src/rrd/{headers.rs => frames.rs} (50%) create mode 100644 crates/store/re_log_encoding/tests/footer_roundtrip.rs diff --git a/Cargo.lock b/Cargo.lock index 3316a4fdd8df..481b7838629f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9070,6 +9070,7 @@ dependencies = [ "bytes", "criterion", "ehttp", + "itertools 0.14.0", "js-sys", "lz4_flex 0.12.0", "mimalloc", @@ -9092,6 +9093,7 @@ dependencies = [ "wasm-bindgen-futures", "web-sys", "web-time", + "xxhash-rust", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index d58625d90347..cb30cd9ac6e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -427,6 +427,7 @@ wgpu = { version = "27.0.1", default-features = false, features = [ "fragile-send-sync-non-atomic-wasm", ] } xshell = "0.2.7" +xxhash-rust = { version = "0.8", features = ["xxh32"] } # --------------------------------------------------------------------------------- [profile] diff --git a/crates/store/re_log_encoding/Cargo.toml b/crates/store/re_log_encoding/Cargo.toml index de13fa1aefdb..904cb7d245bc 100644 --- a/crates/store/re_log_encoding/Cargo.toml +++ b/crates/store/re_log_encoding/Cargo.toml @@ -53,10 +53,12 @@ re_tracing.workspace = true # External: arrow = { workspace = true, features = ["ipc"] } +itertools.workspace = true lz4_flex = { workspace = true } parking_lot.workspace = true thiserror.workspace = true tracing.workspace = true +xxhash-rust.workspace = true # Optional external dependencies: bytes = { workspace = true, optional = true } diff --git a/crates/store/re_log_encoding/src/rrd/decoder/iterator.rs b/crates/store/re_log_encoding/src/rrd/decoder/iterator.rs index 170e57cbb1bd..f1b3cc1f8b70 100644 --- a/crates/store/re_log_encoding/src/rrd/decoder/iterator.rs +++ b/crates/store/re_log_encoding/src/rrd/decoder/iterator.rs @@ -139,7 +139,7 @@ impl std::iter::Iterator for DecoderI // …and the underlying decoder already considers that it's done (i.e. it's // waiting for a whole new stream to begin): time to stop. - Ok(None) if self.decoder.state == DecoderState::StreamHeader => { + Ok(None) if self.decoder.state == DecoderState::WaitingForStreamHeader => { return None; } diff --git a/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs b/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs index 436198ad6f4d..fd535a2cc876 100644 --- a/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs +++ b/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs @@ -2,12 +2,14 @@ use std::collections::VecDeque; use std::io::Cursor; use std::io::Read as _; +use itertools::Itertools as _; use re_build_info::CrateVersion; -use crate::CachingApplicationIdInjector; -use crate::rrd::{ - CodecError, Decodable as _, DecodeError, DecoderEntrypoint, EncodingOptions, Serializer, - StreamHeader, +use crate::StreamFooter; +use crate::rrd::MessageHeader; +use crate::{ + CachingApplicationIdInjector, CodecError, Decodable as _, DecodeError, DecoderEntrypoint, + EncodingOptions, Serializer, StreamHeader, }; // --- @@ -54,39 +56,55 @@ pub struct Decoder { /// The application id cache used for migrating old data. pub(crate) app_id_cache: CachingApplicationIdInjector, - pub(crate) _decodable: std::marker::PhantomData, + _decodable: std::marker::PhantomData, } -/// /// ```text,ignore -/// StreamHeader -/// | -/// v -/// MessageHeader -/// ^ | -/// | | -/// ---Message<-- +/// +--> WaitingForStreamHeader -----> Aborted +/// | | +/// | v +/// +--- WaitingForMessageHeader <---+ +/// | | | +/// | v | +/// | WaitingForMessagePayload ---+ +/// | | +/// | v +/// +--- WaitingForStreamFooter /// ``` #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum DecoderState { - /// The beginning of the stream. + /// The beginning of a stream. /// - /// The stream header contains the magic bytes (e.g. `RRF2`), the encoded version, and the + /// The [`StreamHeader`] contains the magic bytes (e.g. `RRF2`), the encoded version, and the /// encoding options. /// - /// After the stream header is read once, the state machine will only ever switch between - /// `MessageHeader` and `Message` - StreamHeader, + /// We can come back to this state at any point because multiple RRD streams might have been + /// concatenated together (e.g. `cat *.rrd | rerun`). + WaitingForStreamHeader, - /// The beginning of a Protobuf message. - MessageHeader, + /// Stream in progress. + /// + /// The [`MessageHeader`] indicates what kind of payload this is and how large it is. + /// + /// We will rebound indefinitely + WaitingForMessageHeader, - /// The message content, serialized using `Protobuf`. + /// A [`MessageHeader`] was parsed, now we're waiting for the associated payload. /// /// Compression is only applied to individual `ArrowMsg`s, instead of the entire stream. - Message(crate::rrd::MessageHeader), + WaitingForMessagePayload(MessageHeader), - /// Stop reading. + /// We hit a message of kind `MessageKind::End`, which means a footer must be following it. + /// + /// The [`StreamFooter`] contains information about where the RRD manifests can be found, but we + /// won't be doing anything with it in this case since we're just going through the data in order. + /// + /// Once the footer is parsed, we can wait for a new stream to begin. + WaitingForStreamFooter, + + /// The stream entered an irrecoverable state and cannot yield data anymore. However, most of the + /// valuable data was already decoded, so we merely log an error and stop yielding more + /// messages rather than bubbling up the error all the way to the end user. Aborted, } @@ -98,7 +116,7 @@ impl Decoder { // Note: `options` are filled in once we read `FileHeader`, so this value does not matter. options: EncodingOptions::PROTOBUF_UNCOMPRESSED, byte_chunks: ByteChunkBuffer::new(), - state: DecoderState::StreamHeader, + state: DecoderState::WaitingForStreamHeader, app_id_cache: CachingApplicationIdInjector::default(), _decodable: std::marker::PhantomData::, } @@ -121,7 +139,7 @@ impl Decoder { })) = result { re_log::warn_once!( - "Dropping message without application id which arrived before `SetStoreInfo` \ + "dropping message without application id which arrived before `SetStoreInfo` \ (kind: {store_kind}, recording id: {recording_id}." ); } else { @@ -132,29 +150,56 @@ impl Decoder { /// Read the next message in the stream. fn try_read_impl(&mut self) -> Result, DecodeError> { + // Enable this for easy debugging of the state machine. + if false { + use bytes::Buf as _; + let num_bytes = self + .byte_chunks + .queue + .iter() + .map(|v| v.remaining()) + .sum::(); + + eprintln!("state: {:?} (bytes available: {num_bytes})", self.state); + + let mut peeked = [0u8; 32]; + self.byte_chunks.try_peek(&mut peeked); + let peeked = peeked + .into_iter() + .map(|b| match b { + b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' => String::from(b as char), + v => v.to_string(), + }) + .join(", "); + + eprintln!("upcoming 32 bytes: [{peeked}]"); + } + match self.state { - DecoderState::StreamHeader => { + DecoderState::WaitingForStreamHeader => { let is_first_header = self.byte_chunks.num_read() == 0; let position = self.byte_chunks.num_read(); if let Some(header_data) = self.byte_chunks.try_read(StreamHeader::ENCODED_SIZE_BYTES) { - re_log::trace!(?header_data, "Decoding StreamHeader"); - // header contains version and compression options let version_and_options = StreamHeader::from_rrd_bytes(&header_data) .and_then(|h| h.to_version_and_options()); let (version, options) = match version_and_options { Ok(ok) => ok, Err(err) => { - // We expected a header, but didn't find one! if is_first_header { + // We expected a header, but didn't find one! return Err(err.into()); } else { + // A bunch of weird trailing bytes means we're now in an irrecoverable state, but + // it doesn't change the fact that we've just successfully yielded an entire stream's + // worth of data. So, instead of bubling up errors all the way to the end user, + // merely log one here stop the state machine forever. re_log::error!( is_first_header, position, - "Trailing bytes in rrd stream: {header_data:?} ({err})" + "trailing bytes in rrd stream: {header_data:?} ({err})" ); self.state = DecoderState::Aborted; return Ok(None); @@ -165,34 +210,38 @@ impl Decoder { re_log::trace!( version = version.to_string(), ?options, - "Found Stream Header" + "found StreamHeader" ); self.version = Some(version); self.options = options; match self.options.serializer { - Serializer::Protobuf => self.state = DecoderState::MessageHeader, + Serializer::Protobuf => self.state = DecoderState::WaitingForMessageHeader, } // we might have data left in the current byte chunk, immediately try to read // length of the next message. return self.try_read(); } + + // Not enough data yet -- wait to be fed and called back once again. } - DecoderState::MessageHeader => { - let mut peeked = [0u8; crate::rrd::MessageHeader::ENCODED_SIZE_BYTES]; + DecoderState::WaitingForMessageHeader => { + let mut peeked = [0u8; MessageHeader::ENCODED_SIZE_BYTES]; if self.byte_chunks.try_peek(&mut peeked) == peeked.len() { - let header = match crate::rrd::MessageHeader::from_rrd_bytes(&peeked) { + let header = match MessageHeader::from_rrd_bytes(&peeked) { Ok(header) => header, - Err(crate::rrd::CodecError::HeaderDecoding(_)) => { - // We failed to decode a `MessageHeader`: it might be because the - // stream is corrupt, or it might be because it just switched to a - // different, concatenated recording without having the courtesy of - // announcing it via an EOS marker. - self.state = DecoderState::StreamHeader; + Err(crate::rrd::CodecError::FrameDecoding(_)) => { + // We failed to decode a `MessageHeader`: it might be because the stream is corrupt, + // or it might be because it just switched to a different, concatenated recording + // without having the courtesy of announcing it via an EOS marker. + // + // TODO(cmc): These kinds of peeking shenanigans should never be necessary, need to + // write a proposal for RRF3 that addresses these issues and more. + self.state = DecoderState::WaitingForStreamHeader; return self.try_read(); } @@ -200,68 +249,145 @@ impl Decoder { }; self.byte_chunks - .try_read(crate::rrd::MessageHeader::ENCODED_SIZE_BYTES) + .try_read(MessageHeader::ENCODED_SIZE_BYTES) .expect("reading cannot fail if peeking worked"); - re_log::trace!(?header, "MessageHeader"); + re_log::trace!(?header, "found MessageHeader"); - self.state = DecoderState::Message(header); - // we might have data left in the current byte chunk, immediately try to read - // the message content. + self.state = DecoderState::WaitingForMessagePayload(header); return self.try_read(); } + + // Not enough data yet -- wait to be fed and called back once again. } - DecoderState::Message(header) => { + DecoderState::WaitingForMessagePayload(header) => { let start_offset = self.byte_chunks.num_read() as u64; if let Some(bytes) = self.byte_chunks.try_read(header.len as usize) { - re_log::trace!(?header, "Read message"); - let bytes_len = bytes.len() as u64; let byte_span = re_chunk::Span { start: start_offset, len: bytes_len, }; let message = match T::decode( - bytes, + bytes.clone(), byte_span, header.kind, &mut self.app_id_cache, self.version, ) { Ok(msg) => msg, + Err(err) => { // We successfully parsed a header, but decided to drop the message altogether. // We must go back to looking for headers, or the decoder will just be stuck in a dead // state forever. - self.state = DecoderState::MessageHeader; + self.state = DecoderState::WaitingForMessageHeader; return Err(err.into()); } }; if let Some(message) = message { - re_log::trace!("Decoded new message"); - - self.state = DecoderState::MessageHeader; + self.state = DecoderState::WaitingForMessageHeader; return Ok(Some(message)); } else { - re_log::trace!("End of stream - expecting a new Streamheader"); + re_log::trace!( + "End of stream - expecting either a StreamFooter or a new Streamheader" + ); + + if !bytes.is_empty() { + let rrd_footer = + re_protos::log_msg::v1alpha1::RrdFooter::from_rrd_bytes(&bytes)?; + _ = rrd_footer; // TODO(cmc): we'll use that in the next PR, promise. + + // A non-empty ::End message means there must be a footer ahead, no exception. + self.state = DecoderState::WaitingForStreamFooter; + } else { + // There are 2 possible scenarios where we can end up here: + // * The recording doesn't contain any data messages (e.g. it's empty except for + // a `SetStoreInfo` message). + // * Backward compatibility: the payload is empty (i.e. `header.len == 0`) because the + // `End` message was written by a legacy encoder that predates the introduction of footers. + // + // Either way, we have to expect a footer next, since we don't know for sure which scenario + // we're in. We could check the encoder version and such, but that's just superfluous complexity + // since `WaitingForStreamFooter` already knows how to deal with optional footers anyway. + self.state = DecoderState::WaitingForStreamFooter; + } - // `None` means an end-of-stream marker was hit, but there might be another concatenated - // stream behind, so try to start all over again. - self.state = DecoderState::StreamHeader; return self.try_read(); } } + + // Not enough data yet -- wait to be fed and called back once again. } - DecoderState::Aborted => { - return Ok(None); + DecoderState::WaitingForStreamFooter => { + // NOTE: We're not peeking here! If we enter this state, then there must be a footer + // ahead, no exception, otherwise that's a violation of the framing protocol. + let position = self.byte_chunks.num_read(); + if let Some(bytes) = self + .byte_chunks + .try_read(crate::rrd::StreamFooter::ENCODED_SIZE_BYTES) + { + match crate::rrd::StreamFooter::from_rrd_bytes(&bytes) { + Ok(footer) => { + re_log::trace!(?footer, "found StreamFooter"); + + let StreamFooter { + fourcc: _, + identifier: _, + rrd_footer_byte_offset_from_start_excluding_header: + rrd_manifest_byte_offset_from_start_excluding_header, + rrd_footer_byte_size_excluding_header: + rrd_manifest_byte_size_excluding_header, + crc_excluding_header: _, + } = footer; + + let start = rrd_manifest_byte_offset_from_start_excluding_header; + let end = start + rrd_manifest_byte_size_excluding_header; + + if end > position as u64 { + // The RRD manifest cannot possibly end after the footer starts. + re_log::error!( + position = self.byte_chunks.num_read(), + bytes = ?bytes, + ?footer, + err = "offsets are invalid", + "corrupt footer in rrd stream" + ); + } + + // And now we start all over. + self.state = DecoderState::WaitingForStreamHeader; + return self.try_read(); + } + + Err(err) => { + // A corrupt footer means we're now in an irrecoverable state, but it doesn't change the + // fact that we've just successfully yielded an entire stream's worth of data. So, instead + // of bubling up errors all the way to the end user, merely log one here stop the state + // machine forever. + re_log::error!( + position = self.byte_chunks.num_read(), + bytes = ?bytes, + %err, + "corrupt footer in rrd stream" + ); + self.state = DecoderState::Aborted; + return Ok(None); + } + } + } + + // Not enough data yet -- wait to be fed and called back once again. } + + DecoderState::Aborted => return Ok(None), } - Ok(None) + Ok(None) // Not enough data yet -- wait to be fed and called back once again. } } @@ -585,7 +711,7 @@ mod tests { fn stream_irregular_chunks_protobuf() { // this attempts to stress-test `try_read` with byte chunks of various sizes - let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16); + let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 6); let mut data = Cursor::new(data); let mut decoder = DecoderApp::new(); diff --git a/crates/store/re_log_encoding/src/rrd/decoder/stream.rs b/crates/store/re_log_encoding/src/rrd/decoder/stream.rs index 5e12329e2e26..e4b05a408f78 100644 --- a/crates/store/re_log_encoding/src/rrd/decoder/stream.rs +++ b/crates/store/re_log_encoding/src/rrd/decoder/stream.rs @@ -151,7 +151,7 @@ impl Stream for DecoderSt // …and the underlying decoder already considers that it's done (i.e. it's // waiting for a whole new stream to begin): time to stop. - Ok(None) if decoder.state == DecoderState::StreamHeader => { + Ok(None) if decoder.state == DecoderState::WaitingForStreamHeader => { return std::task::Poll::Ready(None); } diff --git a/crates/store/re_log_encoding/src/rrd/encoder.rs b/crates/store/re_log_encoding/src/rrd/encoder.rs index 8502f29b3182..c63bf1f978d2 100644 --- a/crates/store/re_log_encoding/src/rrd/encoder.rs +++ b/crates/store/re_log_encoding/src/rrd/encoder.rs @@ -5,11 +5,11 @@ use std::borrow::Borrow; use re_build_info::CrateVersion; use re_chunk::{ChunkError, ChunkResult}; use re_log_types::LogMsg; +use re_sorbet::SorbetError; -use crate::ToTransport as _; -use crate::rrd::{ +use crate::{ CodecError, Compression, Encodable as _, EncodingOptions, MessageHeader, MessageKind, - Serializer, StreamHeader, + Serializer, StreamFooter, StreamHeader, ToTransport as _, }; // ---------------------------------------------------------------------------- @@ -31,6 +31,9 @@ pub enum EncodeError { #[error("Chunk error: {0}")] Chunk(Box), + + #[error("Sorbet error: {0}")] + Sorbet(Box), } const _: () = assert!( @@ -50,6 +53,12 @@ impl From for EncodeError { } } +impl From for EncodeError { + fn from(err: SorbetError) -> Self { + Self::Sorbet(Box::new(err)) + } +} + // ---------------------------------------------------------------------------- /// Encode a stream of [`LogMsg`] into an `.rrd` file. @@ -64,13 +73,63 @@ pub struct Encoder { /// Optional so that we can `take()` it in `into_inner`, while still being allowed to implement `Drop`. write: Option, - /// So we don't ever successfully write partial messages. + /// How many bytes written out so far? + num_written: u64, + + /// * So we don't ever successfully write partial messages. + /// * Because `prost` only supports buffers, not IO traits. scratch: Vec, - /// Tracks whether the end-of-stream marker has been written out already. + /// Tracks the state required to build the RRD manifest for this stream. + /// + /// If set to `None`, the footer will not be computed. + /// + /// Calling [`Self::append_transport`] will automatically disable footers. + footer_state: Option, + + /// Tracks whether the end-of-stream marker, and optionally the associated footer, have been + /// written out already. is_finished: bool, } +/// The accumulated state used to build the footer when closing the [`Encoder`]. +/// +/// This is automatically updated when calling [`Encoder::append`]. +#[derive(Default)] +struct FooterState { + /// What is the currently active recording ID according to the state of the encoder, if any? + /// + /// Put another way: was there a `SetStoreInfo` message earlier in the stream? If so, we will + /// want to override the recording ID of each chunk with that one (because that's the existing + /// behavior, certainly not because it's nice). + recording_id_scope: Option, +} + +impl FooterState { + #[expect(clippy::unnecessary_wraps)] // won't stay for long + fn append( + &mut self, + _byte_offset: u64, + _byte_size: u64, + msg: &re_log_types::LogMsg, + ) -> Result<(), EncodeError> { + match msg { + LogMsg::SetStoreInfo(msg) => { + self.recording_id_scope = Some(msg.info.store_id.clone()); + } + + LogMsg::ArrowMsg(_, _) | LogMsg::BlueprintActivationCommand(_) => {} + } + + Ok(()) + } + + #[expect(clippy::unnecessary_wraps, clippy::unused_self)] // won't stay for long + fn finish(self) -> Result { + Ok(crate::RrdFooter {}) + } +} + impl Encoder> { pub fn local() -> Result { Self::new_eager( @@ -126,7 +185,9 @@ impl Encoder { serializer: options.serializer, compression: options.compression, write: Some(write), + num_written: out.len() as u64, scratch: Vec::new(), + footer_state: Some(FooterState::default()), is_finished: false, }) } @@ -137,22 +198,54 @@ impl Encoder { return Err(EncodeError::AlreadyFinished); } - if self.write.is_none() { + let Some(w) = self.write.as_mut() else { return Err(EncodeError::AlreadyUnwrapped); - } + }; re_tracing::profile_function!(); - let message = message.to_transport(self.compression)?; - // Safety: the compression settings of this message are consistent with this stream. - #[expect(unsafe_code)] - unsafe { - self.append_transport(&message) + let transport = message.to_transport(self.compression)?; + + let byte_offset_excluding_header = + self.num_written + crate::MessageHeader::ENCODED_SIZE_BYTES as u64; + + self.scratch.clear(); + let n = match self.serializer { + Serializer::Protobuf => { + transport.to_rrd_bytes(&mut self.scratch)?; + let n = w + .write_all(&self.scratch) + .map(|_| self.scratch.len() as u64) + .map_err(EncodeError::Write)?; + self.num_written += n; + n + } + }; + + let byte_size_excluding_header = n - crate::MessageHeader::ENCODED_SIZE_BYTES as u64; + + if let Some(footer_state) = self.footer_state.as_mut() { + footer_state.append( + byte_offset_excluding_header, + byte_size_excluding_header, + message, + )?; } + + Ok(n) + } + + /// Instructs the encoder to _not_ emit a footer at the end of the stream. + /// + /// This cannot be reverted. + pub fn do_not_emit_footer(&mut self) { + self.footer_state = None; } /// Returns the size in bytes of the encoded data. /// + /// ⚠️ This implies [`Self::do_not_emit_footer`]. ⚠️ + /// /// ## Safety /// /// `message` must respect the global settings of the encoder (e.g. the compression used), @@ -166,19 +259,27 @@ impl Encoder { return Err(EncodeError::AlreadyFinished); } + re_tracing::profile_function!(); + + // We cannot update the RRD manifest without decoding the message, which would defeat the + // entire purposes of using this method in the first place. + // Therefore, we disable footers if and when this method is used. + self.do_not_emit_footer(); + let Some(w) = self.write.as_mut() else { return Err(EncodeError::AlreadyUnwrapped); }; - re_tracing::profile_function!(); - self.scratch.clear(); match self.serializer { Serializer::Protobuf => { message.to_rrd_bytes(&mut self.scratch)?; - w.write_all(&self.scratch) - .map(|_| self.scratch.len() as _) - .map_err(EncodeError::Write) + let n = w + .write_all(&self.scratch) + .map(|_| self.scratch.len() as u64) + .map_err(EncodeError::Write)?; + self.num_written += n; + Ok(n) } } } @@ -200,22 +301,52 @@ impl Encoder { return Err(EncodeError::AlreadyUnwrapped); }; - match self.serializer { - Serializer::Protobuf => { - // TODO(cmc): the extra heap-alloc and copy could be easily avoided with the - // introduction of an InMemoryWriter trait or similar. In practice it makes no - // difference and the cognitive overhead of this crate is already through the roof. - let mut header = Vec::new(); - MessageHeader { - kind: MessageKind::End, - len: 0, - } - .to_rrd_bytes(&mut header)?; - w.write_all(&header)?; - } + self.is_finished = true; + + let Some(footer_state) = self.footer_state.take() else { + return Ok(()); + }; + + // TODO(cmc): the extra heap-allocs and copies could be easily avoided with the + // introduction of an InMemoryWriter trait or similar. In practice it makes no + // difference and the cognitive overhead of this crate is already through the roof. + + use re_protos::external::prost::Message as _; + + // Message Header (::End) + + let rrd_footer = footer_state.finish()?; + let rrd_footer = rrd_footer.to_transport(())?; + + let mut out_header = Vec::new(); + MessageHeader { + kind: MessageKind::End, + len: rrd_footer.encoded_len() as u64, } + .to_rrd_bytes(&mut out_header)?; + w.write_all(&out_header).map_err(EncodeError::Write)?; + self.num_written += out_header.len() as u64; - self.is_finished = true; + let end_msg_byte_offset_from_start_excluding_header = self.num_written; + + // Message payload (re_protos::RrdFooter) + + let mut out_rrd_footer = Vec::new(); + rrd_footer.to_rrd_bytes(&mut out_rrd_footer)?; + w.write_all(&out_rrd_footer).map_err(EncodeError::Write)?; + self.num_written += out_rrd_footer.len() as u64; + + // StreamFooter + + let mut out_stream_footer = Vec::new(); + StreamFooter::from_rrd_footer_bytes( + end_msg_byte_offset_from_start_excluding_header, + &out_rrd_footer, + ) + .to_rrd_bytes(&mut out_stream_footer)?; + w.write_all(&out_stream_footer) + .map_err(EncodeError::Write)?; + self.num_written += out_stream_footer.len() as u64; Ok(()) } @@ -255,8 +386,6 @@ impl Encoder { } } -// TODO(cmc): It seems a bit suspicious to me that we send an EOS marker on drop, but don't flush. -// But I don't want to change any flushing behavior at the moment, so I'll keep it that way for now. impl std::ops::Drop for Encoder { fn drop(&mut self) { if self.write.is_none() { @@ -267,5 +396,9 @@ impl std::ops::Drop for Encoder { if let Err(err) = self.finish() { re_log::warn!("encoder couldn't be finished: {err}"); } + + if let Err(err) = self.flush_blocking() { + re_log::warn!("encoder couldn't be flushed: {err}"); + } } } diff --git a/crates/store/re_log_encoding/src/rrd/errors.rs b/crates/store/re_log_encoding/src/rrd/errors.rs index 2e7533dbcea5..ad745e295c01 100644 --- a/crates/store/re_log_encoding/src/rrd/errors.rs +++ b/crates/store/re_log_encoding/src/rrd/errors.rs @@ -1,4 +1,7 @@ use re_build_info::CrateVersion; +use re_chunk::ChunkError; + +pub type CodecResult = Result; /// Possible errors when encoding and decoding RRD data. /// @@ -25,8 +28,17 @@ pub enum CodecError { #[error("Data was from an old, incompatible Rerun version")] OldRrdVersion, - #[error("Failed to decode message header {0}")] - HeaderDecoding(String), + /// Something went wrong when attempting to decode any kind of RRD frame. + /// + /// There are 3 kinds of RRD frames: + /// * [`crate::StreamHeader`] + /// * [`crate::MessageHeader`] + /// * [`crate::StreamFooter`] + #[error("Failed to decode frame: {0}")] + FrameDecoding(String), + + #[error("CRC check failed: expected {expected:08x} but got {got:08x}")] + CrcMismatch { expected: u32, got: u32 }, #[error("Arrow IPC deserialization error: {0}")] ArrowDeserialization(::arrow::error::ArrowError), @@ -43,6 +55,9 @@ pub enum CodecError { #[error("Could not convert type from protobuf: {0}")] TypeConversion(Box), + #[error("Invalid chunk: {0}")] + Chunk(Box), + /// This is returned when `ArrowMsg` or `BlueprintActivationCommand` are received with a legacy /// store id (missing the application id) before the corresponding `SetStoreInfo` message. In /// that case, the best effort is to recover by dropping such message with a warning. @@ -79,6 +94,12 @@ impl From for CodecError { } } +impl From for CodecError { + fn from(value: ChunkError) -> Self { + Self::Chunk(Box::new(value)) + } +} + impl From for CodecError { fn from(value: re_protos::common::v1alpha1::ext::StoreIdMissingApplicationIdError) -> Self { Self::StoreIdMissingApplicationId { diff --git a/crates/store/re_log_encoding/src/rrd/footer/instances.rs b/crates/store/re_log_encoding/src/rrd/footer/instances.rs new file mode 100644 index 000000000000..576e3707fa0c --- /dev/null +++ b/crates/store/re_log_encoding/src/rrd/footer/instances.rs @@ -0,0 +1,14 @@ +/// This is the message type that is passed in the footer of RRD streams. +/// +/// It is possible to break that invariant by concatenating streams using external tools, +/// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. +/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd route > all_my_recordings.rrd`, +/// would once again guarantee that only one footer is present though. +/// I.e. that invariant holds as long as one stays within our ecosystem of tools. +/// +/// It is transported using the `MessageKind::End` tag. +/// +/// This is an application-level type, the associated transport-level type can be found +/// over at [`re_protos::log_msg::v1alpha1::RrdFooter`]. +#[derive(Default, Debug)] +pub struct RrdFooter {} diff --git a/crates/store/re_log_encoding/src/rrd/footer/mod.rs b/crates/store/re_log_encoding/src/rrd/footer/mod.rs new file mode 100644 index 000000000000..20e209ec5ec6 --- /dev/null +++ b/crates/store/re_log_encoding/src/rrd/footer/mod.rs @@ -0,0 +1,3 @@ +mod instances; + +pub use self::instances::RrdFooter; diff --git a/crates/store/re_log_encoding/src/rrd/headers.rs b/crates/store/re_log_encoding/src/rrd/frames.rs similarity index 50% rename from crates/store/re_log_encoding/src/rrd/headers.rs rename to crates/store/re_log_encoding/src/rrd/frames.rs index f02204ac9a49..68a2692f26a9 100644 --- a/crates/store/re_log_encoding/src/rrd/headers.rs +++ b/crates/store/re_log_encoding/src/rrd/frames.rs @@ -12,6 +12,13 @@ pub enum Serializer { Protobuf = 2, } +// TODO(cmc): None of these options make sense to have at the global scope and/or in the StreamHeader. +// * Global scope: that makes the decoder stateful in a very bad way (think e.g. about loading +// specific chunks straight from footer metadata). +// * StreamHeader: both of these are concerns that only apply to message payloads, and should therefore +// be flags in the MessageHeader. +// In practice I believe both are effectively completely ignored everywhere it matters. They need +// to go away for real though. #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub struct EncodingOptions { pub compression: Compression, @@ -83,6 +90,15 @@ impl Decodable for EncodingOptions { // --- +/// The first frame in an RRD stream. +/// +/// During normal operations, there can only be a single [`StreamHeader`] per RRD stream. +/// +/// It is possible to break that invariant by concatenating streams using external tools, +/// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. +/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd route > all_my_recordings.rrd`, +/// would once again guarantee that only one header is present though. +/// I.e. that invariant holds as long as one stays within our ecosystem of tools. #[derive(Debug, Clone, Copy)] pub struct StreamHeader { pub fourcc: [u8; 4], @@ -157,7 +173,7 @@ impl Encodable for StreamHeader { impl Decodable for StreamHeader { fn from_rrd_bytes(data: &[u8]) -> Result { if data.len() != Self::ENCODED_SIZE_BYTES { - return Err(crate::rrd::CodecError::HeaderDecoding(format!( + return Err(crate::rrd::CodecError::FrameDecoding(format!( "invalid StreamHeader length (expected {} but got {})", Self::ENCODED_SIZE_BYTES, data.len() @@ -190,12 +206,177 @@ impl Decodable for StreamHeader { } } +// --- + +/// The last frame in an RRD stream. +/// +/// During normal operations, there can only be a single [`StreamFooter`] per RRD stream. +/// +/// It is possible to break that invariant by concatenating streams using external tools, +/// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. +/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd route > all_my_recordings.rrd`, +/// would once again guarantee that only one footer is present though. +/// I.e. that invariant holds as long as one stays within our ecosystem of tools. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct StreamFooter { + /// Same as the one in the [`StreamHeader`], i.e. [`crate::rrd::RRD_FOURCC`]. + /// + /// Used to go straight to the footer of an RRD stream ("backwards"). + pub fourcc: [u8; 4], // RRF2 + + /// A unique identifier to disambiguate the footer from other frames. + /// + /// Always set to [`Self::RRD_IDENTIFIER`]. + pub identifier: [u8; 4], // FOOT + + /// The position in bytes where the serialized [`RrdFooter`] payload starts, excluding the + /// message header. + /// + /// I.e. a transport-level [`RrdFooter`] can be decoded from the bytes at (pseudo-code): + /// ```text + /// let start = stream_footer.rrd_footer_byte_offset_from_start_excluding_header; + /// let end = start + stream_footer.rrd_footer_byte_size_excluding_header; + /// let bytes = &file[start..end]; + /// let rrd_footer = re_protos::RrdFooter::decode(bytes)?; + /// let rrd_footer = rrd_footer.to_application()?; + /// ``` + /// + /// [`RrdFooter`]: [crate::RrdFooter] + pub rrd_footer_byte_offset_from_start_excluding_header: u64, + + /// The size in bytes of the serialized [`RrdFooter`] payload, excluding the message header. + /// + /// This is guaranteed to be the same value as the `len` found in the associated message + /// header, but duplicating it here makes it possible for decoders to get everything they + /// need using a single IO. + /// + /// [`RrdFooter`]: [crate::RrdFooter] + pub rrd_footer_byte_size_excluding_header: u64, + + /// Checksum for the [`RrdFooter`] payload. + /// + /// The footer is most often accessed by jumping straight to it, so this is a nice extra safety + /// to make sure that we didn't just get "lucky" (or unlucky, rather) when jumping around and + /// parsing random bytes. + /// + /// [`RrdFooter`]: [crate::RrdFooter] + // + // TODO(cmc): It shouldn't be the job of the StreamFooter to carry checksums for a specific + // message's payload. All frames should have identifiers and CRCs for both themselves and their + // payloads, in which case this CRC would belong in the MessageHeader. + pub crc_excluding_header: u32, +} + +impl StreamFooter { + pub const ENCODED_SIZE_BYTES: usize = 28; + pub const CRC_SEED: u32 = 7850921; // "RERUN" in base 26 (A=0, Z=25) + pub const RRD_IDENTIFIER: [u8; 4] = *b"FOOT"; + + pub fn new( + rrd_footer_byte_offset_from_start_excluding_header: u64, + rrd_footer_byte_size_excluding_header: u64, + crc_excluding_header: u32, + ) -> Self { + Self { + fourcc: crate::RRD_FOURCC, + identifier: Self::RRD_IDENTIFIER, + rrd_footer_byte_offset_from_start_excluding_header, + rrd_footer_byte_size_excluding_header, + crc_excluding_header, + } + } + + pub fn from_rrd_footer_bytes( + rrd_footer_byte_offset_from_start_excluding_header: u64, + rrd_footer_bytes: &[u8], + ) -> Self { + let crc_excluding_header = xxhash_rust::xxh32::xxh32(rrd_footer_bytes, Self::CRC_SEED); + Self { + fourcc: crate::RRD_FOURCC, + identifier: Self::RRD_IDENTIFIER, + rrd_footer_byte_offset_from_start_excluding_header, + rrd_footer_byte_size_excluding_header: rrd_footer_bytes.len() as u64, + crc_excluding_header, + } + } +} + +impl Encodable for StreamFooter { + fn to_rrd_bytes(&self, out: &mut Vec) -> Result { + let Self { + fourcc, + identifier, + rrd_footer_byte_offset_from_start_excluding_header, + rrd_footer_byte_size_excluding_header, + crc_excluding_header: crc, + } = self; + + let before = out.len() as u64; + + out.extend_from_slice(fourcc); + out.extend_from_slice(identifier); + out.extend_from_slice(&rrd_footer_byte_offset_from_start_excluding_header.to_le_bytes()); + out.extend_from_slice(&rrd_footer_byte_size_excluding_header.to_le_bytes()); + out.extend_from_slice(&crc.to_le_bytes()); + + let n = out.len() as u64 - before; + assert_eq!(Self::ENCODED_SIZE_BYTES as u64, n); + + Ok(n) + } +} + +impl Decodable for StreamFooter { + fn from_rrd_bytes(data: &[u8]) -> Result { + if data.len() != Self::ENCODED_SIZE_BYTES { + return Err(crate::rrd::CodecError::FrameDecoding(format!( + "invalid StreamFooter length (expected {} but got {})", + Self::ENCODED_SIZE_BYTES, + data.len() + ))); + } + + let to_array_4b = |slice: &[u8]| slice.try_into().expect("always returns an Ok() variant"); + + let fourcc: [u8; 4] = to_array_4b(&data[0..4]); + if fourcc != crate::RRD_FOURCC { + return Err(crate::rrd::CodecError::FrameDecoding(format!( + "invalid StreamFooter FourCC (expected {:?} but got {:?})", + crate::RRD_FOURCC, + fourcc, + ))); + } + + let identifier: [u8; 4] = to_array_4b(&data[4..8]); + if identifier != Self::RRD_IDENTIFIER { + return Err(crate::rrd::CodecError::FrameDecoding(format!( + "invalid StreamFooter identifier (expected {:?} but got {:?})", + Self::RRD_IDENTIFIER, + identifier, + ))); + } + + let rrd_footer_byte_offset_from_start_excluding_header = + u64::from_le_bytes(data[8..16].try_into().expect("cannot fail, checked above")); + let rrd_footer_byte_size_excluding_header = + u64::from_le_bytes(data[16..24].try_into().expect("cannot fail, checked above")); + let crc = u32::from_le_bytes(data[24..28].try_into().expect("cannot fail, checked above")); + + Ok(Self { + fourcc, + identifier, + rrd_footer_byte_offset_from_start_excluding_header, + rrd_footer_byte_size_excluding_header, + crc_excluding_header: crc, + }) + } +} + // --- MessageHeader --- -#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] #[repr(u64)] pub enum MessageKind { - #[default] End = Self::END, SetStoreInfo = Self::SET_STORE_INFO, ArrowMsg = Self::ARROW_MSG, @@ -238,7 +419,7 @@ impl Encodable for MessageHeader { impl Decodable for MessageHeader { fn from_rrd_bytes(data: &[u8]) -> Result { if data.len() != Self::ENCODED_SIZE_BYTES { - return Err(crate::rrd::CodecError::HeaderDecoding(format!( + return Err(crate::rrd::CodecError::FrameDecoding(format!( "invalid MessageHeader length (expected {} but got {})", Self::ENCODED_SIZE_BYTES, data.len() @@ -252,7 +433,7 @@ impl Decodable for MessageHeader { MessageKind::ARROW_MSG => MessageKind::ArrowMsg, MessageKind::BLUEPRINT_ACTIVATION_COMMAND => MessageKind::BlueprintActivationCommand, _ => { - return Err(crate::rrd::CodecError::HeaderDecoding(format!( + return Err(crate::rrd::CodecError::FrameDecoding(format!( "unknown MessageHeader kind: {kind:?}" ))); } diff --git a/crates/store/re_log_encoding/src/rrd/log_msg.rs b/crates/store/re_log_encoding/src/rrd/log_msg.rs index bb6b502718d4..105c5c350815 100644 --- a/crates/store/re_log_encoding/src/rrd/log_msg.rs +++ b/crates/store/re_log_encoding/src/rrd/log_msg.rs @@ -40,6 +40,26 @@ impl Encodable for re_protos::log_msg::v1alpha1::log_msg::Msg { } } +impl Encodable for re_protos::log_msg::v1alpha1::ArrowMsg { + fn to_rrd_bytes(&self, out: &mut Vec) -> Result { + use re_protos::external::prost::Message as _; + + let before = out.len() as u64; + self.encode(out)?; + Ok(out.len() as u64 - before) + } +} + +impl Encodable for re_protos::log_msg::v1alpha1::RrdFooter { + fn to_rrd_bytes(&self, out: &mut Vec) -> Result { + use re_protos::external::prost::Message as _; + + let before = out.len() as u64; + self.encode(out)?; + Ok(out.len() as u64 - before) + } +} + // NOTE: This is implemented for `Option<_>` because, in the native RRD protocol, the message kind // might be `MessageKind::End` (signifying end-of-stream), which has no representation in our Protobuf // definitions. I.e. `MessageKind::End` yields `None`. @@ -95,3 +115,10 @@ impl Decodable for re_protos::log_msg::v1alpha1::BlueprintActivationCommand { Ok(Self::decode(data)?) } } + +impl Decodable for re_protos::log_msg::v1alpha1::RrdFooter { + fn from_rrd_bytes(data: &[u8]) -> Result { + use re_protos::external::prost::Message as _; + Ok(Self::decode(data)?) + } +} diff --git a/crates/store/re_log_encoding/src/rrd/mod.rs b/crates/store/re_log_encoding/src/rrd/mod.rs index e6a412d098db..ee92814943a4 100644 --- a/crates/store/re_log_encoding/src/rrd/mod.rs +++ b/crates/store/re_log_encoding/src/rrd/mod.rs @@ -16,7 +16,8 @@ //! state machines that turn collections of `Encodable`s and `Decodable`s into actual RRD streams. mod errors; -mod headers; +mod footer; +mod frames; mod log_msg; #[cfg(feature = "decoder")] @@ -32,10 +33,11 @@ mod file_sink; #[cfg(feature = "stream_from_http")] pub mod stream_from_http; -pub use self::errors::{CodecError, NotAnRrdError, OptionsError}; -pub use self::headers::{ +pub use self::errors::{CodecError, CodecResult, NotAnRrdError, OptionsError}; +pub use self::footer::RrdFooter; +pub use self::frames::{ Compression, CrateVersion, EncodingOptions, MessageHeader, MessageKind, Serializer, - StreamHeader, + StreamFooter, StreamHeader, }; #[cfg(feature = "decoder")] diff --git a/crates/store/re_log_encoding/src/transport_to_app.rs b/crates/store/re_log_encoding/src/transport_to_app.rs index 04fb7098a759..aa3b4b94ceae 100644 --- a/crates/store/re_log_encoding/src/transport_to_app.rs +++ b/crates/store/re_log_encoding/src/transport_to_app.rs @@ -35,7 +35,7 @@ impl ToTransport for re_log_types::LogMsg { type Context<'a> = crate::rrd::Compression; fn to_transport(&self, compression: Self::Context<'_>) -> Result { - log_msg_to_proto(self, compression) + log_msg_app_to_transport(self, compression) } } @@ -47,7 +47,16 @@ impl ToTransport for re_log_types::ArrowMsg { &self, (store_id, compression): Self::Context<'_>, ) -> Result { - arrow_msg_to_proto(self, store_id, compression) + arrow_msg_app_to_transport(self, store_id, compression) + } +} + +impl ToTransport for crate::RrdFooter { + type Output = re_protos::log_msg::v1alpha1::RrdFooter; + type Context<'a> = (); + + fn to_transport(&self, _: Self::Context<'_>) -> Result { + Ok(Self::Output {}) } } @@ -67,7 +76,7 @@ impl ToApplication for re_protos::log_msg::v1alpha1::log_msg::Msg { &self, (app_id_injector, patched_version): Self::Context<'_>, ) -> Result { - let mut log_msg = log_msg_to_app(app_id_injector, self)?; + let mut log_msg = log_msg_transport_to_app(app_id_injector, self)?; if let Some(patched_version) = patched_version && let re_log_types::LogMsg::SetStoreInfo(msg) = &mut log_msg @@ -104,7 +113,16 @@ impl ToApplication for re_protos::log_msg::v1alpha1::ArrowMsg { type Context<'a> = (); fn to_application(&self, _context: Self::Context<'_>) -> Result { - arrow_msg_to_app(self) + arrow_msg_transport_to_app(self) + } +} + +impl ToApplication for re_protos::log_msg::v1alpha1::RrdFooter { + type Output = crate::RrdFooter; + type Context<'a> = (); + + fn to_application(&self, _context: Self::Context<'_>) -> Result { + Ok(Self::Output {}) } } @@ -118,7 +136,7 @@ impl ToApplication for re_protos::log_msg::v1alpha1::ArrowMsg { /// /// The provided [`ApplicationIdInjector`] must be shared across all calls for the same stream. #[tracing::instrument(level = "trace", skip_all)] -fn log_msg_to_app( +fn log_msg_transport_to_app( app_id_injector: &mut I, message: &re_protos::log_msg::v1alpha1::log_msg::Msg, ) -> Result { @@ -134,7 +152,7 @@ fn log_msg_to_app( } Msg::ArrowMsg(arrow_msg) => { - let encoded = arrow_msg_to_app(arrow_msg)?; + let encoded = arrow_msg_transport_to_app(arrow_msg)?; //TODO(#10730): clean that up when removing 0.24 back compat let store_id: re_log_types::StoreId = match arrow_msg @@ -194,7 +212,7 @@ fn log_msg_to_app( /// Converts a transport-level `ArrowMsg` to its application-level counterpart. #[tracing::instrument(level = "trace", skip_all)] -fn arrow_msg_to_app( +fn arrow_msg_transport_to_app( arrow_msg: &re_protos::log_msg::v1alpha1::ArrowMsg, ) -> Result { re_tracing::profile_function!(); @@ -239,7 +257,7 @@ fn arrow_msg_to_app( /// Converts an application-level `LogMsg` to its transport-level counterpart. #[tracing::instrument(level = "trace", skip_all)] -fn log_msg_to_proto( +fn log_msg_app_to_transport( message: &re_log_types::LogMsg, compression: crate::rrd::Compression, ) -> Result { @@ -251,7 +269,7 @@ fn log_msg_to_proto( } re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) => { - let arrow_msg = arrow_msg_to_proto(arrow_msg, store_id.clone(), compression)?; + let arrow_msg = arrow_msg_app_to_transport(arrow_msg, store_id.clone(), compression)?; re_protos::log_msg::v1alpha1::log_msg::Msg::ArrowMsg(arrow_msg) } @@ -267,7 +285,7 @@ fn log_msg_to_proto( /// Converts an application-level `ArrowMsg` to its transport-level counterpart. #[tracing::instrument(level = "trace", skip_all)] -fn arrow_msg_to_proto( +fn arrow_msg_app_to_transport( arrow_msg: &re_log_types::ArrowMsg, store_id: re_log_types::StoreId, compression: crate::rrd::Compression, diff --git a/crates/store/re_log_encoding/tests/footer_roundtrip.rs b/crates/store/re_log_encoding/tests/footer_roundtrip.rs new file mode 100644 index 000000000000..41fb45996065 --- /dev/null +++ b/crates/store/re_log_encoding/tests/footer_roundtrip.rs @@ -0,0 +1,55 @@ +use re_chunk::RowId; +use re_log_encoding::{Decodable as _, Encoder, ToApplication as _}; +use re_log_types::{LogMsg, StoreId}; + +#[test] +fn footer_empty() { + fn generate_store_id() -> StoreId { + StoreId::recording("my_app", "my_empty_recording") + } + + fn generate_recording() -> impl Iterator { + let store_id = generate_store_id(); + + std::iter::once(LogMsg::SetStoreInfo(re_log_types::SetStoreInfo { + row_id: *RowId::ZERO, + info: re_log_types::StoreInfo { + store_id: store_id.clone(), + cloned_from: None, + store_source: re_log_types::StoreSource::Unknown, + store_version: Some(re_build_info::CrateVersion::new(1, 2, 3)), + is_partial: false, + }, + })) + } + + let msgs_encoded = Encoder::encode(generate_recording().map(Ok)).unwrap(); + + let stream_footer_start = msgs_encoded + .len() + .checked_sub(re_log_encoding::StreamFooter::ENCODED_SIZE_BYTES) + .unwrap(); + let stream_footer = + re_log_encoding::StreamFooter::from_rrd_bytes(&msgs_encoded[stream_footer_start..]) + .unwrap(); + + let rrd_footer_start = + stream_footer.rrd_footer_byte_offset_from_start_excluding_header as usize; + let rrd_footer_end = rrd_footer_start + .checked_add(stream_footer.rrd_footer_byte_size_excluding_header as usize) + .unwrap(); + let rrd_footer_bytes = &msgs_encoded[rrd_footer_start..rrd_footer_end]; + + { + let crc = re_log_encoding::StreamFooter::from_rrd_footer_bytes( + rrd_footer_start as u64, + rrd_footer_bytes, + ) + .crc_excluding_header; + similar_asserts::assert_eq!(stream_footer.crc_excluding_header, crc); + } + + let rrd_footer = + re_protos::log_msg::v1alpha1::RrdFooter::from_rrd_bytes(rrd_footer_bytes).unwrap(); + let _rrd_footer = rrd_footer.to_application(()).unwrap(); +} diff --git a/crates/store/re_protos/proto/rerun/v1alpha1/log_msg.proto b/crates/store/re_protos/proto/rerun/v1alpha1/log_msg.proto index 4201cbc5f846..f23a9572a611 100644 --- a/crates/store/re_protos/proto/rerun/v1alpha1/log_msg.proto +++ b/crates/store/re_protos/proto/rerun/v1alpha1/log_msg.proto @@ -227,3 +227,19 @@ message StoreVersion { // See `CrateVersion` in `re_build_info`. int32 crate_version_bits = 1; } + +// This is the message type that is passed in the footer of RRD streams. +// +// During normal operations, there can only be a single footer in an RRD stream. +// +// It is possible to break that invariant by concatenating streams using external tools, +// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. +// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd route > all_my_recordings.rrd`, +// would once again guarantee that only one footer is present though. +// I.e. that invariant holds as long as one stays within our ecosystem of tools. +// +// It is transported using the `MessageKind::End` tag. +// +// This is a transport-level type, the associated application-level type can be found +// in `re_log_encoding::RrdFooter`. +message RrdFooter {} diff --git a/crates/store/re_protos/src/v1alpha1/rerun.log_msg.v1alpha1.rs b/crates/store/re_protos/src/v1alpha1/rerun.log_msg.v1alpha1.rs index eab236a7d738..0228793804d4 100644 --- a/crates/store/re_protos/src/v1alpha1/rerun.log_msg.v1alpha1.rs +++ b/crates/store/re_protos/src/v1alpha1/rerun.log_msg.v1alpha1.rs @@ -269,6 +269,32 @@ impl ::prost::Name for StoreVersion { "/rerun.log_msg.v1alpha1.StoreVersion".into() } } +/// This is the message type that is passed in the footer of RRD streams. +/// +/// During normal operations, there can only be a single footer in an RRD stream. +/// +/// It is possible to break that invariant by concatenating streams using external tools, +/// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. +/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd route > all_my_recordings.rrd`, +/// would once again guarantee that only one footer is present though. +/// I.e. that invariant holds as long as one stays within our ecosystem of tools. +/// +/// It is transported using the `MessageKind::End` tag. +/// +/// This is a transport-level type, the associated application-level type can be found +/// in `re_log_encoding::RrdFooter`. +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct RrdFooter {} +impl ::prost::Name for RrdFooter { + const NAME: &'static str = "RrdFooter"; + const PACKAGE: &'static str = "rerun.log_msg.v1alpha1"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.log_msg.v1alpha1.RrdFooter".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.log_msg.v1alpha1.RrdFooter".into() + } +} /// The type of compression used on the payload. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] diff --git a/crates/store/re_server/src/store/layer.rs b/crates/store/re_server/src/store/layer.rs index 05d6fc063f85..24d954a49d78 100644 --- a/crates/store/re_server/src/store/layer.rs +++ b/crates/store/re_server/src/store/layer.rs @@ -1,8 +1,8 @@ +use std::collections::HashMap; + use arrow::array::RecordBatch; use arrow::datatypes::Schema; use arrow::error::ArrowError; -use sha2::Digest as _; -use std::collections::HashMap; use re_byte_size::SizeBytes as _; use re_chunk_store::ChunkStoreHandle; @@ -68,6 +68,7 @@ impl Layer { schema_ipc }; + use sha2::Digest as _; let mut hash = [0u8; 32]; let mut hasher = sha2::Sha256::new(); hasher.update(&partition_schema_ipc); diff --git a/rerun_py/rerun_sdk/rerun/recording_stream.py b/rerun_py/rerun_sdk/rerun/recording_stream.py index f726ff8a278a..a570b12dfbc3 100644 --- a/rerun_py/rerun_sdk/rerun/recording_stream.py +++ b/rerun_py/rerun_sdk/rerun/recording_stream.py @@ -486,6 +486,7 @@ def flush(self, *, timeout_sec: float = 1e38) -> None: """ bindings.flush(timeout_sec=timeout_sec, recording=self.to_native()) + # TODO(RR-3065): SDK should flush both IO and app-level logic when a recording gets GC'd def __del__(self) -> None: # type: ignore[no-untyped-def] recording = self.to_native() # TODO(jleibs): I'm 98% sure this flush is redundant, but removing it requires more thorough testing. From eb24b372c3f66573f8f4c64054b7e15364a4486b Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Tue, 2 Dec 2025 15:05:03 +0100 Subject: [PATCH 2/6] Apply suggestions from code review Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../re_log_encoding/src/rrd/decoder/state_machine.rs | 8 +++++--- crates/store/re_log_encoding/src/rrd/footer/instances.rs | 2 ++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs b/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs index fd535a2cc876..b4fcbd258858 100644 --- a/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs +++ b/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs @@ -86,7 +86,9 @@ pub enum DecoderState { /// /// The [`MessageHeader`] indicates what kind of payload this is and how large it is. /// - /// We will rebound indefinitely + /// After the [`StreamHeader`] is read once, the state machine will only ever switch between + /// [`Self::WaitingForMessageHeader`] and [`Self::WaitingForMessagePayload`], until either a + /// footer or another concatenated RRD stream shows up. WaitingForMessageHeader, /// A [`MessageHeader`] was parsed, now we're waiting for the associated payload. @@ -194,7 +196,7 @@ impl Decoder { } else { // A bunch of weird trailing bytes means we're now in an irrecoverable state, but // it doesn't change the fact that we've just successfully yielded an entire stream's - // worth of data. So, instead of bubling up errors all the way to the end user, + // worth of data. So, instead of bubbling up errors all the way to the end user, // merely log one here stop the state machine forever. re_log::error!( is_first_header, @@ -367,7 +369,7 @@ impl Decoder { Err(err) => { // A corrupt footer means we're now in an irrecoverable state, but it doesn't change the // fact that we've just successfully yielded an entire stream's worth of data. So, instead - // of bubling up errors all the way to the end user, merely log one here stop the state + // of bubbling up errors all the way to the end user, merely log one here stop the state // machine forever. re_log::error!( position = self.byte_chunks.num_read(), diff --git a/crates/store/re_log_encoding/src/rrd/footer/instances.rs b/crates/store/re_log_encoding/src/rrd/footer/instances.rs index 576e3707fa0c..8302b5ddecfe 100644 --- a/crates/store/re_log_encoding/src/rrd/footer/instances.rs +++ b/crates/store/re_log_encoding/src/rrd/footer/instances.rs @@ -1,5 +1,7 @@ /// This is the message type that is passed in the footer of RRD streams. /// +/// During normal operations, there can only be a single [`RrdFooter`] per RRD stream. +/// /// It is possible to break that invariant by concatenating streams using external tools, /// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. /// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd route > all_my_recordings.rrd`, From 5f9f8a62e9f18f785f6e8c4503295b688ad9e8af Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Tue, 2 Dec 2025 15:56:22 +0100 Subject: [PATCH 3/6] Apply suggestions from code review --- crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs | 2 +- crates/store/re_log_encoding/src/rrd/encoder.rs | 2 +- crates/store/re_log_encoding/src/rrd/frames.rs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs b/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs index b4fcbd258858..86162ef7832b 100644 --- a/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs +++ b/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs @@ -713,7 +713,7 @@ mod tests { fn stream_irregular_chunks_protobuf() { // this attempts to stress-test `try_read` with byte chunks of various sizes - let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 6); + let (input, data) = test_data(EncodingOptions::PROTOBUF_COMPRESSED, 16); let mut data = Cursor::new(data); let mut decoder = DecoderApp::new(); diff --git a/crates/store/re_log_encoding/src/rrd/encoder.rs b/crates/store/re_log_encoding/src/rrd/encoder.rs index c63bf1f978d2..541ce07d6faf 100644 --- a/crates/store/re_log_encoding/src/rrd/encoder.rs +++ b/crates/store/re_log_encoding/src/rrd/encoder.rs @@ -80,7 +80,7 @@ pub struct Encoder { /// * Because `prost` only supports buffers, not IO traits. scratch: Vec, - /// Tracks the state required to build the RRD manifest for this stream. + /// Tracks the state required to build the RRD footer for this stream. /// /// If set to `None`, the footer will not be computed. /// diff --git a/crates/store/re_log_encoding/src/rrd/frames.rs b/crates/store/re_log_encoding/src/rrd/frames.rs index 68a2692f26a9..af4af4f47b96 100644 --- a/crates/store/re_log_encoding/src/rrd/frames.rs +++ b/crates/store/re_log_encoding/src/rrd/frames.rs @@ -96,7 +96,7 @@ impl Decodable for EncodingOptions { /// /// It is possible to break that invariant by concatenating streams using external tools, /// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. -/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd route > all_my_recordings.rrd`, +/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd merge > all_my_recordings.rrd`, /// would once again guarantee that only one header is present though. /// I.e. that invariant holds as long as one stays within our ecosystem of tools. #[derive(Debug, Clone, Copy)] @@ -214,7 +214,7 @@ impl Decodable for StreamHeader { /// /// It is possible to break that invariant by concatenating streams using external tools, /// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. -/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd route > all_my_recordings.rrd`, +/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd merge > all_my_recordings.rrd`, /// would once again guarantee that only one footer is present though. /// I.e. that invariant holds as long as one stays within our ecosystem of tools. #[derive(Debug, Clone, Copy, PartialEq, Eq)] From 2a710f4768f2c3f91fec8e5d4db5a3a2a139e5fa Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Tue, 2 Dec 2025 16:13:17 +0100 Subject: [PATCH 4/6] review --- .../src/rrd/decoder/state_machine.rs | 21 ++++++++++--------- .../src/rrd/footer/instances.rs | 12 +++++------ .../store/re_log_encoding/src/rrd/frames.rs | 12 +++++------ .../proto/rerun/v1alpha1/log_msg.proto | 12 +++++------ .../src/v1alpha1/rerun.log_msg.v1alpha1.rs | 12 +++++------ 5 files changed, 35 insertions(+), 34 deletions(-) diff --git a/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs b/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs index 86162ef7832b..8fcad35ff1bb 100644 --- a/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs +++ b/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs @@ -340,20 +340,21 @@ impl Decoder { let StreamFooter { fourcc: _, identifier: _, - rrd_footer_byte_offset_from_start_excluding_header: - rrd_manifest_byte_offset_from_start_excluding_header, - rrd_footer_byte_size_excluding_header: - rrd_manifest_byte_size_excluding_header, + rrd_footer_byte_offset_from_start_excluding_header, + rrd_footer_byte_size_excluding_header, crc_excluding_header: _, } = footer; - let start = rrd_manifest_byte_offset_from_start_excluding_header; - let end = start + rrd_manifest_byte_size_excluding_header; + let rrd_footer_start = + rrd_footer_byte_offset_from_start_excluding_header; + let rrd_footer_end = + rrd_footer_start + rrd_footer_byte_size_excluding_header; - if end > position as u64 { - // The RRD manifest cannot possibly end after the footer starts. + if rrd_footer_end > position as u64 { + // The RRD footer cannot possibly end after the stream footer starts, since it must + // be part of an ::End message. re_log::error!( - position = self.byte_chunks.num_read(), + position, bytes = ?bytes, ?footer, err = "offsets are invalid", @@ -372,7 +373,7 @@ impl Decoder { // of bubbling up errors all the way to the end user, merely log one here stop the state // machine forever. re_log::error!( - position = self.byte_chunks.num_read(), + position, bytes = ?bytes, %err, "corrupt footer in rrd stream" diff --git a/crates/store/re_log_encoding/src/rrd/footer/instances.rs b/crates/store/re_log_encoding/src/rrd/footer/instances.rs index 8302b5ddecfe..ef13112b2624 100644 --- a/crates/store/re_log_encoding/src/rrd/footer/instances.rs +++ b/crates/store/re_log_encoding/src/rrd/footer/instances.rs @@ -1,15 +1,15 @@ -/// This is the message type that is passed in the footer of RRD streams. +/// This is the payload that is carried in messages of type `::End` in RRD streams. /// -/// During normal operations, there can only be a single [`RrdFooter`] per RRD stream. +/// It keeps track of various useful information about the associated recording. /// +/// During normal operations, there can only be a single `::End` message in an RRD stream, and +/// therefore a single `RrdFooter`. /// It is possible to break that invariant by concatenating streams using external tools, /// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. -/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd route > all_my_recordings.rrd`, -/// would once again guarantee that only one footer is present though. +/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd merge > all_my_recordings.rrd`, +/// would once again guarantee that only one `::End` message is present though. /// I.e. that invariant holds as long as one stays within our ecosystem of tools. /// -/// It is transported using the `MessageKind::End` tag. -/// /// This is an application-level type, the associated transport-level type can be found /// over at [`re_protos::log_msg::v1alpha1::RrdFooter`]. #[derive(Default, Debug)] diff --git a/crates/store/re_log_encoding/src/rrd/frames.rs b/crates/store/re_log_encoding/src/rrd/frames.rs index af4af4f47b96..7e6954bca8e5 100644 --- a/crates/store/re_log_encoding/src/rrd/frames.rs +++ b/crates/store/re_log_encoding/src/rrd/frames.rs @@ -90,14 +90,13 @@ impl Decodable for EncodingOptions { // --- -/// The first frame in an RRD stream. +/// The opening frame in an RRD stream. /// /// During normal operations, there can only be a single [`StreamHeader`] per RRD stream. -/// /// It is possible to break that invariant by concatenating streams using external tools, /// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. /// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd merge > all_my_recordings.rrd`, -/// would once again guarantee that only one header is present though. +/// would once again guarantee that only one stream header is present though. /// I.e. that invariant holds as long as one stays within our ecosystem of tools. #[derive(Debug, Clone, Copy)] pub struct StreamHeader { @@ -208,15 +207,16 @@ impl Decodable for StreamHeader { // --- -/// The last frame in an RRD stream. +/// The closing frame in an RRD stream. Keeps track of where the [`RrdFooter`] can be found. /// /// During normal operations, there can only be a single [`StreamFooter`] per RRD stream. -/// /// It is possible to break that invariant by concatenating streams using external tools, /// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. /// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd merge > all_my_recordings.rrd`, -/// would once again guarantee that only one footer is present though. +/// would once again guarantee that only one stream footer is present though. /// I.e. that invariant holds as long as one stays within our ecosystem of tools. +/// +/// [`RrdFooter`]: [crate::RrdFooter] #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct StreamFooter { /// Same as the one in the [`StreamHeader`], i.e. [`crate::rrd::RRD_FOURCC`]. diff --git a/crates/store/re_protos/proto/rerun/v1alpha1/log_msg.proto b/crates/store/re_protos/proto/rerun/v1alpha1/log_msg.proto index f23a9572a611..4850f5058129 100644 --- a/crates/store/re_protos/proto/rerun/v1alpha1/log_msg.proto +++ b/crates/store/re_protos/proto/rerun/v1alpha1/log_msg.proto @@ -228,18 +228,18 @@ message StoreVersion { int32 crate_version_bits = 1; } -// This is the message type that is passed in the footer of RRD streams. +// This is the payload that is carried in messages of type `::End` in RRD streams. // -// During normal operations, there can only be a single footer in an RRD stream. +// It keeps track of various useful information about the associated recording. // +// During normal operations, there can only be a single `::End` message in an RRD stream, and +// therefore a single `RrdFooter`. // It is possible to break that invariant by concatenating streams using external tools, // e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. -// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd route > all_my_recordings.rrd`, -// would once again guarantee that only one footer is present though. +// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd merge > all_my_recordings.rrd`, +// would once again guarantee that only one `::End` message is present though. // I.e. that invariant holds as long as one stays within our ecosystem of tools. // -// It is transported using the `MessageKind::End` tag. -// // This is a transport-level type, the associated application-level type can be found // in `re_log_encoding::RrdFooter`. message RrdFooter {} diff --git a/crates/store/re_protos/src/v1alpha1/rerun.log_msg.v1alpha1.rs b/crates/store/re_protos/src/v1alpha1/rerun.log_msg.v1alpha1.rs index 0228793804d4..2a44d4dc9038 100644 --- a/crates/store/re_protos/src/v1alpha1/rerun.log_msg.v1alpha1.rs +++ b/crates/store/re_protos/src/v1alpha1/rerun.log_msg.v1alpha1.rs @@ -269,18 +269,18 @@ impl ::prost::Name for StoreVersion { "/rerun.log_msg.v1alpha1.StoreVersion".into() } } -/// This is the message type that is passed in the footer of RRD streams. +/// This is the payload that is carried in messages of type `::End` in RRD streams. /// -/// During normal operations, there can only be a single footer in an RRD stream. +/// It keeps track of various useful information about the associated recording. /// +/// During normal operations, there can only be a single `::End` message in an RRD stream, and +/// therefore a single `RrdFooter`. /// It is possible to break that invariant by concatenating streams using external tools, /// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. -/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd route > all_my_recordings.rrd`, -/// would once again guarantee that only one footer is present though. +/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd merge > all_my_recordings.rrd`, +/// would once again guarantee that only one `::End` message is present though. /// I.e. that invariant holds as long as one stays within our ecosystem of tools. /// -/// It is transported using the `MessageKind::End` tag. -/// /// This is a transport-level type, the associated application-level type can be found /// in `re_log_encoding::RrdFooter`. #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] From 833396c18c9bdeacadcdc30b98b6e63b64aaa164 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 3 Dec 2025 09:35:07 +0100 Subject: [PATCH 5/6] document the checksum algorithm used --- crates/store/re_log_encoding/src/rrd/frames.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/crates/store/re_log_encoding/src/rrd/frames.rs b/crates/store/re_log_encoding/src/rrd/frames.rs index 7e6954bca8e5..3e2374696c18 100644 --- a/crates/store/re_log_encoding/src/rrd/frames.rs +++ b/crates/store/re_log_encoding/src/rrd/frames.rs @@ -259,11 +259,18 @@ pub struct StreamFooter { /// to make sure that we didn't just get "lucky" (or unlucky, rather) when jumping around and /// parsing random bytes. /// + /// For now, the checksum algorithm is hardcoded to `xxh32`, the 32bit variant of the + /// [`xxhash` family of hashing algorithms](https://xxhash.com/). + /// This is fast, HW-accelerated, non-cryptographic hash that is perfect when needing to hash + /// RRD footers, which can potentially get very, very large. + /// /// [`RrdFooter`]: [crate::RrdFooter] // // TODO(cmc): It shouldn't be the job of the StreamFooter to carry checksums for a specific // message's payload. All frames should have identifiers and CRCs for both themselves and their // payloads, in which case this CRC would belong in the MessageHeader. + // TODO(cmc): In a potential future RRF3, we might make the choice of checksum algorithm + // configurable via flag. pub crc_excluding_header: u32, } From f11d7c3d94774588243a344268c10b5ab4e29b59 Mon Sep 17 00:00:00 2001 From: Clement Rey Date: Wed, 3 Dec 2025 09:54:57 +0100 Subject: [PATCH 6/6] use more Spans --- Cargo.lock | 1 + crates/store/re_log_encoding/Cargo.toml | 1 + .../src/rrd/decoder/state_machine.rs | 7 +-- .../store/re_log_encoding/src/rrd/encoder.rs | 14 ++--- .../store/re_log_encoding/src/rrd/frames.rs | 62 +++++++++---------- .../re_log_encoding/tests/footer_roundtrip.rs | 16 ++--- 6 files changed, 50 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 010780e3fc24..9dfb5db8fbf6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9083,6 +9083,7 @@ dependencies = [ "re_protos", "re_smart_channel", "re_sorbet", + "re_span", "re_tracing", "re_types", "similar-asserts", diff --git a/crates/store/re_log_encoding/Cargo.toml b/crates/store/re_log_encoding/Cargo.toml index 904cb7d245bc..f8072c390f7f 100644 --- a/crates/store/re_log_encoding/Cargo.toml +++ b/crates/store/re_log_encoding/Cargo.toml @@ -49,6 +49,7 @@ re_log.workspace = true re_protos.workspace = true re_smart_channel.workspace = true re_sorbet.workspace = true +re_span.workspace = true re_tracing.workspace = true # External: diff --git a/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs b/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs index 8fcad35ff1bb..058f92157927 100644 --- a/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs +++ b/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs @@ -340,15 +340,12 @@ impl Decoder { let StreamFooter { fourcc: _, identifier: _, - rrd_footer_byte_offset_from_start_excluding_header, - rrd_footer_byte_size_excluding_header, + rrd_footer_byte_span_from_start_excluding_header, crc_excluding_header: _, } = footer; - let rrd_footer_start = - rrd_footer_byte_offset_from_start_excluding_header; let rrd_footer_end = - rrd_footer_start + rrd_footer_byte_size_excluding_header; + rrd_footer_byte_span_from_start_excluding_header.end(); if rrd_footer_end > position as u64 { // The RRD footer cannot possibly end after the stream footer starts, since it must diff --git a/crates/store/re_log_encoding/src/rrd/encoder.rs b/crates/store/re_log_encoding/src/rrd/encoder.rs index 541ce07d6faf..dad0b973a346 100644 --- a/crates/store/re_log_encoding/src/rrd/encoder.rs +++ b/crates/store/re_log_encoding/src/rrd/encoder.rs @@ -109,8 +109,7 @@ impl FooterState { #[expect(clippy::unnecessary_wraps)] // won't stay for long fn append( &mut self, - _byte_offset: u64, - _byte_size: u64, + _byte_span_excluding_header: re_span::Span, msg: &re_log_types::LogMsg, ) -> Result<(), EncodeError> { match msg { @@ -224,12 +223,13 @@ impl Encoder { let byte_size_excluding_header = n - crate::MessageHeader::ENCODED_SIZE_BYTES as u64; + let byte_span_excluding_header = re_span::Span { + start: byte_offset_excluding_header, + len: byte_size_excluding_header, + }; + if let Some(footer_state) = self.footer_state.as_mut() { - footer_state.append( - byte_offset_excluding_header, - byte_size_excluding_header, - message, - )?; + footer_state.append(byte_span_excluding_header, message)?; } Ok(n) diff --git a/crates/store/re_log_encoding/src/rrd/frames.rs b/crates/store/re_log_encoding/src/rrd/frames.rs index 3e2374696c18..b62bcf651c51 100644 --- a/crates/store/re_log_encoding/src/rrd/frames.rs +++ b/crates/store/re_log_encoding/src/rrd/frames.rs @@ -229,29 +229,20 @@ pub struct StreamFooter { /// Always set to [`Self::RRD_IDENTIFIER`]. pub identifier: [u8; 4], // FOOT - /// The position in bytes where the serialized [`RrdFooter`] payload starts, excluding the - /// message header. + /// The span in bytes where the serialized [`RrdFooter`] payload starts end ends, excluding + /// the message header. /// /// I.e. a transport-level [`RrdFooter`] can be decoded from the bytes at (pseudo-code): /// ```text - /// let start = stream_footer.rrd_footer_byte_offset_from_start_excluding_header; - /// let end = start + stream_footer.rrd_footer_byte_size_excluding_header; + /// let start = stream_footer.rrd_footer_byte_span_from_start_excluding_header.start; + /// let end = stream_footer.rrd_footer_byte_span_from_start_excluding_header.end(); /// let bytes = &file[start..end]; /// let rrd_footer = re_protos::RrdFooter::decode(bytes)?; /// let rrd_footer = rrd_footer.to_application()?; /// ``` /// /// [`RrdFooter`]: [crate::RrdFooter] - pub rrd_footer_byte_offset_from_start_excluding_header: u64, - - /// The size in bytes of the serialized [`RrdFooter`] payload, excluding the message header. - /// - /// This is guaranteed to be the same value as the `len` found in the associated message - /// header, but duplicating it here makes it possible for decoders to get everything they - /// need using a single IO. - /// - /// [`RrdFooter`]: [crate::RrdFooter] - pub rrd_footer_byte_size_excluding_header: u64, + pub rrd_footer_byte_span_from_start_excluding_header: re_span::Span, /// Checksum for the [`RrdFooter`] payload. /// @@ -280,15 +271,13 @@ impl StreamFooter { pub const RRD_IDENTIFIER: [u8; 4] = *b"FOOT"; pub fn new( - rrd_footer_byte_offset_from_start_excluding_header: u64, - rrd_footer_byte_size_excluding_header: u64, + rrd_footer_byte_span_from_start_excluding_header: re_span::Span, crc_excluding_header: u32, ) -> Self { Self { fourcc: crate::RRD_FOURCC, identifier: Self::RRD_IDENTIFIER, - rrd_footer_byte_offset_from_start_excluding_header, - rrd_footer_byte_size_excluding_header, + rrd_footer_byte_span_from_start_excluding_header, crc_excluding_header, } } @@ -298,11 +287,14 @@ impl StreamFooter { rrd_footer_bytes: &[u8], ) -> Self { let crc_excluding_header = xxhash_rust::xxh32::xxh32(rrd_footer_bytes, Self::CRC_SEED); + let rrd_footer_byte_span_from_start_excluding_header = re_span::Span { + start: rrd_footer_byte_offset_from_start_excluding_header, + len: rrd_footer_bytes.len() as u64, + }; Self { fourcc: crate::RRD_FOURCC, identifier: Self::RRD_IDENTIFIER, - rrd_footer_byte_offset_from_start_excluding_header, - rrd_footer_byte_size_excluding_header: rrd_footer_bytes.len() as u64, + rrd_footer_byte_span_from_start_excluding_header, crc_excluding_header, } } @@ -313,18 +305,25 @@ impl Encodable for StreamFooter { let Self { fourcc, identifier, - rrd_footer_byte_offset_from_start_excluding_header, - rrd_footer_byte_size_excluding_header, - crc_excluding_header: crc, + rrd_footer_byte_span_from_start_excluding_header, + crc_excluding_header, } = self; let before = out.len() as u64; out.extend_from_slice(fourcc); out.extend_from_slice(identifier); - out.extend_from_slice(&rrd_footer_byte_offset_from_start_excluding_header.to_le_bytes()); - out.extend_from_slice(&rrd_footer_byte_size_excluding_header.to_le_bytes()); - out.extend_from_slice(&crc.to_le_bytes()); + out.extend_from_slice( + &rrd_footer_byte_span_from_start_excluding_header + .start + .to_le_bytes(), + ); + out.extend_from_slice( + &rrd_footer_byte_span_from_start_excluding_header + .len + .to_le_bytes(), + ); + out.extend_from_slice(&crc_excluding_header.to_le_bytes()); let n = out.len() as u64 - before; assert_eq!(Self::ENCODED_SIZE_BYTES as u64, n); @@ -363,17 +362,16 @@ impl Decodable for StreamFooter { ))); } - let rrd_footer_byte_offset_from_start_excluding_header = - u64::from_le_bytes(data[8..16].try_into().expect("cannot fail, checked above")); - let rrd_footer_byte_size_excluding_header = - u64::from_le_bytes(data[16..24].try_into().expect("cannot fail, checked above")); + let rrd_footer_byte_span_from_start_excluding_header = re_span::Span { + start: u64::from_le_bytes(data[8..16].try_into().expect("cannot fail, checked above")), + len: u64::from_le_bytes(data[16..24].try_into().expect("cannot fail, checked above")), + }; let crc = u32::from_le_bytes(data[24..28].try_into().expect("cannot fail, checked above")); Ok(Self { fourcc, identifier, - rrd_footer_byte_offset_from_start_excluding_header, - rrd_footer_byte_size_excluding_header, + rrd_footer_byte_span_from_start_excluding_header, crc_excluding_header: crc, }) } diff --git a/crates/store/re_log_encoding/tests/footer_roundtrip.rs b/crates/store/re_log_encoding/tests/footer_roundtrip.rs index 41fb45996065..17048b1f1901 100644 --- a/crates/store/re_log_encoding/tests/footer_roundtrip.rs +++ b/crates/store/re_log_encoding/tests/footer_roundtrip.rs @@ -33,16 +33,18 @@ fn footer_empty() { re_log_encoding::StreamFooter::from_rrd_bytes(&msgs_encoded[stream_footer_start..]) .unwrap(); - let rrd_footer_start = - stream_footer.rrd_footer_byte_offset_from_start_excluding_header as usize; - let rrd_footer_end = rrd_footer_start - .checked_add(stream_footer.rrd_footer_byte_size_excluding_header as usize) - .unwrap(); - let rrd_footer_bytes = &msgs_encoded[rrd_footer_start..rrd_footer_end]; + let rrd_footer_range = stream_footer + .rrd_footer_byte_span_from_start_excluding_header + .try_cast::() + .unwrap() + .range(); + let rrd_footer_bytes = &msgs_encoded[rrd_footer_range]; { let crc = re_log_encoding::StreamFooter::from_rrd_footer_bytes( - rrd_footer_start as u64, + stream_footer + .rrd_footer_byte_span_from_start_excluding_header + .start, rrd_footer_bytes, ) .crc_excluding_header;