Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9071,6 +9071,7 @@ dependencies = [
"bytes",
"criterion",
"ehttp",
"itertools 0.14.0",
"js-sys",
"lz4_flex 0.12.0",
"mimalloc",
Expand All @@ -9093,6 +9094,7 @@ dependencies = [
"wasm-bindgen-futures",
"web-sys",
"web-time",
"xxhash-rust",
]

[[package]]
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,7 @@ wgpu = { version = "27.0.1", default-features = false, features = [
"fragile-send-sync-non-atomic-wasm",
] }
xshell = "0.2.7"
xxhash-rust = { version = "0.8", features = ["xxh32"] }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious: why this particular hash?


# ---------------------------------------------------------------------------------
[profile]
Expand Down
2 changes: 2 additions & 0 deletions crates/store/re_log_encoding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,12 @@ re_tracing.workspace = true

# External:
arrow = { workspace = true, features = ["ipc"] }
itertools.workspace = true
lz4_flex = { workspace = true }
parking_lot.workspace = true
thiserror.workspace = true
tracing.workspace = true
xxhash-rust.workspace = true

# Optional external dependencies:
bytes = { workspace = true, optional = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_log_encoding/src/rrd/decoder/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl<T: DecoderEntrypoint, R: std::io::BufRead> std::iter::Iterator for DecoderI

// …and the underlying decoder already considers that it's done (i.e. it's
// waiting for a whole new stream to begin): time to stop.
Ok(None) if self.decoder.state == DecoderState::StreamHeader => {
Ok(None) if self.decoder.state == DecoderState::WaitingForStreamHeader => {
return None;
}

Expand Down
251 changes: 190 additions & 61 deletions crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/store/re_log_encoding/src/rrd/decoder/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ impl<T: DecoderEntrypoint + Unpin, R: AsyncBufRead + Unpin> Stream for DecoderSt

// …and the underlying decoder already considers that it's done (i.e. it's
// waiting for a whole new stream to begin): time to stop.
Ok(None) if decoder.state == DecoderState::StreamHeader => {
Ok(None) if decoder.state == DecoderState::WaitingForStreamHeader => {
return std::task::Poll::Ready(None);
}

Expand Down
199 changes: 166 additions & 33 deletions crates/store/re_log_encoding/src/rrd/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ use std::borrow::Borrow;
use re_build_info::CrateVersion;
use re_chunk::{ChunkError, ChunkResult};
use re_log_types::LogMsg;
use re_sorbet::SorbetError;

use crate::ToTransport as _;
use crate::rrd::{
use crate::{
CodecError, Compression, Encodable as _, EncodingOptions, MessageHeader, MessageKind,
Serializer, StreamHeader,
Serializer, StreamFooter, StreamHeader, ToTransport as _,
};

// ----------------------------------------------------------------------------
Expand All @@ -31,6 +31,9 @@ pub enum EncodeError {

#[error("Chunk error: {0}")]
Chunk(Box<ChunkError>),

#[error("Sorbet error: {0}")]
Sorbet(Box<SorbetError>),
}

const _: () = assert!(
Expand All @@ -50,6 +53,12 @@ impl From<ChunkError> for EncodeError {
}
}

impl From<SorbetError> for EncodeError {
    /// Boxes the error so that `EncodeError` (and thus every `Result` carrying it)
    /// stays small — presumably enforced by the `const _: () = assert!(…)` size
    /// check right after the enum definition; TODO confirm.
    fn from(err: SorbetError) -> Self {
        Self::Sorbet(Box::new(err))
    }
}

// ----------------------------------------------------------------------------

/// Encode a stream of [`LogMsg`] into an `.rrd` file.
Expand All @@ -64,13 +73,63 @@ pub struct Encoder<W: std::io::Write> {
/// Optional so that we can `take()` it in `into_inner`, while still being allowed to implement `Drop`.
write: Option<W>,

/// So we don't ever successfully write partial messages.
/// How many bytes written out so far?
num_written: u64,

/// * So we don't ever successfully write partial messages.
/// * Because `prost` only supports buffers, not IO traits.
scratch: Vec<u8>,

/// Tracks whether the end-of-stream marker has been written out already.
/// Tracks the state required to build the RRD footer for this stream.
///
/// If set to `None`, the footer will not be computed.
///
/// Calling [`Self::append_transport`] will automatically disable footers.
footer_state: Option<FooterState>,

/// Tracks whether the end-of-stream marker, and optionally the associated footer, have been
/// written out already.
is_finished: bool,
}

/// The accumulated state used to build the footer when closing the [`Encoder`].
///
/// This is automatically updated when calling [`Encoder::append`].
///
/// When the encoder's `footer_state` is `None`, footer emission is disabled entirely.
#[derive(Default)]
struct FooterState {
    /// What is the currently active recording ID according to the state of the encoder, if any?
    ///
    /// Put another way: was there a `SetStoreInfo` message earlier in the stream? If so, we will
    /// want to override the recording ID of each chunk with that one (because that's the existing
    /// behavior, certainly not because it's nice).
    //
    // NOTE(review): starts out `None` (via `Default`) and is only ever set from
    // `SetStoreInfo` messages observed in `FooterState::append`.
    recording_id_scope: Option<re_log_types::StoreId>,
}

impl FooterState {
    /// Folds one outgoing message into the accumulated footer state.
    ///
    /// `_byte_offset` / `_byte_size` locate the message payload within the stream
    /// (message header excluded); they are currently unused.
    #[expect(clippy::unnecessary_wraps)] // won't stay for long
    fn append(
        &mut self,
        _byte_offset: u64,
        _byte_size: u64,
        msg: &re_log_types::LogMsg,
    ) -> Result<(), EncodeError> {
        match msg {
            LogMsg::SetStoreInfo(set_store_info) => {
                // A new recording scope begins: remember its store ID so that
                // subsequent chunks can be attributed to it.
                self.recording_id_scope = Some(set_store_info.info.store_id.clone());
            }

            // Deliberately exhaustive: adding a `LogMsg` variant should force a
            // decision here rather than be silently ignored.
            LogMsg::ArrowMsg(..) | LogMsg::BlueprintActivationCommand(..) => {}
        }

        Ok(())
    }

    /// Consumes the accumulated state and produces the final [`crate::RrdFooter`].
    #[expect(clippy::unnecessary_wraps, clippy::unused_self)] // won't stay for long
    fn finish(self) -> Result<crate::RrdFooter, EncodeError> {
        Ok(crate::RrdFooter {})
    }
}

impl Encoder<Vec<u8>> {
pub fn local() -> Result<Self, EncodeError> {
Self::new_eager(
Expand Down Expand Up @@ -126,7 +185,9 @@ impl<W: std::io::Write> Encoder<W> {
serializer: options.serializer,
compression: options.compression,
write: Some(write),
num_written: out.len() as u64,
scratch: Vec::new(),
footer_state: Some(FooterState::default()),
is_finished: false,
})
}
Expand All @@ -137,22 +198,54 @@ impl<W: std::io::Write> Encoder<W> {
return Err(EncodeError::AlreadyFinished);
}

if self.write.is_none() {
let Some(w) = self.write.as_mut() else {
return Err(EncodeError::AlreadyUnwrapped);
}
};

re_tracing::profile_function!();

let message = message.to_transport(self.compression)?;
// Safety: the compression settings of this message are consistent with this stream.
#[expect(unsafe_code)]
unsafe {
self.append_transport(&message)
let transport = message.to_transport(self.compression)?;

let byte_offset_excluding_header =
self.num_written + crate::MessageHeader::ENCODED_SIZE_BYTES as u64;

self.scratch.clear();
let n = match self.serializer {
Serializer::Protobuf => {
transport.to_rrd_bytes(&mut self.scratch)?;
let n = w
.write_all(&self.scratch)
.map(|_| self.scratch.len() as u64)
.map_err(EncodeError::Write)?;
self.num_written += n;
n
}
};

let byte_size_excluding_header = n - crate::MessageHeader::ENCODED_SIZE_BYTES as u64;

if let Some(footer_state) = self.footer_state.as_mut() {
footer_state.append(
byte_offset_excluding_header,
byte_size_excluding_header,
Comment on lines +229 to +230
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: could use re_span::Span for this if you like

message,
)?;
}

Ok(n)
}

    /// Instructs the encoder to _not_ emit a footer at the end of the stream.
    ///
    /// This cannot be reverted.
    //
    // Dropping the accumulated `FooterState` is what disables emission: `finish()`
    // only writes a footer when `footer_state` is `Some`, and returns early otherwise.
    pub fn do_not_emit_footer(&mut self) {
        self.footer_state = None;
    }

/// Returns the size in bytes of the encoded data.
///
/// ⚠️ This implies [`Self::do_not_emit_footer`]. ⚠️
///
/// ## Safety
///
/// `message` must respect the global settings of the encoder (e.g. the compression used),
Expand All @@ -166,19 +259,27 @@ impl<W: std::io::Write> Encoder<W> {
return Err(EncodeError::AlreadyFinished);
}

re_tracing::profile_function!();

// We cannot update the RRD manifest without decoding the message, which would defeat the
// entire purposes of using this method in the first place.
// Therefore, we disable footers if and when this method is used.
self.do_not_emit_footer();

let Some(w) = self.write.as_mut() else {
return Err(EncodeError::AlreadyUnwrapped);
};

re_tracing::profile_function!();

self.scratch.clear();
match self.serializer {
Serializer::Protobuf => {
message.to_rrd_bytes(&mut self.scratch)?;
w.write_all(&self.scratch)
.map(|_| self.scratch.len() as _)
.map_err(EncodeError::Write)
let n = w
.write_all(&self.scratch)
.map(|_| self.scratch.len() as u64)
.map_err(EncodeError::Write)?;
self.num_written += n;
Ok(n)
}
}
}
Expand All @@ -200,22 +301,52 @@ impl<W: std::io::Write> Encoder<W> {
return Err(EncodeError::AlreadyUnwrapped);
};

match self.serializer {
Serializer::Protobuf => {
// TODO(cmc): the extra heap-alloc and copy could be easily avoided with the
// introduction of an InMemoryWriter trait or similar. In practice it makes no
// difference and the cognitive overhead of this crate is already through the roof.
let mut header = Vec::new();
MessageHeader {
kind: MessageKind::End,
len: 0,
}
.to_rrd_bytes(&mut header)?;
w.write_all(&header)?;
}
self.is_finished = true;

let Some(footer_state) = self.footer_state.take() else {
return Ok(());
};

// TODO(cmc): the extra heap-allocs and copies could be easily avoided with the
// introduction of an InMemoryWriter trait or similar. In practice it makes no
// difference and the cognitive overhead of this crate is already through the roof.

use re_protos::external::prost::Message as _;

// Message Header (::End)

let rrd_footer = footer_state.finish()?;
let rrd_footer = rrd_footer.to_transport(())?;

let mut out_header = Vec::new();
MessageHeader {
kind: MessageKind::End,
len: rrd_footer.encoded_len() as u64,
}
.to_rrd_bytes(&mut out_header)?;
w.write_all(&out_header).map_err(EncodeError::Write)?;
self.num_written += out_header.len() as u64;

self.is_finished = true;
let end_msg_byte_offset_from_start_excluding_header = self.num_written;

// Message payload (re_protos::RrdFooter)

let mut out_rrd_footer = Vec::new();
rrd_footer.to_rrd_bytes(&mut out_rrd_footer)?;
w.write_all(&out_rrd_footer).map_err(EncodeError::Write)?;
self.num_written += out_rrd_footer.len() as u64;

// StreamFooter

let mut out_stream_footer = Vec::new();
StreamFooter::from_rrd_footer_bytes(
end_msg_byte_offset_from_start_excluding_header,
&out_rrd_footer,
)
.to_rrd_bytes(&mut out_stream_footer)?;
w.write_all(&out_stream_footer)
.map_err(EncodeError::Write)?;
self.num_written += out_stream_footer.len() as u64;

Ok(())
}
Expand Down Expand Up @@ -255,8 +386,6 @@ impl<W: std::io::Write> Encoder<W> {
}
}

// TODO(cmc): It seems a bit suspicious to me that we send an EOS marker on drop, but don't flush.
// But I don't want to change any flushing behavior at the moment, so I'll keep it that way for now.
impl<W: std::io::Write> std::ops::Drop for Encoder<W> {
fn drop(&mut self) {
if self.write.is_none() {
Expand All @@ -267,5 +396,9 @@ impl<W: std::io::Write> std::ops::Drop for Encoder<W> {
if let Err(err) = self.finish() {
re_log::warn!("encoder couldn't be finished: {err}");
}

if let Err(err) = self.flush_blocking() {
re_log::warn!("encoder couldn't be flushed: {err}");
}
}
}
Loading
Loading