diff --git a/Cargo.lock b/Cargo.lock index 6cce5834d5ed..9dfb5db8fbf6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9071,6 +9071,7 @@ dependencies = [ "bytes", "criterion", "ehttp", + "itertools 0.14.0", "js-sys", "lz4_flex 0.12.0", "mimalloc", @@ -9082,6 +9083,7 @@ dependencies = [ "re_protos", "re_smart_channel", "re_sorbet", + "re_span", "re_tracing", "re_types", "similar-asserts", @@ -9093,6 +9095,7 @@ dependencies = [ "wasm-bindgen-futures", "web-sys", "web-time", + "xxhash-rust", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index ccf133a1bd40..1100b3eb0fa7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -429,6 +429,7 @@ wgpu = { version = "27.0.1", default-features = false, features = [ "fragile-send-sync-non-atomic-wasm", ] } xshell = "0.2.7" +xxhash-rust = { version = "0.8", features = ["xxh32"] } # --------------------------------------------------------------------------------- [profile] diff --git a/crates/store/re_log_encoding/Cargo.toml b/crates/store/re_log_encoding/Cargo.toml index de13fa1aefdb..f8072c390f7f 100644 --- a/crates/store/re_log_encoding/Cargo.toml +++ b/crates/store/re_log_encoding/Cargo.toml @@ -49,14 +49,17 @@ re_log.workspace = true re_protos.workspace = true re_smart_channel.workspace = true re_sorbet.workspace = true +re_span.workspace = true re_tracing.workspace = true # External: arrow = { workspace = true, features = ["ipc"] } +itertools.workspace = true lz4_flex = { workspace = true } parking_lot.workspace = true thiserror.workspace = true tracing.workspace = true +xxhash-rust.workspace = true # Optional external dependencies: bytes = { workspace = true, optional = true } diff --git a/crates/store/re_log_encoding/src/rrd/decoder/iterator.rs b/crates/store/re_log_encoding/src/rrd/decoder/iterator.rs index 170e57cbb1bd..f1b3cc1f8b70 100644 --- a/crates/store/re_log_encoding/src/rrd/decoder/iterator.rs +++ b/crates/store/re_log_encoding/src/rrd/decoder/iterator.rs @@ -139,7 +139,7 @@ impl std::iter::Iterator for DecoderI // …and 
the underlying decoder already considers that it's done (i.e. it's // waiting for a whole new stream to begin): time to stop. - Ok(None) if self.decoder.state == DecoderState::StreamHeader => { + Ok(None) if self.decoder.state == DecoderState::WaitingForStreamHeader => { return None; } diff --git a/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs b/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs index 436198ad6f4d..058f92157927 100644 --- a/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs +++ b/crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs @@ -2,12 +2,14 @@ use std::collections::VecDeque; use std::io::Cursor; use std::io::Read as _; +use itertools::Itertools as _; use re_build_info::CrateVersion; -use crate::CachingApplicationIdInjector; -use crate::rrd::{ - CodecError, Decodable as _, DecodeError, DecoderEntrypoint, EncodingOptions, Serializer, - StreamHeader, +use crate::StreamFooter; +use crate::rrd::MessageHeader; +use crate::{ + CachingApplicationIdInjector, CodecError, Decodable as _, DecodeError, DecoderEntrypoint, + EncodingOptions, Serializer, StreamHeader, }; // --- @@ -54,39 +56,57 @@ pub struct Decoder { /// The application id cache used for migrating old data. pub(crate) app_id_cache: CachingApplicationIdInjector, - pub(crate) _decodable: std::marker::PhantomData, + _decodable: std::marker::PhantomData, } -/// /// ```text,ignore -/// StreamHeader -/// | -/// v -/// MessageHeader -/// ^ | -/// | | -/// ---Message<-- +/// +--> WaitingForStreamHeader -----> Aborted +/// | | +/// | v +/// +--- WaitingForMessageHeader <---+ +/// | | | +/// | v | +/// | WaitingForMessagePayload ---+ +/// | | +/// | v +/// +--- WaitingForStreamFooter /// ``` #[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum DecoderState { - /// The beginning of the stream. + /// The beginning of a stream. /// - /// The stream header contains the magic bytes (e.g. 
`RRF2`), the encoded version, and the + /// The [`StreamHeader`] contains the magic bytes (e.g. `RRF2`), the encoded version, and the /// encoding options. /// - /// After the stream header is read once, the state machine will only ever switch between - /// `MessageHeader` and `Message` - StreamHeader, + /// We can come back to this state at any point because multiple RRD streams might have been + /// concatenated together (e.g. `cat *.rrd | rerun`). + WaitingForStreamHeader, - /// The beginning of a Protobuf message. - MessageHeader, + /// Stream in progress. + /// + /// The [`MessageHeader`] indicates what kind of payload this is and how large it is. + /// + /// After the [`StreamHeader`] is read once, the state machine will only ever switch between + /// [`Self::WaitingForMessageHeader`] and [`Self::WaitingForMessagePayload`], until either a + /// footer or another concatenated RRD stream shows up. + WaitingForMessageHeader, - /// The message content, serialized using `Protobuf`. + /// A [`MessageHeader`] was parsed, now we're waiting for the associated payload. /// /// Compression is only applied to individual `ArrowMsg`s, instead of the entire stream. - Message(crate::rrd::MessageHeader), + WaitingForMessagePayload(MessageHeader), - /// Stop reading. + /// We hit a message of kind `MessageKind::End`, which means a footer must be following it. + /// + /// The [`StreamFooter`] contains information about where the RRD manifests can be found, but we + /// won't be doing anything with it in this case since we're just going through the data in order. + /// + /// Once the footer is parsed, we can wait for a new stream to begin. + WaitingForStreamFooter, + + /// The stream entered an irrecoverable state and cannot yield data anymore. However, most of the + /// valuable data was already decoded, so we merely log an error and stop yielding more + /// messages rather than bubbling up the error all the way to the end user. 
Aborted, } @@ -98,7 +118,7 @@ impl Decoder { // Note: `options` are filled in once we read `FileHeader`, so this value does not matter. options: EncodingOptions::PROTOBUF_UNCOMPRESSED, byte_chunks: ByteChunkBuffer::new(), - state: DecoderState::StreamHeader, + state: DecoderState::WaitingForStreamHeader, app_id_cache: CachingApplicationIdInjector::default(), _decodable: std::marker::PhantomData::, } @@ -121,7 +141,7 @@ impl Decoder { })) = result { re_log::warn_once!( - "Dropping message without application id which arrived before `SetStoreInfo` \ + "dropping message without application id which arrived before `SetStoreInfo` \ (kind: {store_kind}, recording id: {recording_id}." ); } else { @@ -132,29 +152,56 @@ impl Decoder { /// Read the next message in the stream. fn try_read_impl(&mut self) -> Result, DecodeError> { + // Enable this for easy debugging of the state machine. + if false { + use bytes::Buf as _; + let num_bytes = self + .byte_chunks + .queue + .iter() + .map(|v| v.remaining()) + .sum::(); + + eprintln!("state: {:?} (bytes available: {num_bytes})", self.state); + + let mut peeked = [0u8; 32]; + self.byte_chunks.try_peek(&mut peeked); + let peeked = peeked + .into_iter() + .map(|b| match b { + b'a'..=b'z' | b'A'..=b'Z' | b'0'..=b'9' => String::from(b as char), + v => v.to_string(), + }) + .join(", "); + + eprintln!("upcoming 32 bytes: [{peeked}]"); + } + match self.state { - DecoderState::StreamHeader => { + DecoderState::WaitingForStreamHeader => { let is_first_header = self.byte_chunks.num_read() == 0; let position = self.byte_chunks.num_read(); if let Some(header_data) = self.byte_chunks.try_read(StreamHeader::ENCODED_SIZE_BYTES) { - re_log::trace!(?header_data, "Decoding StreamHeader"); - // header contains version and compression options let version_and_options = StreamHeader::from_rrd_bytes(&header_data) .and_then(|h| h.to_version_and_options()); let (version, options) = match version_and_options { Ok(ok) => ok, Err(err) => { - // We expected a 
header, but didn't find one! if is_first_header { + // We expected a header, but didn't find one! return Err(err.into()); } else { + // A bunch of weird trailing bytes means we're now in an irrecoverable state, but + // it doesn't change the fact that we've just successfully yielded an entire stream's + // worth of data. So, instead of bubbling up errors all the way to the end user, + // merely log one here and stop the state machine forever. re_log::error!( is_first_header, position, - "Trailing bytes in rrd stream: {header_data:?} ({err})" + "trailing bytes in rrd stream: {header_data:?} ({err})" ); self.state = DecoderState::Aborted; return Ok(None); @@ -165,34 +212,38 @@ impl Decoder { re_log::trace!( version = version.to_string(), ?options, - "Found Stream Header" + "found StreamHeader" ); self.version = Some(version); self.options = options; match self.options.serializer { - Serializer::Protobuf => self.state = DecoderState::MessageHeader, + Serializer::Protobuf => self.state = DecoderState::WaitingForMessageHeader, } // we might have data left in the current byte chunk, immediately try to read // length of the next message. return self.try_read(); } + + // Not enough data yet -- wait to be fed and called back once again. } - DecoderState::MessageHeader => { - let mut peeked = [0u8; crate::rrd::MessageHeader::ENCODED_SIZE_BYTES]; + DecoderState::WaitingForMessageHeader => { + let mut peeked = [0u8; MessageHeader::ENCODED_SIZE_BYTES]; if self.byte_chunks.try_peek(&mut peeked) == peeked.len() { - let header = match crate::rrd::MessageHeader::from_rrd_bytes(&peeked) { + let header = match MessageHeader::from_rrd_bytes(&peeked) { Ok(header) => header, - Err(crate::rrd::CodecError::HeaderDecoding(_)) => { - // We failed to decode a `MessageHeader`: it might be because the - // stream is corrupt, or it might be because it just switched to a - // different, concatenated recording without having the courtesy of - // announcing it via an EOS marker. 
- self.state = DecoderState::StreamHeader; + Err(crate::rrd::CodecError::FrameDecoding(_)) => { + // We failed to decode a `MessageHeader`: it might be because the stream is corrupt, + // or it might be because it just switched to a different, concatenated recording + // without having the courtesy of announcing it via an EOS marker. + // + // TODO(cmc): These kinds of peeking shenanigans should never be necessary, need to + // write a proposal for RRF3 that addresses these issues and more. + self.state = DecoderState::WaitingForStreamHeader; return self.try_read(); } @@ -200,68 +251,143 @@ impl Decoder { }; self.byte_chunks - .try_read(crate::rrd::MessageHeader::ENCODED_SIZE_BYTES) + .try_read(MessageHeader::ENCODED_SIZE_BYTES) .expect("reading cannot fail if peeking worked"); - re_log::trace!(?header, "MessageHeader"); + re_log::trace!(?header, "found MessageHeader"); - self.state = DecoderState::Message(header); - // we might have data left in the current byte chunk, immediately try to read - // the message content. + self.state = DecoderState::WaitingForMessagePayload(header); return self.try_read(); } + + // Not enough data yet -- wait to be fed and called back once again. } - DecoderState::Message(header) => { + DecoderState::WaitingForMessagePayload(header) => { let start_offset = self.byte_chunks.num_read() as u64; if let Some(bytes) = self.byte_chunks.try_read(header.len as usize) { - re_log::trace!(?header, "Read message"); - let bytes_len = bytes.len() as u64; let byte_span = re_chunk::Span { start: start_offset, len: bytes_len, }; let message = match T::decode( - bytes, + bytes.clone(), byte_span, header.kind, &mut self.app_id_cache, self.version, ) { Ok(msg) => msg, + Err(err) => { // We successfully parsed a header, but decided to drop the message altogether. // We must go back to looking for headers, or the decoder will just be stuck in a dead // state forever. 
- self.state = DecoderState::MessageHeader; + self.state = DecoderState::WaitingForMessageHeader; return Err(err.into()); } }; if let Some(message) = message { - re_log::trace!("Decoded new message"); - - self.state = DecoderState::MessageHeader; + self.state = DecoderState::WaitingForMessageHeader; return Ok(Some(message)); } else { - re_log::trace!("End of stream - expecting a new Streamheader"); + re_log::trace!( + "End of stream - expecting either a StreamFooter or a new Streamheader" + ); + + if !bytes.is_empty() { + let rrd_footer = + re_protos::log_msg::v1alpha1::RrdFooter::from_rrd_bytes(&bytes)?; + _ = rrd_footer; // TODO(cmc): we'll use that in the next PR, promise. + + // A non-empty ::End message means there must be a footer ahead, no exception. + self.state = DecoderState::WaitingForStreamFooter; + } else { + // There are 2 possible scenarios where we can end up here: + // * The recording doesn't contain any data messages (e.g. it's empty except for + // a `SetStoreInfo` message). + // * Backward compatibility: the payload is empty (i.e. `header.len == 0`) because the + // `End` message was written by a legacy encoder that predates the introduction of footers. + // + // Either way, we have to expect a footer next, since we don't know for sure which scenario + // we're in. We could check the encoder version and such, but that's just superfluous complexity + // since `WaitingForStreamFooter` already knows how to deal with optional footers anyway. + self.state = DecoderState::WaitingForStreamFooter; + } - // `None` means an end-of-stream marker was hit, but there might be another concatenated - // stream behind, so try to start all over again. - self.state = DecoderState::StreamHeader; return self.try_read(); } } + + // Not enough data yet -- wait to be fed and called back once again. } - DecoderState::Aborted => { - return Ok(None); + DecoderState::WaitingForStreamFooter => { + // NOTE: We're not peeking here! 
If we enter this state, then there must be a footer + // ahead, no exception, otherwise that's a violation of the framing protocol. + let position = self.byte_chunks.num_read(); + if let Some(bytes) = self + .byte_chunks + .try_read(crate::rrd::StreamFooter::ENCODED_SIZE_BYTES) + { + match crate::rrd::StreamFooter::from_rrd_bytes(&bytes) { + Ok(footer) => { + re_log::trace!(?footer, "found StreamFooter"); + + let StreamFooter { + fourcc: _, + identifier: _, + rrd_footer_byte_span_from_start_excluding_header, + crc_excluding_header: _, + } = footer; + + let rrd_footer_end = + rrd_footer_byte_span_from_start_excluding_header.end(); + + if rrd_footer_end > position as u64 { + // The RRD footer cannot possibly end after the stream footer starts, since it must + // be part of an ::End message. + re_log::error!( + position, + bytes = ?bytes, + ?footer, + err = "offsets are invalid", + "corrupt footer in rrd stream" + ); + } + + // And now we start all over. + self.state = DecoderState::WaitingForStreamHeader; + return self.try_read(); + } + + Err(err) => { + // A corrupt footer means we're now in an irrecoverable state, but it doesn't change the + // fact that we've just successfully yielded an entire stream's worth of data. So, instead + // of bubbling up errors all the way to the end user, merely log one here and stop the state + // machine forever. + re_log::error!( + position, + bytes = ?bytes, + %err, + "corrupt footer in rrd stream" + ); + self.state = DecoderState::Aborted; + return Ok(None); + } + } + } + + // Not enough data yet -- wait to be fed and called back once again. } + + DecoderState::Aborted => return Ok(None), } - Ok(None) + Ok(None) // Not enough data yet -- wait to be fed and called back once again. 
} } diff --git a/crates/store/re_log_encoding/src/rrd/decoder/stream.rs b/crates/store/re_log_encoding/src/rrd/decoder/stream.rs index 5e12329e2e26..e4b05a408f78 100644 --- a/crates/store/re_log_encoding/src/rrd/decoder/stream.rs +++ b/crates/store/re_log_encoding/src/rrd/decoder/stream.rs @@ -151,7 +151,7 @@ impl Stream for DecoderSt // …and the underlying decoder already considers that it's done (i.e. it's // waiting for a whole new stream to begin): time to stop. - Ok(None) if decoder.state == DecoderState::StreamHeader => { + Ok(None) if decoder.state == DecoderState::WaitingForStreamHeader => { return std::task::Poll::Ready(None); } diff --git a/crates/store/re_log_encoding/src/rrd/encoder.rs b/crates/store/re_log_encoding/src/rrd/encoder.rs index 8502f29b3182..dad0b973a346 100644 --- a/crates/store/re_log_encoding/src/rrd/encoder.rs +++ b/crates/store/re_log_encoding/src/rrd/encoder.rs @@ -5,11 +5,11 @@ use std::borrow::Borrow; use re_build_info::CrateVersion; use re_chunk::{ChunkError, ChunkResult}; use re_log_types::LogMsg; +use re_sorbet::SorbetError; -use crate::ToTransport as _; -use crate::rrd::{ +use crate::{ CodecError, Compression, Encodable as _, EncodingOptions, MessageHeader, MessageKind, - Serializer, StreamHeader, + Serializer, StreamFooter, StreamHeader, ToTransport as _, }; // ---------------------------------------------------------------------------- @@ -31,6 +31,9 @@ pub enum EncodeError { #[error("Chunk error: {0}")] Chunk(Box), + + #[error("Sorbet error: {0}")] + Sorbet(Box), } const _: () = assert!( @@ -50,6 +53,12 @@ impl From for EncodeError { } } +impl From for EncodeError { + fn from(err: SorbetError) -> Self { + Self::Sorbet(Box::new(err)) + } +} + // ---------------------------------------------------------------------------- /// Encode a stream of [`LogMsg`] into an `.rrd` file. @@ -64,13 +73,62 @@ pub struct Encoder { /// Optional so that we can `take()` it in `into_inner`, while still being allowed to implement `Drop`. 
write: Option, - /// So we don't ever successfully write partial messages. + /// How many bytes written out so far? + num_written: u64, + + /// * So we don't ever successfully write partial messages. + /// * Because `prost` only supports buffers, not IO traits. scratch: Vec, - /// Tracks whether the end-of-stream marker has been written out already. + /// Tracks the state required to build the RRD footer for this stream. + /// + /// If set to `None`, the footer will not be computed. + /// + /// Calling [`Self::append_transport`] will automatically disable footers. + footer_state: Option, + + /// Tracks whether the end-of-stream marker, and optionally the associated footer, have been + /// written out already. is_finished: bool, } +/// The accumulated state used to build the footer when closing the [`Encoder`]. +/// +/// This is automatically updated when calling [`Encoder::append`]. +#[derive(Default)] +struct FooterState { + /// What is the currently active recording ID according to the state of the encoder, if any? + /// + /// Put another way: was there a `SetStoreInfo` message earlier in the stream? If so, we will + /// want to override the recording ID of each chunk with that one (because that's the existing + /// behavior, certainly not because it's nice). 
+ recording_id_scope: Option, +} + +impl FooterState { + #[expect(clippy::unnecessary_wraps)] // won't stay for long + fn append( + &mut self, + _byte_span_excluding_header: re_span::Span, + msg: &re_log_types::LogMsg, + ) -> Result<(), EncodeError> { + match msg { + LogMsg::SetStoreInfo(msg) => { + self.recording_id_scope = Some(msg.info.store_id.clone()); + } + + LogMsg::ArrowMsg(_, _) | LogMsg::BlueprintActivationCommand(_) => {} + } + + Ok(()) + } + + #[expect(clippy::unnecessary_wraps, clippy::unused_self)] // won't stay for long + fn finish(self) -> Result { + Ok(crate::RrdFooter {}) + } +} + impl Encoder> { pub fn local() -> Result { Self::new_eager( @@ -126,7 +184,9 @@ impl Encoder { serializer: options.serializer, compression: options.compression, write: Some(write), + num_written: out.len() as u64, scratch: Vec::new(), + footer_state: Some(FooterState::default()), is_finished: false, }) } @@ -137,22 +197,55 @@ impl Encoder { return Err(EncodeError::AlreadyFinished); } - if self.write.is_none() { + let Some(w) = self.write.as_mut() else { return Err(EncodeError::AlreadyUnwrapped); - } + }; re_tracing::profile_function!(); - let message = message.to_transport(self.compression)?; - // Safety: the compression settings of this message are consistent with this stream. 
- #[expect(unsafe_code)] - unsafe { - self.append_transport(&message) + let transport = message.to_transport(self.compression)?; + + let byte_offset_excluding_header = + self.num_written + crate::MessageHeader::ENCODED_SIZE_BYTES as u64; + + self.scratch.clear(); + let n = match self.serializer { + Serializer::Protobuf => { + transport.to_rrd_bytes(&mut self.scratch)?; + let n = w + .write_all(&self.scratch) + .map(|_| self.scratch.len() as u64) + .map_err(EncodeError::Write)?; + self.num_written += n; + n + } + }; + + let byte_size_excluding_header = n - crate::MessageHeader::ENCODED_SIZE_BYTES as u64; + + let byte_span_excluding_header = re_span::Span { + start: byte_offset_excluding_header, + len: byte_size_excluding_header, + }; + + if let Some(footer_state) = self.footer_state.as_mut() { + footer_state.append(byte_span_excluding_header, message)?; } + + Ok(n) + } + + /// Instructs the encoder to _not_ emit a footer at the end of the stream. + /// + /// This cannot be reverted. + pub fn do_not_emit_footer(&mut self) { + self.footer_state = None; } /// Returns the size in bytes of the encoded data. /// + /// ⚠️ This implies [`Self::do_not_emit_footer`]. ⚠️ + /// /// ## Safety /// /// `message` must respect the global settings of the encoder (e.g. the compression used), @@ -166,19 +259,27 @@ impl Encoder { return Err(EncodeError::AlreadyFinished); } + re_tracing::profile_function!(); + + // We cannot update the RRD manifest without decoding the message, which would defeat the + // entire purposes of using this method in the first place. + // Therefore, we disable footers if and when this method is used. 
+ self.do_not_emit_footer(); + let Some(w) = self.write.as_mut() else { return Err(EncodeError::AlreadyUnwrapped); }; - re_tracing::profile_function!(); - self.scratch.clear(); match self.serializer { Serializer::Protobuf => { message.to_rrd_bytes(&mut self.scratch)?; - w.write_all(&self.scratch) - .map(|_| self.scratch.len() as _) - .map_err(EncodeError::Write) + let n = w + .write_all(&self.scratch) + .map(|_| self.scratch.len() as u64) + .map_err(EncodeError::Write)?; + self.num_written += n; + Ok(n) } } } @@ -200,22 +301,52 @@ impl Encoder { return Err(EncodeError::AlreadyUnwrapped); }; - match self.serializer { - Serializer::Protobuf => { - // TODO(cmc): the extra heap-alloc and copy could be easily avoided with the - // introduction of an InMemoryWriter trait or similar. In practice it makes no - // difference and the cognitive overhead of this crate is already through the roof. - let mut header = Vec::new(); - MessageHeader { - kind: MessageKind::End, - len: 0, - } - .to_rrd_bytes(&mut header)?; - w.write_all(&header)?; - } + self.is_finished = true; + + let Some(footer_state) = self.footer_state.take() else { + return Ok(()); + }; + + // TODO(cmc): the extra heap-allocs and copies could be easily avoided with the + // introduction of an InMemoryWriter trait or similar. In practice it makes no + // difference and the cognitive overhead of this crate is already through the roof. 
+ + use re_protos::external::prost::Message as _; + + // Message Header (::End) + + let rrd_footer = footer_state.finish()?; + let rrd_footer = rrd_footer.to_transport(())?; + + let mut out_header = Vec::new(); + MessageHeader { + kind: MessageKind::End, + len: rrd_footer.encoded_len() as u64, } + .to_rrd_bytes(&mut out_header)?; + w.write_all(&out_header).map_err(EncodeError::Write)?; + self.num_written += out_header.len() as u64; - self.is_finished = true; + let end_msg_byte_offset_from_start_excluding_header = self.num_written; + + // Message payload (re_protos::RrdFooter) + + let mut out_rrd_footer = Vec::new(); + rrd_footer.to_rrd_bytes(&mut out_rrd_footer)?; + w.write_all(&out_rrd_footer).map_err(EncodeError::Write)?; + self.num_written += out_rrd_footer.len() as u64; + + // StreamFooter + + let mut out_stream_footer = Vec::new(); + StreamFooter::from_rrd_footer_bytes( + end_msg_byte_offset_from_start_excluding_header, + &out_rrd_footer, + ) + .to_rrd_bytes(&mut out_stream_footer)?; + w.write_all(&out_stream_footer) + .map_err(EncodeError::Write)?; + self.num_written += out_stream_footer.len() as u64; Ok(()) } @@ -255,8 +386,6 @@ impl Encoder { } } -// TODO(cmc): It seems a bit suspicious to me that we send an EOS marker on drop, but don't flush. -// But I don't want to change any flushing behavior at the moment, so I'll keep it that way for now. 
impl std::ops::Drop for Encoder { fn drop(&mut self) { if self.write.is_none() { @@ -267,5 +396,9 @@ impl std::ops::Drop for Encoder { if let Err(err) = self.finish() { re_log::warn!("encoder couldn't be finished: {err}"); } + + if let Err(err) = self.flush_blocking() { + re_log::warn!("encoder couldn't be flushed: {err}"); + } } } diff --git a/crates/store/re_log_encoding/src/rrd/errors.rs b/crates/store/re_log_encoding/src/rrd/errors.rs index 2e7533dbcea5..ad745e295c01 100644 --- a/crates/store/re_log_encoding/src/rrd/errors.rs +++ b/crates/store/re_log_encoding/src/rrd/errors.rs @@ -1,4 +1,7 @@ use re_build_info::CrateVersion; +use re_chunk::ChunkError; + +pub type CodecResult = Result; /// Possible errors when encoding and decoding RRD data. /// @@ -25,8 +28,17 @@ pub enum CodecError { #[error("Data was from an old, incompatible Rerun version")] OldRrdVersion, - #[error("Failed to decode message header {0}")] - HeaderDecoding(String), + /// Something went wrong when attempting to decode any kind of RRD frame. + /// + /// There are 3 kinds of RRD frames: + /// * [`crate::StreamHeader`] + /// * [`crate::MessageHeader`] + /// * [`crate::StreamFooter`] + #[error("Failed to decode frame: {0}")] + FrameDecoding(String), + + #[error("CRC check failed: expected {expected:08x} but got {got:08x}")] + CrcMismatch { expected: u32, got: u32 }, #[error("Arrow IPC deserialization error: {0}")] ArrowDeserialization(::arrow::error::ArrowError), @@ -43,6 +55,9 @@ pub enum CodecError { #[error("Could not convert type from protobuf: {0}")] TypeConversion(Box), + #[error("Invalid chunk: {0}")] + Chunk(Box), + /// This is returned when `ArrowMsg` or `BlueprintActivationCommand` are received with a legacy /// store id (missing the application id) before the corresponding `SetStoreInfo` message. In /// that case, the best effort is to recover by dropping such message with a warning. 
@@ -79,6 +94,12 @@ impl From for CodecError { } } +impl From for CodecError { + fn from(value: ChunkError) -> Self { + Self::Chunk(Box::new(value)) + } +} + impl From for CodecError { fn from(value: re_protos::common::v1alpha1::ext::StoreIdMissingApplicationIdError) -> Self { Self::StoreIdMissingApplicationId { diff --git a/crates/store/re_log_encoding/src/rrd/footer/instances.rs b/crates/store/re_log_encoding/src/rrd/footer/instances.rs new file mode 100644 index 000000000000..ef13112b2624 --- /dev/null +++ b/crates/store/re_log_encoding/src/rrd/footer/instances.rs @@ -0,0 +1,16 @@ +/// This is the payload that is carried in messages of type `::End` in RRD streams. +/// +/// It keeps track of various useful information about the associated recording. +/// +/// During normal operations, there can only be a single `::End` message in an RRD stream, and +/// therefore a single `RrdFooter`. +/// It is possible to break that invariant by concatenating streams using external tools, +/// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. +/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd merge > all_my_recordings.rrd`, +/// would once again guarantee that only one `::End` message is present though. +/// I.e. that invariant holds as long as one stays within our ecosystem of tools. +/// +/// This is an application-level type, the associated transport-level type can be found +/// over at [`re_protos::log_msg::v1alpha1::RrdFooter`]. 
+#[derive(Default, Debug)] +pub struct RrdFooter {} diff --git a/crates/store/re_log_encoding/src/rrd/footer/mod.rs b/crates/store/re_log_encoding/src/rrd/footer/mod.rs new file mode 100644 index 000000000000..20e209ec5ec6 --- /dev/null +++ b/crates/store/re_log_encoding/src/rrd/footer/mod.rs @@ -0,0 +1,3 @@ +mod instances; + +pub use self::instances::RrdFooter; diff --git a/crates/store/re_log_encoding/src/rrd/frames.rs b/crates/store/re_log_encoding/src/rrd/frames.rs new file mode 100644 index 000000000000..b62bcf651c51 --- /dev/null +++ b/crates/store/re_log_encoding/src/rrd/frames.rs @@ -0,0 +1,451 @@ +use crate::rrd::{Decodable, Encodable, OptionsError}; + +// --- FileHeader --- + +pub use re_build_info::CrateVersion; // convenience +pub use re_protos::log_msg::v1alpha1::ext::Compression; // convenience + +/// How we serialize the data. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +#[repr(u8)] +pub enum Serializer { + Protobuf = 2, +} + +// TODO(cmc): None of these options make sense to have at the global scope and/or in the StreamHeader. +// * Global scope: that makes the decoder stateful in a very bad way (think e.g. about loading +// specific chunks straight from footer metadata). +// * StreamHeader: both of these are concerns that only apply to message payloads, and should therefore +// be flags in the MessageHeader. +// In practice I believe both are effectively completely ignored everywhere it matters. They need +// to go away for real though. 
+#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct EncodingOptions { + pub compression: Compression, + pub serializer: Serializer, +} + +impl EncodingOptions { + pub const ENCODED_SIZE_BYTES: usize = 4; +} + +impl EncodingOptions { + pub const PROTOBUF_COMPRESSED: Self = Self { + compression: Compression::LZ4, + serializer: Serializer::Protobuf, + }; + pub const PROTOBUF_UNCOMPRESSED: Self = Self { + compression: Compression::Off, + serializer: Serializer::Protobuf, + }; +} + +impl Encodable for EncodingOptions { + fn to_rrd_bytes(&self, out: &mut Vec) -> Result { + let Self { + compression, + serializer, + } = *self; + + let before = out.len() as u64; + + out.extend_from_slice(&[ + compression as u8, + serializer as u8, + 0, // reserved + 0, // reserved + ]); + + let n = out.len() as u64 - before; + assert_eq!(Self::ENCODED_SIZE_BYTES as u64, n); + + Ok(n) + } +} + +impl Decodable for EncodingOptions { + fn from_rrd_bytes(data: &[u8]) -> Result { + match data { + &[compression, serializer, 0, 0] => { + let compression = match compression { + 0 => Compression::Off, + 1 => Compression::LZ4, + _ => return Err(OptionsError::UnknownCompression(compression).into()), + }; + let serializer = match serializer { + 1 => return Err(OptionsError::RemovedMsgPackSerializer.into()), + 2 => Serializer::Protobuf, + _ => return Err(OptionsError::UnknownSerializer(serializer).into()), + }; + Ok(Self { + compression, + serializer, + }) + } + + _ => Err(OptionsError::UnknownReservedBytes.into()), + } + } +} + +// --- + +/// The opening frame in an RRD stream. +/// +/// During normal operations, there can only be a single [`StreamHeader`] per RRD stream. +/// It is possible to break that invariant by concatenating streams using external tools, +/// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. +/// Passing that stream back through Rerun tools, e.g. 
`cat *.rrd | rerun rrd merge > all_my_recordings.rrd`, +/// would once again guarantee that only one stream header is present though. +/// I.e. that invariant holds as long as one stays within our ecosystem of tools. +#[derive(Debug, Clone, Copy)] +pub struct StreamHeader { + pub fourcc: [u8; 4], + pub version: [u8; 4], + pub options: EncodingOptions, +} + +impl StreamHeader { + pub const ENCODED_SIZE_BYTES: usize = 12; +} + +impl StreamHeader { + pub fn to_version_and_options( + self, + ) -> Result<(CrateVersion, EncodingOptions), crate::rrd::CodecError> { + { + // We used 0000 for all .rrd files up until 2023-02-27, post 0.2.0 release: + let encoded_version = if self.version == [0, 0, 0, 0] { + CrateVersion::new(0, 2, 0) + } else { + CrateVersion::from_bytes(self.version) + }; + + if encoded_version.major == 0 && encoded_version.minor < 23 { + // We broke compatibility for 0.23 for (hopefully) the last time. + return Err(crate::rrd::CodecError::IncompatibleRerunVersion { + file: Box::new(encoded_version), + local: Box::new(CrateVersion::LOCAL), + }); + } else if encoded_version <= CrateVersion::LOCAL { + // Loading old files should be fine, and if it is not, the chunk migration in re_sorbet should already log a warning. + } else { + re_log::warn_once!( + "Found data stream with Rerun version {encoded_version} which is newer than the local Rerun version ({}). This file may contain data that is not compatible with this version of Rerun. Consider updating Rerun.", + CrateVersion::LOCAL + ); + } + } + + Ok((CrateVersion::from_bytes(self.version), self.options)) + } +} + +impl Encodable for StreamHeader { + fn to_rrd_bytes(&self, out: &mut Vec) -> Result { + let Self { + fourcc, + version, + options, + } = self; + + let before = out.len() as u64; + + out.extend_from_slice(fourcc); + out.extend_from_slice(version); + { + // TODO(cmc): the extra heap-alloc and copy could be easily avoided with the + // introduction of an InMemoryWriter trait or similar. 
In practice it makes no + // difference and the cognitive overhead of this crate is already through the roof. + let mut options_out = Vec::new(); + options.to_rrd_bytes(&mut options_out)?; + out.extend_from_slice(&options_out); + } + + let n = out.len() as u64 - before; + assert_eq!(Self::ENCODED_SIZE_BYTES as u64, n); + + Ok(n) + } +} + +impl Decodable for StreamHeader { + fn from_rrd_bytes(data: &[u8]) -> Result { + if data.len() != Self::ENCODED_SIZE_BYTES { + return Err(crate::rrd::CodecError::FrameDecoding(format!( + "invalid StreamHeader length (expected {} but got {})", + Self::ENCODED_SIZE_BYTES, + data.len() + ))); + } + + let to_array_4b = |slice: &[u8]| slice.try_into().expect("always returns an Ok() variant"); + + let fourcc = to_array_4b(&data[0..4]); + + // Check magic bytes FIRST + if crate::rrd::OLD_RRD_FOURCC.contains(&fourcc) { + return Err(crate::rrd::CodecError::OldRrdVersion); + } else if fourcc != crate::rrd::RRD_FOURCC { + return Err(crate::rrd::CodecError::NotAnRrd( + crate::rrd::NotAnRrdError { + expected_fourcc: crate::rrd::RRD_FOURCC, + actual_fourcc: fourcc, + }, + )); + } + + let version = to_array_4b(&data[4..8]); + let options = EncodingOptions::from_rrd_bytes(&data[8..])?; + Ok(Self { + fourcc, + version, + options, + }) + } +} + +// --- + +/// The closing frame in an RRD stream. Keeps track of where the [`RrdFooter`] can be found. +/// +/// During normal operations, there can only be a single [`StreamFooter`] per RRD stream. +/// It is possible to break that invariant by concatenating streams using external tools, +/// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. +/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd merge > all_my_recordings.rrd`, +/// would once again guarantee that only one stream footer is present though. +/// I.e. that invariant holds as long as one stays within our ecosystem of tools. 
+/// +/// [`RrdFooter`]: [crate::RrdFooter] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct StreamFooter { + /// Same as the one in the [`StreamHeader`], i.e. [`crate::rrd::RRD_FOURCC`]. + /// + /// Used to go straight to the footer of an RRD stream ("backwards"). + pub fourcc: [u8; 4], // RRF2 + + /// A unique identifier to disambiguate the footer from other frames. + /// + /// Always set to [`Self::RRD_IDENTIFIER`]. + pub identifier: [u8; 4], // FOOT + + /// The span in bytes where the serialized [`RrdFooter`] payload starts and ends, excluding + /// the message header. + /// + /// I.e. a transport-level [`RrdFooter`] can be decoded from the bytes at (pseudo-code): + /// ```text + /// let start = stream_footer.rrd_footer_byte_span_from_start_excluding_header.start; + /// let end = stream_footer.rrd_footer_byte_span_from_start_excluding_header.end(); + /// let bytes = &file[start..end]; + /// let rrd_footer = re_protos::RrdFooter::decode(bytes)?; + /// let rrd_footer = rrd_footer.to_application()?; + /// ``` + /// + /// [`RrdFooter`]: [crate::RrdFooter] + pub rrd_footer_byte_span_from_start_excluding_header: re_span::Span, + + /// Checksum for the [`RrdFooter`] payload. + /// + /// The footer is most often accessed by jumping straight to it, so this is a nice extra safety + /// to make sure that we didn't just get "lucky" (or unlucky, rather) when jumping around and + /// parsing random bytes. + /// + /// For now, the checksum algorithm is hardcoded to `xxh32`, the 32bit variant of the + /// [`xxhash` family of hashing algorithms](https://xxhash.com/). + /// This is a fast, HW-accelerated, non-cryptographic hash that is perfect when needing to hash + /// RRD footers, which can potentially get very, very large. + /// + /// [`RrdFooter`]: [crate::RrdFooter] + // + // TODO(cmc): It shouldn't be the job of the StreamFooter to carry checksums for a specific + // message's payload.
All frames should have identifiers and CRCs for both themselves and their + // payloads, in which case this CRC would belong in the MessageHeader. + // TODO(cmc): In a potential future RRF3, we might make the choice of checksum algorithm + // configurable via flag. + pub crc_excluding_header: u32, +} + +impl StreamFooter { + pub const ENCODED_SIZE_BYTES: usize = 28; + pub const CRC_SEED: u32 = 7850921; // "RERUN" in base 26 (A=0, Z=25) + pub const RRD_IDENTIFIER: [u8; 4] = *b"FOOT"; + + pub fn new( + rrd_footer_byte_span_from_start_excluding_header: re_span::Span, + crc_excluding_header: u32, + ) -> Self { + Self { + fourcc: crate::RRD_FOURCC, + identifier: Self::RRD_IDENTIFIER, + rrd_footer_byte_span_from_start_excluding_header, + crc_excluding_header, + } + } + + pub fn from_rrd_footer_bytes( + rrd_footer_byte_offset_from_start_excluding_header: u64, + rrd_footer_bytes: &[u8], + ) -> Self { + let crc_excluding_header = xxhash_rust::xxh32::xxh32(rrd_footer_bytes, Self::CRC_SEED); + let rrd_footer_byte_span_from_start_excluding_header = re_span::Span { + start: rrd_footer_byte_offset_from_start_excluding_header, + len: rrd_footer_bytes.len() as u64, + }; + Self { + fourcc: crate::RRD_FOURCC, + identifier: Self::RRD_IDENTIFIER, + rrd_footer_byte_span_from_start_excluding_header, + crc_excluding_header, + } + } +} + +impl Encodable for StreamFooter { + fn to_rrd_bytes(&self, out: &mut Vec) -> Result { + let Self { + fourcc, + identifier, + rrd_footer_byte_span_from_start_excluding_header, + crc_excluding_header, + } = self; + + let before = out.len() as u64; + + out.extend_from_slice(fourcc); + out.extend_from_slice(identifier); + out.extend_from_slice( + &rrd_footer_byte_span_from_start_excluding_header + .start + .to_le_bytes(), + ); + out.extend_from_slice( + &rrd_footer_byte_span_from_start_excluding_header + .len + .to_le_bytes(), + ); + out.extend_from_slice(&crc_excluding_header.to_le_bytes()); + + let n = out.len() as u64 - before; + 
assert_eq!(Self::ENCODED_SIZE_BYTES as u64, n); + + Ok(n) + } +} + +impl Decodable for StreamFooter { + fn from_rrd_bytes(data: &[u8]) -> Result { + if data.len() != Self::ENCODED_SIZE_BYTES { + return Err(crate::rrd::CodecError::FrameDecoding(format!( + "invalid StreamFooter length (expected {} but got {})", + Self::ENCODED_SIZE_BYTES, + data.len() + ))); + } + + let to_array_4b = |slice: &[u8]| slice.try_into().expect("always returns an Ok() variant"); + + let fourcc: [u8; 4] = to_array_4b(&data[0..4]); + if fourcc != crate::RRD_FOURCC { + return Err(crate::rrd::CodecError::FrameDecoding(format!( + "invalid StreamFooter FourCC (expected {:?} but got {:?})", + crate::RRD_FOURCC, + fourcc, + ))); + } + + let identifier: [u8; 4] = to_array_4b(&data[4..8]); + if identifier != Self::RRD_IDENTIFIER { + return Err(crate::rrd::CodecError::FrameDecoding(format!( + "invalid StreamFooter identifier (expected {:?} but got {:?})", + Self::RRD_IDENTIFIER, + identifier, + ))); + } + + let rrd_footer_byte_span_from_start_excluding_header = re_span::Span { + start: u64::from_le_bytes(data[8..16].try_into().expect("cannot fail, checked above")), + len: u64::from_le_bytes(data[16..24].try_into().expect("cannot fail, checked above")), + }; + let crc = u32::from_le_bytes(data[24..28].try_into().expect("cannot fail, checked above")); + + Ok(Self { + fourcc, + identifier, + rrd_footer_byte_span_from_start_excluding_header, + crc_excluding_header: crc, + }) + } +} + +// --- MessageHeader --- + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[repr(u64)] +pub enum MessageKind { + End = Self::END, + SetStoreInfo = Self::SET_STORE_INFO, + ArrowMsg = Self::ARROW_MSG, + BlueprintActivationCommand = Self::BLUEPRINT_ACTIVATION_COMMAND, +} + +impl MessageKind { + const END: u64 = 0; + const SET_STORE_INFO: u64 = 1; + const ARROW_MSG: u64 = 2; + const BLUEPRINT_ACTIVATION_COMMAND: u64 = 3; +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub struct MessageHeader { + pub kind: MessageKind, + 
pub len: u64, +} + +impl MessageHeader { + pub const ENCODED_SIZE_BYTES: usize = 16; +} + +impl Encodable for MessageHeader { + fn to_rrd_bytes(&self, out: &mut Vec) -> Result { + let Self { kind, len } = *self; + + let before = out.len() as u64; + + out.extend_from_slice(&(kind as u64).to_le_bytes()); + out.extend_from_slice(&len.to_le_bytes()); + + let n = out.len() as u64 - before; + assert_eq!(Self::ENCODED_SIZE_BYTES as u64, n); + + Ok(n) + } +} + +impl Decodable for MessageHeader { + fn from_rrd_bytes(data: &[u8]) -> Result { + if data.len() != Self::ENCODED_SIZE_BYTES { + return Err(crate::rrd::CodecError::FrameDecoding(format!( + "invalid MessageHeader length (expected {} but got {})", + Self::ENCODED_SIZE_BYTES, + data.len() + ))); + } + + let kind = u64::from_le_bytes(data[0..8].try_into().expect("cannot fail, checked above")); + let kind = match kind { + MessageKind::END => MessageKind::End, + MessageKind::SET_STORE_INFO => MessageKind::SetStoreInfo, + MessageKind::ARROW_MSG => MessageKind::ArrowMsg, + MessageKind::BLUEPRINT_ACTIVATION_COMMAND => MessageKind::BlueprintActivationCommand, + _ => { + return Err(crate::rrd::CodecError::FrameDecoding(format!( + "unknown MessageHeader kind: {kind:?}" + ))); + } + }; + + let len = u64::from_le_bytes(data[8..16].try_into().expect("cannot fail, checked above")); + + Ok(Self { kind, len }) + } +} diff --git a/crates/store/re_log_encoding/src/rrd/headers.rs b/crates/store/re_log_encoding/src/rrd/headers.rs deleted file mode 100644 index f02204ac9a49..000000000000 --- a/crates/store/re_log_encoding/src/rrd/headers.rs +++ /dev/null @@ -1,265 +0,0 @@ -use crate::rrd::{Decodable, Encodable, OptionsError}; - -// --- FileHeader --- - -pub use re_build_info::CrateVersion; // convenience -pub use re_protos::log_msg::v1alpha1::ext::Compression; // convenience - -/// How we serialize the data. 
-#[derive(Clone, Copy, Debug, PartialEq, Eq)] -#[repr(u8)] -pub enum Serializer { - Protobuf = 2, -} - -#[derive(Clone, Copy, Debug, PartialEq, Eq)] -pub struct EncodingOptions { - pub compression: Compression, - pub serializer: Serializer, -} - -impl EncodingOptions { - pub const ENCODED_SIZE_BYTES: usize = 4; -} - -impl EncodingOptions { - pub const PROTOBUF_COMPRESSED: Self = Self { - compression: Compression::LZ4, - serializer: Serializer::Protobuf, - }; - pub const PROTOBUF_UNCOMPRESSED: Self = Self { - compression: Compression::Off, - serializer: Serializer::Protobuf, - }; -} - -impl Encodable for EncodingOptions { - fn to_rrd_bytes(&self, out: &mut Vec) -> Result { - let Self { - compression, - serializer, - } = *self; - - let before = out.len() as u64; - - out.extend_from_slice(&[ - compression as u8, - serializer as u8, - 0, // reserved - 0, // reserved - ]); - - let n = out.len() as u64 - before; - assert_eq!(Self::ENCODED_SIZE_BYTES as u64, n); - - Ok(n) - } -} - -impl Decodable for EncodingOptions { - fn from_rrd_bytes(data: &[u8]) -> Result { - match data { - &[compression, serializer, 0, 0] => { - let compression = match compression { - 0 => Compression::Off, - 1 => Compression::LZ4, - _ => return Err(OptionsError::UnknownCompression(compression).into()), - }; - let serializer = match serializer { - 1 => return Err(OptionsError::RemovedMsgPackSerializer.into()), - 2 => Serializer::Protobuf, - _ => return Err(OptionsError::UnknownSerializer(serializer).into()), - }; - Ok(Self { - compression, - serializer, - }) - } - - _ => Err(OptionsError::UnknownReservedBytes.into()), - } - } -} - -// --- - -#[derive(Debug, Clone, Copy)] -pub struct StreamHeader { - pub fourcc: [u8; 4], - pub version: [u8; 4], - pub options: EncodingOptions, -} - -impl StreamHeader { - pub const ENCODED_SIZE_BYTES: usize = 12; -} - -impl StreamHeader { - pub fn to_version_and_options( - self, - ) -> Result<(CrateVersion, EncodingOptions), crate::rrd::CodecError> { - { - // We used 
0000 for all .rrd files up until 2023-02-27, post 0.2.0 release: - let encoded_version = if self.version == [0, 0, 0, 0] { - CrateVersion::new(0, 2, 0) - } else { - CrateVersion::from_bytes(self.version) - }; - - if encoded_version.major == 0 && encoded_version.minor < 23 { - // We broke compatibility for 0.23 for (hopefully) the last time. - return Err(crate::rrd::CodecError::IncompatibleRerunVersion { - file: Box::new(encoded_version), - local: Box::new(CrateVersion::LOCAL), - }); - } else if encoded_version <= CrateVersion::LOCAL { - // Loading old files should be fine, and if it is not, the chunk migration in re_sorbet should already log a warning. - } else { - re_log::warn_once!( - "Found data stream with Rerun version {encoded_version} which is newer than the local Rerun version ({}). This file may contain data that is not compatible with this version of Rerun. Consider updating Rerun.", - CrateVersion::LOCAL - ); - } - } - - Ok((CrateVersion::from_bytes(self.version), self.options)) - } -} - -impl Encodable for StreamHeader { - fn to_rrd_bytes(&self, out: &mut Vec) -> Result { - let Self { - fourcc, - version, - options, - } = self; - - let before = out.len() as u64; - - out.extend_from_slice(fourcc); - out.extend_from_slice(version); - { - // TODO(cmc): the extra heap-alloc and copy could be easily avoided with the - // introduction of an InMemoryWriter trait or similar. In practice it makes no - // difference and the cognitive overhead of this crate is already through the roof. 
- let mut options_out = Vec::new(); - options.to_rrd_bytes(&mut options_out)?; - out.extend_from_slice(&options_out); - } - - let n = out.len() as u64 - before; - assert_eq!(Self::ENCODED_SIZE_BYTES as u64, n); - - Ok(n) - } -} - -impl Decodable for StreamHeader { - fn from_rrd_bytes(data: &[u8]) -> Result { - if data.len() != Self::ENCODED_SIZE_BYTES { - return Err(crate::rrd::CodecError::HeaderDecoding(format!( - "invalid StreamHeader length (expected {} but got {})", - Self::ENCODED_SIZE_BYTES, - data.len() - ))); - } - - let to_array_4b = |slice: &[u8]| slice.try_into().expect("always returns an Ok() variant"); - - let fourcc = to_array_4b(&data[0..4]); - - // Check magic bytes FIRST - if crate::rrd::OLD_RRD_FOURCC.contains(&fourcc) { - return Err(crate::rrd::CodecError::OldRrdVersion); - } else if fourcc != crate::rrd::RRD_FOURCC { - return Err(crate::rrd::CodecError::NotAnRrd( - crate::rrd::NotAnRrdError { - expected_fourcc: crate::rrd::RRD_FOURCC, - actual_fourcc: fourcc, - }, - )); - } - - let version = to_array_4b(&data[4..8]); - let options = EncodingOptions::from_rrd_bytes(&data[8..])?; - Ok(Self { - fourcc, - version, - options, - }) - } -} - -// --- MessageHeader --- - -#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)] -#[repr(u64)] -pub enum MessageKind { - #[default] - End = Self::END, - SetStoreInfo = Self::SET_STORE_INFO, - ArrowMsg = Self::ARROW_MSG, - BlueprintActivationCommand = Self::BLUEPRINT_ACTIVATION_COMMAND, -} - -impl MessageKind { - const END: u64 = 0; - const SET_STORE_INFO: u64 = 1; - const ARROW_MSG: u64 = 2; - const BLUEPRINT_ACTIVATION_COMMAND: u64 = 3; -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct MessageHeader { - pub kind: MessageKind, - pub len: u64, -} - -impl MessageHeader { - pub const ENCODED_SIZE_BYTES: usize = 16; -} - -impl Encodable for MessageHeader { - fn to_rrd_bytes(&self, out: &mut Vec) -> Result { - let Self { kind, len } = *self; - - let before = out.len() as u64; - - 
out.extend_from_slice(&(kind as u64).to_le_bytes()); - out.extend_from_slice(&len.to_le_bytes()); - - let n = out.len() as u64 - before; - assert_eq!(Self::ENCODED_SIZE_BYTES as u64, n); - - Ok(n) - } -} - -impl Decodable for MessageHeader { - fn from_rrd_bytes(data: &[u8]) -> Result { - if data.len() != Self::ENCODED_SIZE_BYTES { - return Err(crate::rrd::CodecError::HeaderDecoding(format!( - "invalid MessageHeader length (expected {} but got {})", - Self::ENCODED_SIZE_BYTES, - data.len() - ))); - } - - let kind = u64::from_le_bytes(data[0..8].try_into().expect("cannot fail, checked above")); - let kind = match kind { - MessageKind::END => MessageKind::End, - MessageKind::SET_STORE_INFO => MessageKind::SetStoreInfo, - MessageKind::ARROW_MSG => MessageKind::ArrowMsg, - MessageKind::BLUEPRINT_ACTIVATION_COMMAND => MessageKind::BlueprintActivationCommand, - _ => { - return Err(crate::rrd::CodecError::HeaderDecoding(format!( - "unknown MessageHeader kind: {kind:?}" - ))); - } - }; - - let len = u64::from_le_bytes(data[8..16].try_into().expect("cannot fail, checked above")); - - Ok(Self { kind, len }) - } -} diff --git a/crates/store/re_log_encoding/src/rrd/log_msg.rs b/crates/store/re_log_encoding/src/rrd/log_msg.rs index bb6b502718d4..105c5c350815 100644 --- a/crates/store/re_log_encoding/src/rrd/log_msg.rs +++ b/crates/store/re_log_encoding/src/rrd/log_msg.rs @@ -40,6 +40,26 @@ impl Encodable for re_protos::log_msg::v1alpha1::log_msg::Msg { } } +impl Encodable for re_protos::log_msg::v1alpha1::ArrowMsg { + fn to_rrd_bytes(&self, out: &mut Vec) -> Result { + use re_protos::external::prost::Message as _; + + let before = out.len() as u64; + self.encode(out)?; + Ok(out.len() as u64 - before) + } +} + +impl Encodable for re_protos::log_msg::v1alpha1::RrdFooter { + fn to_rrd_bytes(&self, out: &mut Vec) -> Result { + use re_protos::external::prost::Message as _; + + let before = out.len() as u64; + self.encode(out)?; + Ok(out.len() as u64 - before) + } +} + // NOTE: This 
is implemented for `Option<_>` because, in the native RRD protocol, the message kind // might be `MessageKind::End` (signifying end-of-stream), which has no representation in our Protobuf // definitions. I.e. `MessageKind::End` yields `None`. @@ -95,3 +115,10 @@ impl Decodable for re_protos::log_msg::v1alpha1::BlueprintActivationCommand { Ok(Self::decode(data)?) } } + +impl Decodable for re_protos::log_msg::v1alpha1::RrdFooter { + fn from_rrd_bytes(data: &[u8]) -> Result { + use re_protos::external::prost::Message as _; + Ok(Self::decode(data)?) + } +} diff --git a/crates/store/re_log_encoding/src/rrd/mod.rs b/crates/store/re_log_encoding/src/rrd/mod.rs index e6a412d098db..ee92814943a4 100644 --- a/crates/store/re_log_encoding/src/rrd/mod.rs +++ b/crates/store/re_log_encoding/src/rrd/mod.rs @@ -16,7 +16,8 @@ //! state machines that turn collections of `Encodable`s and `Decodable`s into actual RRD streams. mod errors; -mod headers; +mod footer; +mod frames; mod log_msg; #[cfg(feature = "decoder")] @@ -32,10 +33,11 @@ mod file_sink; #[cfg(feature = "stream_from_http")] pub mod stream_from_http; -pub use self::errors::{CodecError, NotAnRrdError, OptionsError}; -pub use self::headers::{ +pub use self::errors::{CodecError, CodecResult, NotAnRrdError, OptionsError}; +pub use self::footer::RrdFooter; +pub use self::frames::{ Compression, CrateVersion, EncodingOptions, MessageHeader, MessageKind, Serializer, - StreamHeader, + StreamFooter, StreamHeader, }; #[cfg(feature = "decoder")] diff --git a/crates/store/re_log_encoding/src/transport_to_app.rs b/crates/store/re_log_encoding/src/transport_to_app.rs index 04fb7098a759..aa3b4b94ceae 100644 --- a/crates/store/re_log_encoding/src/transport_to_app.rs +++ b/crates/store/re_log_encoding/src/transport_to_app.rs @@ -35,7 +35,7 @@ impl ToTransport for re_log_types::LogMsg { type Context<'a> = crate::rrd::Compression; fn to_transport(&self, compression: Self::Context<'_>) -> Result { - log_msg_to_proto(self, compression) + 
log_msg_app_to_transport(self, compression) } } @@ -47,7 +47,16 @@ impl ToTransport for re_log_types::ArrowMsg { &self, (store_id, compression): Self::Context<'_>, ) -> Result { - arrow_msg_to_proto(self, store_id, compression) + arrow_msg_app_to_transport(self, store_id, compression) + } +} + +impl ToTransport for crate::RrdFooter { + type Output = re_protos::log_msg::v1alpha1::RrdFooter; + type Context<'a> = (); + + fn to_transport(&self, _: Self::Context<'_>) -> Result { + Ok(Self::Output {}) } } @@ -67,7 +76,7 @@ impl ToApplication for re_protos::log_msg::v1alpha1::log_msg::Msg { &self, (app_id_injector, patched_version): Self::Context<'_>, ) -> Result { - let mut log_msg = log_msg_to_app(app_id_injector, self)?; + let mut log_msg = log_msg_transport_to_app(app_id_injector, self)?; if let Some(patched_version) = patched_version && let re_log_types::LogMsg::SetStoreInfo(msg) = &mut log_msg @@ -104,7 +113,16 @@ impl ToApplication for re_protos::log_msg::v1alpha1::ArrowMsg { type Context<'a> = (); fn to_application(&self, _context: Self::Context<'_>) -> Result { - arrow_msg_to_app(self) + arrow_msg_transport_to_app(self) + } +} + +impl ToApplication for re_protos::log_msg::v1alpha1::RrdFooter { + type Output = crate::RrdFooter; + type Context<'a> = (); + + fn to_application(&self, _context: Self::Context<'_>) -> Result { + Ok(Self::Output {}) } } @@ -118,7 +136,7 @@ impl ToApplication for re_protos::log_msg::v1alpha1::ArrowMsg { /// /// The provided [`ApplicationIdInjector`] must be shared across all calls for the same stream. 
#[tracing::instrument(level = "trace", skip_all)] -fn log_msg_to_app( +fn log_msg_transport_to_app( app_id_injector: &mut I, message: &re_protos::log_msg::v1alpha1::log_msg::Msg, ) -> Result { @@ -134,7 +152,7 @@ fn log_msg_to_app( } Msg::ArrowMsg(arrow_msg) => { - let encoded = arrow_msg_to_app(arrow_msg)?; + let encoded = arrow_msg_transport_to_app(arrow_msg)?; //TODO(#10730): clean that up when removing 0.24 back compat let store_id: re_log_types::StoreId = match arrow_msg @@ -194,7 +212,7 @@ fn log_msg_to_app( /// Converts a transport-level `ArrowMsg` to its application-level counterpart. #[tracing::instrument(level = "trace", skip_all)] -fn arrow_msg_to_app( +fn arrow_msg_transport_to_app( arrow_msg: &re_protos::log_msg::v1alpha1::ArrowMsg, ) -> Result { re_tracing::profile_function!(); @@ -239,7 +257,7 @@ fn arrow_msg_to_app( /// Converts an application-level `LogMsg` to its transport-level counterpart. #[tracing::instrument(level = "trace", skip_all)] -fn log_msg_to_proto( +fn log_msg_app_to_transport( message: &re_log_types::LogMsg, compression: crate::rrd::Compression, ) -> Result { @@ -251,7 +269,7 @@ fn log_msg_to_proto( } re_log_types::LogMsg::ArrowMsg(store_id, arrow_msg) => { - let arrow_msg = arrow_msg_to_proto(arrow_msg, store_id.clone(), compression)?; + let arrow_msg = arrow_msg_app_to_transport(arrow_msg, store_id.clone(), compression)?; re_protos::log_msg::v1alpha1::log_msg::Msg::ArrowMsg(arrow_msg) } @@ -267,7 +285,7 @@ fn log_msg_to_proto( /// Converts an application-level `ArrowMsg` to its transport-level counterpart. 
#[tracing::instrument(level = "trace", skip_all)] -fn arrow_msg_to_proto( +fn arrow_msg_app_to_transport( arrow_msg: &re_log_types::ArrowMsg, store_id: re_log_types::StoreId, compression: crate::rrd::Compression, diff --git a/crates/store/re_log_encoding/tests/footer_roundtrip.rs b/crates/store/re_log_encoding/tests/footer_roundtrip.rs new file mode 100644 index 000000000000..17048b1f1901 --- /dev/null +++ b/crates/store/re_log_encoding/tests/footer_roundtrip.rs @@ -0,0 +1,57 @@ +use re_chunk::RowId; +use re_log_encoding::{Decodable as _, Encoder, ToApplication as _}; +use re_log_types::{LogMsg, StoreId}; + +#[test] +fn footer_empty() { + fn generate_store_id() -> StoreId { + StoreId::recording("my_app", "my_empty_recording") + } + + fn generate_recording() -> impl Iterator { + let store_id = generate_store_id(); + + std::iter::once(LogMsg::SetStoreInfo(re_log_types::SetStoreInfo { + row_id: *RowId::ZERO, + info: re_log_types::StoreInfo { + store_id: store_id.clone(), + cloned_from: None, + store_source: re_log_types::StoreSource::Unknown, + store_version: Some(re_build_info::CrateVersion::new(1, 2, 3)), + is_partial: false, + }, + })) + } + + let msgs_encoded = Encoder::encode(generate_recording().map(Ok)).unwrap(); + + let stream_footer_start = msgs_encoded + .len() + .checked_sub(re_log_encoding::StreamFooter::ENCODED_SIZE_BYTES) + .unwrap(); + let stream_footer = + re_log_encoding::StreamFooter::from_rrd_bytes(&msgs_encoded[stream_footer_start..]) + .unwrap(); + + let rrd_footer_range = stream_footer + .rrd_footer_byte_span_from_start_excluding_header + .try_cast::() + .unwrap() + .range(); + let rrd_footer_bytes = &msgs_encoded[rrd_footer_range]; + + { + let crc = re_log_encoding::StreamFooter::from_rrd_footer_bytes( + stream_footer + .rrd_footer_byte_span_from_start_excluding_header + .start, + rrd_footer_bytes, + ) + .crc_excluding_header; + similar_asserts::assert_eq!(stream_footer.crc_excluding_header, crc); + } + + let rrd_footer = + 
re_protos::log_msg::v1alpha1::RrdFooter::from_rrd_bytes(rrd_footer_bytes).unwrap(); + let _rrd_footer = rrd_footer.to_application(()).unwrap(); +} diff --git a/crates/store/re_protos/proto/rerun/v1alpha1/log_msg.proto b/crates/store/re_protos/proto/rerun/v1alpha1/log_msg.proto index 4201cbc5f846..4850f5058129 100644 --- a/crates/store/re_protos/proto/rerun/v1alpha1/log_msg.proto +++ b/crates/store/re_protos/proto/rerun/v1alpha1/log_msg.proto @@ -227,3 +227,19 @@ message StoreVersion { // See `CrateVersion` in `re_build_info`. int32 crate_version_bits = 1; } + +// This is the payload that is carried in messages of type `::End` in RRD streams. +// +// It keeps track of various useful information about the associated recording. +// +// During normal operations, there can only be a single `::End` message in an RRD stream, and +// therefore a single `RrdFooter`. +// It is possible to break that invariant by concatenating streams using external tools, +// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. +// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd merge > all_my_recordings.rrd`, +// would once again guarantee that only one `::End` message is present though. +// I.e. that invariant holds as long as one stays within our ecosystem of tools. +// +// This is a transport-level type, the associated application-level type can be found +// in `re_log_encoding::RrdFooter`. +message RrdFooter {} diff --git a/crates/store/re_protos/src/v1alpha1/rerun.log_msg.v1alpha1.rs b/crates/store/re_protos/src/v1alpha1/rerun.log_msg.v1alpha1.rs index eab236a7d738..2a44d4dc9038 100644 --- a/crates/store/re_protos/src/v1alpha1/rerun.log_msg.v1alpha1.rs +++ b/crates/store/re_protos/src/v1alpha1/rerun.log_msg.v1alpha1.rs @@ -269,6 +269,32 @@ impl ::prost::Name for StoreVersion { "/rerun.log_msg.v1alpha1.StoreVersion".into() } } +/// This is the payload that is carried in messages of type `::End` in RRD streams. 
+/// +/// It keeps track of various useful information about the associated recording. +/// +/// During normal operations, there can only be a single `::End` message in an RRD stream, and +/// therefore a single `RrdFooter`. +/// It is possible to break that invariant by concatenating streams using external tools, +/// e.g. by doing something like `cat *.rrd > all_my_recordings.rrd`. +/// Passing that stream back through Rerun tools, e.g. `cat *.rrd | rerun rrd merge > all_my_recordings.rrd`, +/// would once again guarantee that only one `::End` message is present though. +/// I.e. that invariant holds as long as one stays within our ecosystem of tools. +/// +/// This is a transport-level type, the associated application-level type can be found +/// in `re_log_encoding::RrdFooter`. +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct RrdFooter {} +impl ::prost::Name for RrdFooter { + const NAME: &'static str = "RrdFooter"; + const PACKAGE: &'static str = "rerun.log_msg.v1alpha1"; + fn full_name() -> ::prost::alloc::string::String { + "rerun.log_msg.v1alpha1.RrdFooter".into() + } + fn type_url() -> ::prost::alloc::string::String { + "/rerun.log_msg.v1alpha1.RrdFooter".into() + } +} /// The type of compression used on the payload. 
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] diff --git a/crates/store/re_server/src/store/layer.rs b/crates/store/re_server/src/store/layer.rs index 05d6fc063f85..24d954a49d78 100644 --- a/crates/store/re_server/src/store/layer.rs +++ b/crates/store/re_server/src/store/layer.rs @@ -1,8 +1,8 @@ +use std::collections::HashMap; + use arrow::array::RecordBatch; use arrow::datatypes::Schema; use arrow::error::ArrowError; -use sha2::Digest as _; -use std::collections::HashMap; use re_byte_size::SizeBytes as _; use re_chunk_store::ChunkStoreHandle; @@ -68,6 +68,7 @@ impl Layer { schema_ipc }; + use sha2::Digest as _; let mut hash = [0u8; 32]; let mut hasher = sha2::Sha256::new(); hasher.update(&partition_schema_ipc); diff --git a/rerun_py/rerun_sdk/rerun/recording_stream.py b/rerun_py/rerun_sdk/rerun/recording_stream.py index f726ff8a278a..a570b12dfbc3 100644 --- a/rerun_py/rerun_sdk/rerun/recording_stream.py +++ b/rerun_py/rerun_sdk/rerun/recording_stream.py @@ -486,6 +486,7 @@ def flush(self, *, timeout_sec: float = 1e38) -> None: """ bindings.flush(timeout_sec=timeout_sec, recording=self.to_native()) + # TODO(RR-3065): SDK should flush both IO and app-level logic when a recording gets GC'd def __del__(self) -> None: # type: ignore[no-untyped-def] recording = self.to_native() # TODO(jleibs): I'm 98% sure this flush is redundant, but removing it requires more thorough testing.