2 changes: 2 additions & 0 deletions Cargo.lock
@@ -9070,6 +9070,7 @@ dependencies = [
"bytes",
"criterion",
"ehttp",
"itertools 0.14.0",
"js-sys",
"lz4_flex 0.12.0",
"mimalloc",
@@ -9092,6 +9093,7 @@ dependencies = [
"wasm-bindgen-futures",
"web-sys",
"web-time",
"xxhash-rust",
]

[[package]]
1 change: 1 addition & 0 deletions Cargo.toml
@@ -427,6 +427,7 @@ wgpu = { version = "27.0.1", default-features = false, features = [
"fragile-send-sync-non-atomic-wasm",
] }
xshell = "0.2.7"
xxhash-rust = { version = "0.8", features = ["xxh32"] }
Review comment (Member): curious: why this particular hash?
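A minimal sketch of what the new dependency could be used for, assuming the `xxh32` feature is there to checksum the encoded footer bytes so readers can cheaply validate the tail of an `.rrd` stream. The actual call site is in the state-machine/footer code whose diff is not rendered on this page, so the helper and seed below are illustrative only.

```rust
// Illustrative sketch — not the actual call site from this PR.
use xxhash_rust::xxh32::xxh32;

/// Hypothetical helper: checksum the serialized `RrdFooter` bytes.
fn footer_checksum(encoded_footer: &[u8]) -> u32 {
    // Seed 0 is an arbitrary choice for this sketch.
    xxh32(encoded_footer, 0)
}
```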


# ---------------------------------------------------------------------------------
[profile]
2 changes: 2 additions & 0 deletions crates/store/re_log_encoding/Cargo.toml
@@ -53,10 +53,12 @@ re_tracing.workspace = true

# External:
arrow = { workspace = true, features = ["ipc"] }
itertools.workspace = true
lz4_flex = { workspace = true }
parking_lot.workspace = true
thiserror.workspace = true
tracing.workspace = true
xxhash-rust.workspace = true

# Optional external dependencies:
bytes = { workspace = true, optional = true }
2 changes: 1 addition & 1 deletion crates/store/re_log_encoding/src/rrd/decoder/iterator.rs
@@ -139,7 +139,7 @@ impl<T: DecoderEntrypoint, R: std::io::BufRead> std::iter::Iterator for DecoderI

// …and the underlying decoder already considers that it's done (i.e. it's
// waiting for a whole new stream to begin): time to stop.
Ok(None) if self.decoder.state == DecoderState::StreamHeader => {
Ok(None) if self.decoder.state == DecoderState::WaitingForStreamHeader => {
return None;
}

250 changes: 188 additions & 62 deletions crates/store/re_log_encoding/src/rrd/decoder/state_machine.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion crates/store/re_log_encoding/src/rrd/decoder/stream.rs
@@ -151,7 +151,7 @@ impl<T: DecoderEntrypoint + Unpin, R: AsyncBufRead + Unpin> Stream for DecoderSt

// …and the underlying decoder already considers that it's done (i.e. it's
// waiting for a whole new stream to begin): time to stop.
Ok(None) if decoder.state == DecoderState::StreamHeader => {
Ok(None) if decoder.state == DecoderState::WaitingForStreamHeader => {
return std::task::Poll::Ready(None);
}

199 changes: 166 additions & 33 deletions crates/store/re_log_encoding/src/rrd/encoder.rs
@@ -5,11 +5,11 @@ use std::borrow::Borrow;
use re_build_info::CrateVersion;
use re_chunk::{ChunkError, ChunkResult};
use re_log_types::LogMsg;
use re_sorbet::SorbetError;

use crate::ToTransport as _;
use crate::rrd::{
use crate::{
CodecError, Compression, Encodable as _, EncodingOptions, MessageHeader, MessageKind,
Serializer, StreamHeader,
Serializer, StreamFooter, StreamHeader, ToTransport as _,
};

// ----------------------------------------------------------------------------
@@ -31,6 +31,9 @@ pub enum EncodeError {

#[error("Chunk error: {0}")]
Chunk(Box<ChunkError>),

#[error("Sorbet error: {0}")]
Sorbet(Box<SorbetError>),
}

const _: () = assert!(
@@ -50,6 +53,12 @@ impl From<ChunkError> for EncodeError {
}
}

impl From<SorbetError> for EncodeError {
fn from(err: SorbetError) -> Self {
Self::Sorbet(Box::new(err))
}
}

// ----------------------------------------------------------------------------

/// Encode a stream of [`LogMsg`] into an `.rrd` file.
@@ -64,13 +73,63 @@ pub struct Encoder<W: std::io::Write> {
/// Optional so that we can `take()` it in `into_inner`, while still being allowed to implement `Drop`.
write: Option<W>,

/// So we don't ever successfully write partial messages.
/// How many bytes have been written out so far?
num_written: u64,

/// * So we don't ever successfully write partial messages.
/// * Because `prost` only supports buffers, not IO traits.
scratch: Vec<u8>,

/// Tracks whether the end-of-stream marker has been written out already.
/// Tracks the state required to build the RRD manifest for this stream.
///
/// If set to `None`, the footer will not be computed.
///
/// Calling [`Self::append_transport`] will automatically disable footers.
footer_state: Option<FooterState>,

/// Tracks whether the end-of-stream marker, and optionally the associated footer, have been
/// written out already.
is_finished: bool,
}

/// The accumulated state used to build the footer when closing the [`Encoder`].
///
/// This is automatically updated when calling [`Encoder::append`].
#[derive(Default)]
struct FooterState {
/// What is the currently active recording ID according to the state of the encoder, if any?
///
/// Put another way: was there a `SetStoreInfo` message earlier in the stream? If so, we will
/// want to override the recording ID of each chunk with that one (because that's the existing
/// behavior, certainly not because it's nice).
recording_id_scope: Option<re_log_types::StoreId>,
}

impl FooterState {
#[expect(clippy::unnecessary_wraps)] // won't stay for long
fn append(
&mut self,
_byte_offset: u64,
_byte_size: u64,
msg: &re_log_types::LogMsg,
) -> Result<(), EncodeError> {
match msg {
LogMsg::SetStoreInfo(msg) => {
self.recording_id_scope = Some(msg.info.store_id.clone());
}

LogMsg::ArrowMsg(_, _) | LogMsg::BlueprintActivationCommand(_) => {}
}

Ok(())
}

#[expect(clippy::unnecessary_wraps, clippy::unused_self)] // won't stay for long
fn finish(self) -> Result<crate::RrdFooter, EncodeError> {
Ok(crate::RrdFooter {})
}
}

impl Encoder<Vec<u8>> {
pub fn local() -> Result<Self, EncodeError> {
Self::new_eager(
@@ -126,7 +185,9 @@ impl<W: std::io::Write> Encoder<W> {
serializer: options.serializer,
compression: options.compression,
write: Some(write),
num_written: out.len() as u64,
scratch: Vec::new(),
footer_state: Some(FooterState::default()),
is_finished: false,
})
}
@@ -137,22 +198,54 @@ impl<W: std::io::Write> Encoder<W> {
return Err(EncodeError::AlreadyFinished);
}

if self.write.is_none() {
let Some(w) = self.write.as_mut() else {
return Err(EncodeError::AlreadyUnwrapped);
}
};

re_tracing::profile_function!();

let message = message.to_transport(self.compression)?;
// Safety: the compression settings of this message are consistent with this stream.
#[expect(unsafe_code)]
unsafe {
self.append_transport(&message)
let transport = message.to_transport(self.compression)?;

let byte_offset_excluding_header =
self.num_written + crate::MessageHeader::ENCODED_SIZE_BYTES as u64;

self.scratch.clear();
let n = match self.serializer {
Serializer::Protobuf => {
transport.to_rrd_bytes(&mut self.scratch)?;
let n = w
.write_all(&self.scratch)
.map(|_| self.scratch.len() as u64)
.map_err(EncodeError::Write)?;
self.num_written += n;
n
}
};

let byte_size_excluding_header = n - crate::MessageHeader::ENCODED_SIZE_BYTES as u64;

if let Some(footer_state) = self.footer_state.as_mut() {
footer_state.append(
byte_offset_excluding_header,
byte_size_excluding_header,
Review comment (Member) on lines +229 to +230: Nit: could use re_span::Span for this if you like

message,
)?;
}

Ok(n)
}
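A self-contained illustration of the offset/size bookkeeping in `append` above. The 16-byte header size and the other numbers are made up for the example; the real constant is `MessageHeader::ENCODED_SIZE_BYTES`.

```rust
// Hypothetical numbers, purely to illustrate the bookkeeping in `append`.
fn main() {
    let header_size = 16u64; // stand-in for MessageHeader::ENCODED_SIZE_BYTES
    let num_written_before = 64u64; // bytes already written when `append` starts
    let bytes_written_by_append = 100u64; // header + payload pushed by write_all

    // The payload starts right after its fixed-size header…
    let byte_offset_excluding_header = num_written_before + header_size;
    // …and its size is the total write minus that header.
    let byte_size_excluding_header = bytes_written_by_append - header_size;

    assert_eq!(byte_offset_excluding_header, 80);
    assert_eq!(byte_size_excluding_header, 84);
}
```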

/// Instructs the encoder to _not_ emit a footer at the end of the stream.
///
/// This cannot be reverted.
pub fn do_not_emit_footer(&mut self) {
self.footer_state = None;
}

/// Returns the size in bytes of the encoded data.
///
/// ⚠️ This implies [`Self::do_not_emit_footer`]. ⚠️
///
/// ## Safety
///
/// `message` must respect the global settings of the encoder (e.g. the compression used),
@@ -166,19 +259,27 @@ impl<W: std::io::Write> Encoder<W> {
return Err(EncodeError::AlreadyFinished);
}

re_tracing::profile_function!();

// We cannot update the RRD manifest without decoding the message, which would defeat the
// entire purpose of using this method in the first place.
// Therefore, we disable footers if and when this method is used.
self.do_not_emit_footer();

let Some(w) = self.write.as_mut() else {
return Err(EncodeError::AlreadyUnwrapped);
};

re_tracing::profile_function!();

self.scratch.clear();
match self.serializer {
Serializer::Protobuf => {
message.to_rrd_bytes(&mut self.scratch)?;
w.write_all(&self.scratch)
.map(|_| self.scratch.len() as _)
.map_err(EncodeError::Write)
let n = w
.write_all(&self.scratch)
.map(|_| self.scratch.len() as u64)
.map_err(EncodeError::Write)?;
self.num_written += n;
Ok(n)
}
}
}
@@ -200,22 +301,52 @@ impl<W: std::io::Write> Encoder<W> {
return Err(EncodeError::AlreadyUnwrapped);
};

match self.serializer {
Serializer::Protobuf => {
// TODO(cmc): the extra heap-alloc and copy could be easily avoided with the
// introduction of an InMemoryWriter trait or similar. In practice it makes no
// difference and the cognitive overhead of this crate is already through the roof.
let mut header = Vec::new();
MessageHeader {
kind: MessageKind::End,
len: 0,
}
.to_rrd_bytes(&mut header)?;
w.write_all(&header)?;
}
self.is_finished = true;

let Some(footer_state) = self.footer_state.take() else {
return Ok(());
};

// TODO(cmc): the extra heap-allocs and copies could be easily avoided with the
// introduction of an InMemoryWriter trait or similar. In practice it makes no
// difference and the cognitive overhead of this crate is already through the roof.

use re_protos::external::prost::Message as _;

// Message Header (::End)

let rrd_footer = footer_state.finish()?;
let rrd_footer = rrd_footer.to_transport(())?;

let mut out_header = Vec::new();
MessageHeader {
kind: MessageKind::End,
len: rrd_footer.encoded_len() as u64,
}
.to_rrd_bytes(&mut out_header)?;
w.write_all(&out_header).map_err(EncodeError::Write)?;
self.num_written += out_header.len() as u64;

self.is_finished = true;
let end_msg_byte_offset_from_start_excluding_header = self.num_written;

// Message payload (re_protos::RrdFooter)

let mut out_rrd_footer = Vec::new();
rrd_footer.to_rrd_bytes(&mut out_rrd_footer)?;
w.write_all(&out_rrd_footer).map_err(EncodeError::Write)?;
self.num_written += out_rrd_footer.len() as u64;

// StreamFooter

let mut out_stream_footer = Vec::new();
StreamFooter::from_rrd_footer_bytes(
end_msg_byte_offset_from_start_excluding_header,
&out_rrd_footer,
)
.to_rrd_bytes(&mut out_stream_footer)?;
w.write_all(&out_stream_footer)
.map_err(EncodeError::Write)?;
self.num_written += out_stream_footer.len() as u64;

Ok(())
}
@@ -255,8 +386,6 @@ impl<W: std::io::Write> Encoder<W> {
}
}

// TODO(cmc): It seems a bit suspicious to me that we send an EOS marker on drop, but don't flush.
// But I don't want to change any flushing behavior at the moment, so I'll keep it that way for now.
impl<W: std::io::Write> std::ops::Drop for Encoder<W> {
fn drop(&mut self) {
if self.write.is_none() {
Expand All @@ -267,5 +396,9 @@ impl<W: std::io::Write> std::ops::Drop for Encoder<W> {
if let Err(err) = self.finish() {
re_log::warn!("encoder couldn't be finished: {err}");
}

if let Err(err) = self.flush_blocking() {
re_log::warn!("encoder couldn't be flushed: {err}");
}
}
}
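To tie the pieces together, a hedged usage sketch of the footer flow. The import path and the exact signatures of `append`, `finish`, and `into_inner` are assumptions (the rendered hunks don't show them), but the sequence mirrors this diff: `append` feeds both the sink and the `FooterState`, and finishing the stream writes the `End` header, the `RrdFooter` payload, and the trailing `StreamFooter`.

```rust
// Hedged sketch only — the path and signatures below are assumptions.
use re_log_encoding::rrd::Encoder;
use re_log_types::LogMsg;

fn write_rrd(messages: &[LogMsg]) -> Result<(), Box<dyn std::error::Error>> {
    // In-memory encoder; footer emission is enabled by default.
    let mut encoder = Encoder::local()?;

    for msg in messages {
        // Updates `num_written` and the footer bookkeeping for each message.
        encoder.append(msg)?;
    }

    // Opting out is possible (and implied by `append_transport`):
    // encoder.do_not_emit_footer();

    // Writes the End header, the RrdFooter payload, and the StreamFooter.
    encoder.finish()?;

    // (The encoded bytes could then be recovered via `into_inner`.)
    Ok(())
}
```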