diff --git a/Cargo.lock b/Cargo.lock index 749e1b727729..e371ae4b1eb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1753,7 +1753,7 @@ checksum = "fe6d2e5af09e8c8ad56c969f2157a3d4238cebc7c55f0a517728c38f7b200f81" dependencies = [ "serde", "termcolor", - "unicode-width 0.1.14", + "unicode-width 0.2.1", ] [[package]] @@ -3026,7 +3026,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "330c60081dcc4c72131f8eb70510f1ac07223e5d4163db481a04a0befcffa412" dependencies = [ - "libloading 0.7.4", + "libloading 0.8.8", ] [[package]] @@ -4959,6 +4959,17 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" +[[package]] +name = "lenses" +version = "0.26.0-alpha.1+dev" +dependencies = [ + "anyhow", + "arrow", + "clap", + "insta", + "rerun", +] + [[package]] name = "lexical-core" version = "1.0.5" diff --git a/crates/store/re_chunk/src/chunk.rs b/crates/store/re_chunk/src/chunk.rs index 8d664059021c..ab7c0da49398 100644 --- a/crates/store/re_chunk/src/chunk.rs +++ b/crates/store/re_chunk/src/chunk.rs @@ -1247,7 +1247,7 @@ impl std::fmt::Display for Chunk { re_log::error_once!("couldn't display Chunk: {err}"); std::fmt::Error })?; - re_format_arrow::format_record_batch_with_width(&batch, f.width()).fmt(f) + re_format_arrow::format_record_batch_with_width(&batch, f.width(), f.sign_minus()).fmt(f) } } diff --git a/crates/store/re_chunk/tests/formatting.rs b/crates/store/re_chunk/tests/formatting.rs index 79bd43c56b0f..04890fcf1205 100644 --- a/crates/store/re_chunk/tests/formatting.rs +++ b/crates/store/re_chunk/tests/formatting.rs @@ -55,40 +55,11 @@ fn format_chunk() -> anyhow::Result<()> { Ok(()) } -/// Wrapper struct to help with `insta` snapshot tests. -struct ChunkRedacted(Chunk); - -impl std::fmt::Display for ChunkRedacted { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let batch = self.0.to_record_batch().map_err(|err| { - re_log::error_once!("couldn't display Chunk: {err}"); - std::fmt::Error - })?; - re_format_arrow::format_record_batch_opts( - &batch, - &re_format_arrow::RecordBatchFormatOpts { - transposed: false, - width: f.width(), - include_metadata: true, - include_column_metadata: true, - trim_field_names: false, - trim_metadata_keys: false, - trim_metadata_values: false, - redact_non_deterministic: true, - }, - ) - .fmt(f) - } -} - #[test] fn format_chunk_redacted() -> anyhow::Result<()> { let chunk = create_chunk()?; - insta::assert_snapshot!( - "format_chunk_redacted", - format!("{:240}", ChunkRedacted(chunk)) - ); + insta::assert_snapshot!("format_chunk_redacted", format!("{:-240}", chunk)); Ok(()) } diff --git a/crates/store/re_chunk/tests/snapshots/formatting__format_chunk_redacted.snap b/crates/store/re_chunk/tests/snapshots/formatting__format_chunk_redacted.snap index 83a0081047a1..309824048fd6 100644 --- a/crates/store/re_chunk/tests/snapshots/formatting__format_chunk_redacted.snap +++ b/crates/store/re_chunk/tests/snapshots/formatting__format_chunk_redacted.snap @@ -1,24 +1,24 @@ --- source: crates/store/re_chunk/tests/formatting.rs -expression: "format!(\"{:240}\", ChunkRedacted(chunk))" +expression: "format!(\"{:-240}\", chunk)" --- -┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ -│ METADATA: │ -│ * rerun:entity_path: /this/that │ -│ * rerun:heap_size_bytes: [**REDACTED**] │ -│ * rerun:id: [**REDACTED**] │ -│ * sorbet:version: [**REDACTED**] │ -├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ -│ ┌──────────────────────────────────────────────┬────────────────────────────┬───────────────────────────────┬───────────────────────────────────────┬──────────────────────────────────────────┐ │ -│ │ rerun.controls.RowId ┆ frame_nr ┆ log_time ┆ my_index ┆ example.MyPoints:colors │ │ -│ │ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ -│ │ type: FixedSizeBinary[16] ┆ type: i64 ┆ type: Timestamp(ns) ┆ type: List[nullable u64] ┆ type: List[nullable u32] │ │ -│ │ ARROW:extension:metadata: ┆ rerun:index_name: frame_nr ┆ rerun:index_name: log_time ┆ rerun:component: my_index ┆ rerun:archetype: example.MyPoints │ │ -│ │ {"namespace":"row"} ┆ rerun:is_sorted: true ┆ rerun:is_sorted: true ┆ rerun:component_type: example.MyIndex ┆ rerun:component: example.MyPoints:colors │ │ -│ │ ARROW:extension:name: rerun.datatypes.TUID ┆ rerun:kind: index ┆ rerun:kind: index ┆ rerun:kind: data ┆ rerun:component_type: example.MyColor │ │ -│ │ rerun:is_sorted: true ┆ ┆ ┆ ┆ rerun:kind: data │ │ -│ │ rerun:kind: control ┆ ┆ ┆ ┆ │ │ -│ ╞══════════════════════════════════════════════╪════════════════════════════╪═══════════════════════════════╪═══════════════════════════════════════╪══════════════════════════════════════════╡ │ -│ │ row_[**REDACTED**] ┆ 1 ┆ 2025-01-10T18:43:42.123456789 ┆ [0, 1, 2] ┆ [0, 1, 2] │ │ -│ └──────────────────────────────────────────────┴────────────────────────────┴───────────────────────────────┴───────────────────────────────────────┴──────────────────────────────────────────┘ │ -└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ +┌────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ +│ METADATA: │ +│ * entity_path: /this/that │ +│ * heap_size_bytes: [**REDACTED**] │ +│ * id: [**REDACTED**] │ +│ * version: [**REDACTED**] │ +├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ ┌──────────────────────────────────────────────┬──────────────────────┬───────────────────────────────┬─────────────────────────────────┬────────────────────────────────────┐ │ +│ │ RowId ┆ frame_nr ┆ log_time ┆ my_index ┆ example.MyPoints:colors │ │ +│ │ --- ┆ --- ┆ --- ┆ --- ┆ --- │ │ +│ │ type: FixedSizeBinary[16] ┆ type: i64 ┆ type: Timestamp(ns) ┆ type: List[nullable u64] ┆ type: List[nullable u32] │ │ +│ │ ARROW:extension:metadata: ┆ index_name: frame_nr ┆ index_name: log_time ┆ component: my_index ┆ archetype: example.MyPoints │ │ +│ │ {"namespace":"row"} ┆ is_sorted: true ┆ is_sorted: true ┆ component_type: example.MyIndex ┆ component: example.MyPoints:colors │ │ +│ │ ARROW:extension:name: TUID ┆ kind: index ┆ kind: index ┆ kind: data ┆ component_type: example.MyColor │ │ +│ │ is_sorted: true ┆ ┆ ┆ ┆ kind: data │ │ +│ │ kind: control ┆ ┆ ┆ ┆ │ │ +│ ╞══════════════════════════════════════════════╪══════════════════════╪═══════════════════════════════╪═════════════════════════════════╪════════════════════════════════════╡ │ +│ │ row_[**REDACTED**] ┆ 1 ┆ 2025-01-10T18:43:42.123456789 ┆ [0, 1, 2] ┆ [0, 1, 2] │ │ +│ └──────────────────────────────────────────────┴──────────────────────┴───────────────────────────────┴─────────────────────────────────┴────────────────────────────────────┘ │ +└────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ diff --git a/crates/store/re_dataframe/src/query.rs b/crates/store/re_dataframe/src/query.rs index 964ce6ee80f8..8b6af6208331 100644 --- a/crates/store/re_dataframe/src/query.rs +++ b/crates/store/re_dataframe/src/query.rs @@ -1368,7 +1368,8 @@ mod tests { #[inline] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let width = 200; - re_format_arrow::format_record_batch_with_width(&self.0, Some(width)).fmt(f) + re_format_arrow::format_record_batch_with_width(&self.0, Some(width), f.sign_minus()) + .fmt(f) } } diff --git a/crates/store/re_format_arrow/src/lib.rs b/crates/store/re_format_arrow/src/lib.rs index b3b49b740600..7633cd8f53c8 100644 --- a/crates/store/re_format_arrow/src/lib.rs +++ b/crates/store/re_format_arrow/src/lib.rs @@ -200,7 +200,7 @@ impl Default for RecordBatchFormatOpts { /// Nicely format this record batch in a way that fits the terminal. pub fn format_record_batch(batch: &arrow::array::RecordBatch) -> Table { - format_record_batch_with_width(batch, None) + format_record_batch_with_width(batch, None, false) } /// Nicely format this record batch using the specified options. @@ -223,6 +223,7 @@ pub fn format_record_batch_opts( pub fn format_record_batch_with_width( batch: &arrow::array::RecordBatch, width: Option, + redact_non_deterministic: bool, ) -> Table { format_dataframe_with_metadata( &batch.schema_ref().metadata.clone().into_iter().collect(), // HashMap -> BTreeMap @@ -236,7 +237,7 @@ pub fn format_record_batch_with_width( trim_field_names: true, trim_metadata_keys: true, trim_metadata_values: true, - redact_non_deterministic: false, + redact_non_deterministic, }, ) } diff --git a/crates/store/re_sorbet/src/chunk_batch.rs b/crates/store/re_sorbet/src/chunk_batch.rs index b394485928b9..d653b52debde 100644 --- a/crates/store/re_sorbet/src/chunk_batch.rs +++ b/crates/store/re_sorbet/src/chunk_batch.rs @@ -114,7 +114,7 @@ impl ChunkBatch { impl std::fmt::Display for ChunkBatch { #[inline] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - re_format_arrow::format_record_batch_with_width(self, f.width()).fmt(f) + re_format_arrow::format_record_batch_with_width(self, f.width(), f.sign_minus()).fmt(f) } } diff --git a/crates/store/re_sorbet/src/sorbet_batch.rs b/crates/store/re_sorbet/src/sorbet_batch.rs index ca0e976b2c92..977cb27d8841 100644 --- a/crates/store/re_sorbet/src/sorbet_batch.rs +++ b/crates/store/re_sorbet/src/sorbet_batch.rs @@ -129,7 +129,7 @@ impl SorbetBatch { impl std::fmt::Display for SorbetBatch { #[inline] fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - re_format_arrow::format_record_batch_with_width(self, f.width()).fmt(f) + re_format_arrow::format_record_batch_with_width(self, f.width(), f.sign_minus()).fmt(f) } } diff --git a/crates/top/re_sdk/src/lenses.rs b/crates/top/re_sdk/src/lenses.rs new file mode 100644 index 000000000000..9cac80503f62 --- /dev/null +++ b/crates/top/re_sdk/src/lenses.rs @@ -0,0 +1,631 @@ +use re_chunk::{ + Chunk, ChunkComponents, ChunkId, ComponentIdentifier, EntityPath, + external::arrow::array::ListArray, +}; +use re_log_types::{EntityPathFilter, LogMsg, ResolvedEntityPathFilter}; +use re_types::SerializedComponentColumn; + +use crate::sink::LogSink; + +/// A sink which can transform a `LogMsg` and forward the result to an underlying backing `LogSink`. +/// +/// The sink will only forward components that are matched by a lens specified via [`Self::with_lens`]. +pub struct LensesSink { + sink: S, + registry: LensRegistry, +} + +impl LensesSink { + /// Create a new sink with the given lenses. + pub fn new(sink: S) -> Self { + Self { + sink, + registry: Default::default(), + } + } + + /// Adds a [`Lens`] to this sink. + pub fn with_lens(mut self, lens: Lens) -> Self { + self.registry.lenses.push(lens); + self + } +} + +impl LogSink for LensesSink { + fn send(&self, msg: re_log_types::LogMsg) { + match &msg { + LogMsg::SetStoreInfo(_) | LogMsg::BlueprintActivationCommand(_) => { + self.sink.send(msg); + } + LogMsg::ArrowMsg(store_id, arrow_msg) => match Chunk::from_arrow_msg(arrow_msg) { + Ok(chunk) => { + let new_chunks = self.registry.apply(&chunk); + // TODO(grtlr): Should we use `self.sink.send_all` here? + for new_chunk in new_chunks { + match new_chunk.to_arrow_msg() { + Ok(arrow_msg) => { + self.sink + .send(LogMsg::ArrowMsg(store_id.clone(), arrow_msg)); + } + Err(err) => { + re_log::error_once!( + "failed to create log message from chunk: {err}" + ); + } + } + } + } + + Err(err) => { + re_log::error_once!("Failed to convert arrow message to chunk: {err}"); + self.sink.send(msg); + } + }, + } + } + + fn flush_blocking( + &self, + timeout: std::time::Duration, + ) -> Result<(), crate::sink::SinkFlushError> { + self.sink.flush_blocking(timeout) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } +} + +/// A transformed column result from applying a lens operation. +/// +/// Contains the output of a lens transformation, including the new entity path, +/// the serialized component data, and whether the data should be treated as static. +pub struct TransformedColumn { + /// The entity path where this transformed column should be logged. + pub entity_path: EntityPath, + /// The serialized component column containing the transformed data. + pub column: SerializedComponentColumn, + /// Whether this column represents static data. + pub is_static: bool, +} + +impl TransformedColumn { + /// Creates a new transformed column. + pub fn new(entity_path: EntityPath, column: SerializedComponentColumn) -> Self { + Self { + entity_path, + column, + is_static: false, + } + } + + /// Creates a new static transformed column. + pub fn new_static(entity_path: EntityPath, column: SerializedComponentColumn) -> Self { + Self { + entity_path, + column, + is_static: true, + } + } +} + +type LensFunc = Box Vec + Send + Sync>; + +/// A lens that transforms component data from one form to another. +/// +/// Lenses allow you to extract, transform, and restructure component data +/// as it flows through the logging pipeline. They are applied to chunks +/// that match the specified entity path filter and contain the target component. +pub struct Lens { + /// The entity path to apply the transformation to. + pub filter: ResolvedEntityPathFilter, + + /// The component that we want to select. + pub component: ComponentIdentifier, + + /// A closure that outputs a list of chunks + pub func: LensFunc, +} + +#[derive(Default)] +struct LensRegistry { + lenses: Vec, +} + +impl LensRegistry { + fn relevant(&self, chunk: &Chunk) -> impl Iterator { + self.lenses + .iter() + .filter(|transform| transform.filter.matches(chunk.entity_path())) + } + + /// Applies all relevant lenses to a chunk and returns the transformed chunks. + /// + /// This will only transform component columns that match registered lenses. + /// Other component columns are dropped. To retain original data, use identity + /// lenses or multi-sink configurations. + pub fn apply(&self, chunk: &Chunk) -> Vec { + self.relevant(chunk) + .flat_map(|transform| transform.apply(chunk)) + .collect() + } +} + +impl Lens { + /// Creates a new lens with the specified filter, component, and transformation function. + /// + /// # Arguments + /// * `entity_path_filter` - Filter to match entity paths this lens should apply to + /// * `component` - The component identifier to transform + /// * `func` - Transformation function that takes a ListArray and EntityPath and returns transformed columns + pub fn new( + entity_path_filter: EntityPathFilter, + component: impl Into, + func: F, + ) -> Self + where + F: Fn(ListArray, &EntityPath) -> Vec + Send + Sync + 'static, + { + Self { + filter: entity_path_filter.resolve_without_substitutions(), + component: component.into(), + func: Box::new(func), + } + } + + fn apply(&self, chunk: &Chunk) -> Vec { + let found = chunk + .components() + .iter() + .find(|(descr, _array)| descr.component == self.component); + + // TODO: This means we drop chunks that belong to the same entity but don't have the component. + let Some((_component_descr, list_array)) = found else { + return Default::default(); + }; + + // TODO: + // * unwrap array + // * Guarantee that there is only one component descr + let mut builders = ahash::HashMap::default(); + let results = (self.func)(list_array.clone(), chunk.entity_path()); + for transformed in results { + let components = builders + .entry((transformed.entity_path, transformed.is_static)) + .or_insert_with(ChunkComponents::default); + + if components.contains_component(&transformed.column.descriptor) { + re_log::warn_once!( + "Replacing duplicated component {}", + transformed.column.descriptor.component + ); + } + + components.insert(transformed.column.descriptor, transformed.column.list_array); + } + + builders + .into_iter() + .filter_map(|((entity_path, is_static), components)| { + let timelines = if is_static { + Default::default() + } else { + chunk.timelines().clone() + }; + + // TODO: In case of static, should we use sparse rows instead? + Chunk::from_auto_row_ids(ChunkId::new(), entity_path.clone(), timelines, components) + .inspect_err(|err| { + re_log::error_once!( + "Failed to build chunk at entity path '{entity_path}': {err}" + ); + }) + .ok() + }) + .collect() + } +} + +/// Provides commonly used transformations of Arrow arrays. +/// +/// # Experimental +/// +/// This is an experimental API and may change in future releases. +pub mod op { + + // TODO(grtlr): Make this into proper objects, with APIs similar to Datafusion's UDFs. + + use std::sync::Arc; + + use re_chunk::external::arrow::{ + array::{ListArray, StructArray}, + compute, + datatypes::{DataType, Field}, + }; + + /// Extracts a specific field from a struct component within a ListArray. + /// + /// Takes a ListArray containing StructArrays and extracts the specified field, + /// returning a new ListArray containing only that field's data. + /// Returns an empty ListArray if the extraction fails. + pub fn extract_field(list_array: ListArray, column_name: &str) -> ListArray { + let (field, offsets, values, nulls) = list_array.into_parts(); + let struct_array = match values.as_any().downcast_ref::() { + Some(array) => array, + None => { + re_log::error_once!("Expected StructArray in ListArray, but found different type"); + return ListArray::new_null(field, offsets.len() - 1); + } + }; + let column = match struct_array.column_by_name(column_name) { + Some(col) => col, + None => { + re_log::error_once!("Field '{}' not found in struct", column_name); + return ListArray::new_null(field, offsets.len() - 1); + } + }; + ListArray::new( + Arc::new(Field::new_list_field(column.data_type().clone(), true)), + offsets, + column.clone(), + nulls, + ) + } + + /// Casts the inner array of a ListArray to a different data type. + /// + /// Performs type casting on the component data within the ListArray, + /// preserving the list structure while changing the inner data type. + /// Returns an empty ListArray if the cast fails. + pub fn cast_component_batch(list_array: ListArray, to_inner_type: &DataType) -> ListArray { + let (field, offsets, ref array, nulls) = list_array.into_parts(); + let res = match compute::cast(array, to_inner_type) { + Ok(casted) => casted, + Err(err) => { + re_log::error_once!("Failed to cast array to {:?}: {}", to_inner_type, err); + return ListArray::new_null(field, offsets.len() - 1); + } + }; + ListArray::new( + Arc::new(Field::new_list_field(res.data_type().clone(), true)), + offsets, + res, + nulls, + ) + } +} +#[cfg(test)] +mod test { + use std::sync::Arc; + + use re_chunk::{ + TimeColumn, TimelineName, + external::arrow::{ + array::{ + Float32Builder, Float64Builder, Int32Builder, ListBuilder, StringBuilder, + StructBuilder, + }, + datatypes::{DataType, Field}, + }, + }; + use re_types::{ComponentDescriptor, archetypes::Scalars}; + + use super::*; + + /// Creates a chunk that contains all sorts of validity, nullability, and empty lists. + // ┌──────────────┬───────────┐ + // │ [{a:0,b:0}] │ ["zero"] │ + // ├──────────────┼───────────┤ + // │[{a:1,b:null}]│["one","1"]│ + // ├──────────────┼───────────┤ + // │ [] │ [] │ + // ├──────────────┼───────────┤ + // │ null │ ["three"] │ + // ├──────────────┼───────────┤ + // │ [{a:4,b:4}] │ null │ + // ├──────────────┼───────────┤ + // │ [null] │ ["five"] │ + // ├──────────────┼───────────┤ + // │ [{a:6,b:6}] │ [null] │ + // └──────────────┴───────────┘ + fn nullability_chunk() -> Chunk { + let mut struct_column_builder = ListBuilder::new(StructBuilder::new( + [ + Arc::new(Field::new("a", DataType::Float32, true)), + Arc::new(Field::new("b", DataType::Float64, true)), + ], + vec![ + Box::new(Float32Builder::new()), + Box::new(Float64Builder::new()), + ], + )); + let mut string_column_builder = ListBuilder::new(StringBuilder::new()); + + // row 0 + struct_column_builder + .values() + .field_builder::(0) + .unwrap() + .append_value(0.0); + struct_column_builder + .values() + .field_builder::(1) + .unwrap() + .append_value(0.0); + struct_column_builder.values().append(true); + struct_column_builder.append(true); + + string_column_builder.values().append_value("zero"); + string_column_builder.append(true); + + // row 1 + struct_column_builder + .values() + .field_builder::(0) + .unwrap() + .append_value(1.0); + struct_column_builder + .values() + .field_builder::(1) + .unwrap() + .append_null(); + struct_column_builder.values().append(true); + struct_column_builder.append(true); + + string_column_builder.values().append_value("one"); + string_column_builder.values().append_value("1"); + string_column_builder.append(true); + + // row 2 + struct_column_builder.append(true); // empty list + + string_column_builder.append(true); // empty list + + // row 3 + struct_column_builder.append(false); // null + + string_column_builder.values().append_value("three"); + string_column_builder.append(true); + + // row 4 + struct_column_builder + .values() + .field_builder::(0) + .unwrap() + .append_value(4.0); + struct_column_builder + .values() + .field_builder::(1) + .unwrap() + .append_value(4.0); + struct_column_builder.values().append(true); + struct_column_builder.append(true); + + string_column_builder.append(false); // null + + // row 5 + struct_column_builder + .values() + .field_builder::(0) + .unwrap() + .append_null(); // placeholder for null struct + struct_column_builder + .values() + .field_builder::(1) + .unwrap() + .append_null(); // placeholder for null struct + struct_column_builder.values().append(false); // null struct element + struct_column_builder.append(true); + + string_column_builder.values().append_value("five"); + string_column_builder.append(true); + + // row 6 + struct_column_builder + .values() + .field_builder::(0) + .unwrap() + .append_value(6.0); + struct_column_builder + .values() + .field_builder::(1) + .unwrap() + .append_value(6.0); + struct_column_builder.values().append(true); + struct_column_builder.append(true); + + string_column_builder.values().append_null(); + string_column_builder.append(true); + + let struct_column = struct_column_builder.finish(); + let string_column = string_column_builder.finish(); + + let components = [ + (ComponentDescriptor::partial("structs"), struct_column), + (ComponentDescriptor::partial("strings"), string_column), + ] + .into_iter(); + + let time_column = TimeColumn::new_sequence("tick", [0, 1, 2, 3, 4, 5, 6]); + + Chunk::from_auto_row_ids( + ChunkId::new(), + "nullability".into(), + std::iter::once((TimelineName::new("tick"), time_column)).collect(), + components.collect(), + ) + .unwrap() + } + + #[test] + fn test_destructure_cast() { + let original_chunk = nullability_chunk(); + println!("{original_chunk}"); + + let destructure = Lens::new( + "nullability".parse().unwrap(), + "structs", + |list_array, entity_path| { + let list_array = op::extract_field(list_array, "a"); + let list_array = op::cast_component_batch(list_array, &DataType::Float64); + + vec![TransformedColumn::new( + entity_path.join(&EntityPath::parse_forgiving("a")), + SerializedComponentColumn { + list_array, + descriptor: Scalars::descriptor_scalars(), + }, + )] + }, + ); + + let pipeline = LensRegistry { + lenses: vec![destructure], + }; + + let res = pipeline.apply(&original_chunk); + assert_eq!(res.len(), 1); + + let chunk = &res[0]; + insta::assert_snapshot!("destructure_cast", format!("{chunk:-240}")); + } + + #[test] + fn test_destructure() { + let original_chunk = nullability_chunk(); + println!("{original_chunk}"); + + let destructure = Lens::new( + "nullability".parse().unwrap(), + "structs", + |list_array, entity_path| { + let list_array = op::extract_field(list_array, "b"); + + vec![TransformedColumn::new( + entity_path.join(&EntityPath::parse_forgiving("b")), + SerializedComponentColumn { + list_array, + descriptor: Scalars::descriptor_scalars(), + }, + )] + }, + ); + + let pipeline = LensRegistry { + lenses: vec![destructure], + }; + + let res = pipeline.apply(&original_chunk); + assert_eq!(res.len(), 1); + + let chunk = &res[0]; + insta::assert_snapshot!("destructure_only", format!("{chunk:-240}")); + } + + #[test] + fn test_inner_count() { + let original_chunk = nullability_chunk(); + println!("{original_chunk}"); + + let count = Lens::new( + "nullability".parse().unwrap(), + "strings", + |list_array, entity_path| { + // We keep the original `list_array` around for better comparability. + let original_list_array = list_array.clone(); + let mut builder = ListBuilder::new(Int32Builder::new()); + + for maybe_array in list_array.iter() { + match maybe_array { + None => builder.append_null(), + Some(component_batch_array) => { + builder + .values() + .append_value(component_batch_array.len() as i32); + builder.append(true); + } + } + } + + let list_array = builder.finish(); + + vec![ + TransformedColumn::new( + entity_path.join(&EntityPath::parse_forgiving("b_count")), + SerializedComponentColumn { + list_array, + descriptor: ComponentDescriptor::partial("counts"), + }, + ), + TransformedColumn::new( + entity_path.join(&EntityPath::parse_forgiving("b_count")), + SerializedComponentColumn { + list_array: original_list_array, + descriptor: ComponentDescriptor::partial("original"), + }, + ), + ] + }, + ); + + let pipeline = LensRegistry { + lenses: vec![count], + }; + + let res = pipeline.apply(&original_chunk); + assert_eq!(res.len(), 1); + + let chunk = &res[0]; + insta::assert_snapshot!("inner_count", format!("{chunk:-240}")); + } + + #[test] + fn test_static_chunk_creation() { + let original_chunk = nullability_chunk(); + + let static_lens_a = Lens::new( + "nullability".parse().unwrap(), + "strings", + |_, entity_path| { + let mut metadata_builder_a = ListBuilder::new(StringBuilder::new()); + metadata_builder_a + .values() + .append_value("static_metadata_a"); + metadata_builder_a.append(true); + + let mut metadata_builder_b = ListBuilder::new(StringBuilder::new()); + metadata_builder_b + .values() + .append_value("static_metadata_b"); + metadata_builder_b.append(true); + + vec![ + TransformedColumn::new_static( + entity_path.join(&EntityPath::parse_forgiving("static")), + SerializedComponentColumn { + list_array: metadata_builder_a.finish(), + descriptor: ComponentDescriptor::partial("static_metadata_a"), + }, + ), + TransformedColumn::new_static( + entity_path.join(&EntityPath::parse_forgiving("static")), + SerializedComponentColumn { + list_array: metadata_builder_b.finish(), + descriptor: ComponentDescriptor::partial("static_metadata_b"), + }, + ), + ] + }, + ); + + let pipeline = LensRegistry { + lenses: vec![static_lens_a], + }; + + let res = pipeline.apply(&original_chunk); + assert_eq!(res.len(), 1); + + let chunk = &res[0]; + insta::assert_snapshot!("single_static", format!("{chunk:-240}")); + } +} diff --git a/crates/top/re_sdk/src/lib.rs b/crates/top/re_sdk/src/lib.rs index cb77eaf1bd1a..7dd064978f3d 100644 --- a/crates/top/re_sdk/src/lib.rs +++ b/crates/top/re_sdk/src/lib.rs @@ -116,6 +116,13 @@ pub use re_types::{ SerializedComponentColumn, }; +/// Transformation and reinterpretation of components. +/// +/// # Experimental +/// +/// This is an experimental API and may change in future releases. +pub mod lenses; + pub use re_byte_size::SizeBytes; #[cfg(feature = "data_loaders")] diff --git a/crates/top/re_sdk/src/snapshots/re_sdk__lenses__test__destructure_cast.snap b/crates/top/re_sdk/src/snapshots/re_sdk__lenses__test__destructure_cast.snap new file mode 100644 index 000000000000..11ada273145a --- /dev/null +++ b/crates/top/re_sdk/src/snapshots/re_sdk__lenses__test__destructure_cast.snap @@ -0,0 +1,35 @@ +--- +source: crates/top/re_sdk/src/lenses.rs +expression: "format!(\"{chunk:-240}\")" +--- +┌───────────────────────────────────────────────────────────────────────────────────────────────────┐ +│ METADATA: │ +│ * entity_path: /nullability/a │ +│ * heap_size_bytes: [**REDACTED**] │ +│ * id: [**REDACTED**] │ +│ * version: [**REDACTED**] │ +├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ ┌───────────────────────────────────────────────┬──────────────────┬────────────────────────────┐ │ +│ │ RowId ┆ tick ┆ Scalars:scalars │ │ +│ │ --- ┆ --- ┆ --- │ │ +│ │ type: FixedSizeBinary[16] ┆ type: i64 ┆ type: List[nullable f64] │ │ +│ │ ARROW:extension:metadata: {"namespace":"row"} ┆ index_name: tick ┆ archetype: Scalars │ │ +│ │ ARROW:extension:name: TUID ┆ is_sorted: true ┆ component: Scalars:scalars │ │ +│ │ is_sorted: true ┆ kind: index ┆ component_type: Scalar │ │ +│ │ kind: control ┆ ┆ kind: data │ │ +│ ╞═══════════════════════════════════════════════╪══════════════════╪════════════════════════════╡ │ +│ │ row_[**REDACTED**] ┆ 0 ┆ [0.0] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 1 ┆ [1.0] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 2 ┆ [] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 3 ┆ null │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 4 ┆ [4.0] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 5 ┆ [null] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 6 ┆ [6.0] │ │ +│ └───────────────────────────────────────────────┴──────────────────┴────────────────────────────┘ │ +└───────────────────────────────────────────────────────────────────────────────────────────────────┘ diff --git a/crates/top/re_sdk/src/snapshots/re_sdk__lenses__test__destructure_only.snap b/crates/top/re_sdk/src/snapshots/re_sdk__lenses__test__destructure_only.snap new file mode 100644 index 000000000000..69225f41bd13 --- /dev/null +++ b/crates/top/re_sdk/src/snapshots/re_sdk__lenses__test__destructure_only.snap @@ -0,0 +1,35 @@ +--- +source: crates/top/re_sdk/src/lenses.rs +expression: "format!(\"{chunk:-240}\")" +--- +┌───────────────────────────────────────────────────────────────────────────────────────────────────┐ +│ METADATA: │ +│ * entity_path: /nullability/b │ +│ * heap_size_bytes: [**REDACTED**] │ +│ * id: [**REDACTED**] │ +│ * version: [**REDACTED**] │ +├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ ┌───────────────────────────────────────────────┬──────────────────┬────────────────────────────┐ │ +│ │ RowId ┆ tick ┆ Scalars:scalars │ │ +│ │ --- ┆ --- ┆ --- │ │ +│ │ type: FixedSizeBinary[16] ┆ type: i64 ┆ type: List[nullable f64] │ │ +│ │ ARROW:extension:metadata: {"namespace":"row"} ┆ index_name: tick ┆ archetype: Scalars │ │ +│ │ ARROW:extension:name: TUID ┆ is_sorted: true ┆ component: Scalars:scalars │ │ +│ │ is_sorted: true ┆ kind: index ┆ component_type: Scalar │ │ +│ │ kind: control ┆ ┆ kind: data │ │ +│ ╞═══════════════════════════════════════════════╪══════════════════╪════════════════════════════╡ │ +│ │ row_[**REDACTED**] ┆ 0 ┆ [0.0] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 1 ┆ [null] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 2 ┆ [] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 3 ┆ null │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 4 ┆ [4.0] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 5 ┆ [null] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 6 ┆ [6.0] │ │ +│ └───────────────────────────────────────────────┴──────────────────┴────────────────────────────┘ │ +└───────────────────────────────────────────────────────────────────────────────────────────────────┘ diff --git a/crates/top/re_sdk/src/snapshots/re_sdk__lenses__test__inner_count.snap b/crates/top/re_sdk/src/snapshots/re_sdk__lenses__test__inner_count.snap new file mode 100644 index 000000000000..3e9203ab9d43 --- /dev/null +++ b/crates/top/re_sdk/src/snapshots/re_sdk__lenses__test__inner_count.snap @@ -0,0 +1,35 @@ +--- +source: crates/top/re_sdk/src/lenses.rs +expression: "format!(\"{chunk:-240}\")" +--- +┌─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ +│ METADATA: │ +│ * entity_path: /nullability/b_count │ +│ * heap_size_bytes: [**REDACTED**] │ +│ * id: [**REDACTED**] │ +│ * version: [**REDACTED**] │ +├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ ┌───────────────────────────────────────────────┬──────────────────┬──────────────────────────┬───────────────────────────┐ │ +│ │ RowId ┆ tick ┆ counts ┆ original │ │ +│ │ --- ┆ --- ┆ --- ┆ --- │ │ +│ │ type: FixedSizeBinary[16] ┆ type: i64 ┆ type: List[nullable i32] ┆ type: List[nullable Utf8] │ │ +│ │ ARROW:extension:metadata: {"namespace":"row"} ┆ index_name: tick ┆ component: counts ┆ component: original │ │ +│ │ ARROW:extension:name: TUID ┆ is_sorted: true ┆ kind: data ┆ kind: data │ │ +│ │ is_sorted: true ┆ kind: index ┆ ┆ │ │ +│ │ kind: control ┆ ┆ ┆ │ │ +│ ╞═══════════════════════════════════════════════╪══════════════════╪══════════════════════════╪═══════════════════════════╡ │ +│ │ row_[**REDACTED**] ┆ 0 ┆ [1] ┆ [zero] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 1 ┆ [2] ┆ [one, 1] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 2 ┆ [0] ┆ [] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 3 ┆ [1] ┆ [three] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 4 ┆ null ┆ null │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 5 ┆ [1] ┆ [five] │ │ +│ ├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ │ +│ │ row_[**REDACTED**] ┆ 6 ┆ [1] ┆ [null] │ │ +│ └───────────────────────────────────────────────┴──────────────────┴──────────────────────────┴───────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ diff --git a/crates/top/re_sdk/src/snapshots/re_sdk__lenses__test__single_static.snap b/crates/top/re_sdk/src/snapshots/re_sdk__lenses__test__single_static.snap new file mode 100644 index 000000000000..0cd38c20a176 --- /dev/null +++ b/crates/top/re_sdk/src/snapshots/re_sdk__lenses__test__single_static.snap @@ -0,0 +1,23 @@ +--- +source: crates/top/re_sdk/src/lenses.rs +expression: "format!(\"{chunk:-240}\")" +--- +┌─────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐ +│ METADATA: │ +│ * entity_path: /nullability/static │ +│ * heap_size_bytes: [**REDACTED**] │ +│ * id: [**REDACTED**] │ +│ * version: [**REDACTED**] │ +├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤ +│ ┌───────────────────────────────────────────────┬──────────────────────────────┬──────────────────────────────┐ │ +│ │ RowId ┆ static_metadata_a ┆ static_metadata_b │ │ +│ │ --- ┆ --- ┆ --- │ │ +│ │ type: FixedSizeBinary[16] ┆ type: List[nullable Utf8] ┆ type: List[nullable Utf8] │ │ +│ │ ARROW:extension:metadata: {"namespace":"row"} ┆ component: static_metadata_a ┆ component: static_metadata_b │ │ +│ │ ARROW:extension:name: TUID ┆ kind: data ┆ kind: data │ │ +│ │ is_sorted: true ┆ ┆ │ │ +│ │ kind: control ┆ ┆ │ │ +│ ╞═══════════════════════════════════════════════╪══════════════════════════════╪══════════════════════════════╡ │ +│ │ row_[**REDACTED**] ┆ [static_metadata_a] ┆ [static_metadata_b] │ │ +│ └───────────────────────────────────────────────┴──────────────────────────────┴──────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘ diff --git a/crates/utils/re_mcap/src/layers/protobuf.rs b/crates/utils/re_mcap/src/layers/protobuf.rs index 3749e62f5360..064a54666fdd 100644 --- a/crates/utils/re_mcap/src/layers/protobuf.rs +++ b/crates/utils/re_mcap/src/layers/protobuf.rs @@ -547,32 +547,6 @@ mod test { chunks } - /// Wrapper to help with creating nicely formatted chunks to use with `insta`. - struct ChunkRedacted<'a>(&'a Chunk); - - impl std::fmt::Display for ChunkRedacted<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let batch = self.0.to_record_batch().map_err(|err| { - re_log::error_once!("couldn't display Chunk: {err}"); - std::fmt::Error - })?; - re_format_arrow::format_record_batch_opts( - &batch, - &re_format_arrow::RecordBatchFormatOpts { - transposed: false, - width: f.width(), - include_metadata: true, - include_column_metadata: true, - trim_field_names: true, - trim_metadata_keys: true, - trim_metadata_values: true, - redact_non_deterministic: true, - }, - ) - .fmt(f) - } - } - #[test] fn two_simple_rows() { // Writing to the MCAP buffer. @@ -615,9 +589,6 @@ mod test { let chunks = run_layer(&summary, buffer.as_slice()); assert_eq!(chunks.len(), 1); - insta::assert_snapshot!( - "two_simple_rows", - format!("{:240}", ChunkRedacted(&chunks[0])) - ); + insta::assert_snapshot!("two_simple_rows", format!("{:-240}", &chunks[0])); } } diff --git a/examples/rust/lenses/Cargo.toml b/examples/rust/lenses/Cargo.toml new file mode 100644 index 000000000000..8a6772e99816 --- /dev/null +++ b/examples/rust/lenses/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "lenses" +version = "0.26.0-alpha.1+dev" +edition = "2024" +rust-version = "1.88" +license = "MIT OR Apache-2.0" +publish = false + +[dependencies] +rerun = { path = "../../../crates/top/rerun", features = [ + "web_viewer", + "clap", +] } + +anyhow = "1.0" +arrow.workspace = true +clap = { version = "4.0", features = ["derive"] } + +[dev-dependencies] +insta.workspace = true diff --git a/examples/rust/lenses/README.md b/examples/rust/lenses/README.md new file mode 100644 index 000000000000..efeb98aa3bf4 --- /dev/null +++ b/examples/rust/lenses/README.md @@ -0,0 +1,10 @@ + + + + +Demonstrates how to transform log messages before forwarding them to the sink the SDK. +```bash +cargo run -p lenses +``` diff --git a/examples/rust/lenses/src/main.rs b/examples/rust/lenses/src/main.rs new file mode 100644 index 000000000000..ccc99d004795 --- /dev/null +++ b/examples/rust/lenses/src/main.rs @@ -0,0 +1,192 @@ +use std::sync::Arc; + +use arrow::{ + array::{Array, Float32Array, Float64Array, ListArray, StringArray, StructArray}, + datatypes::{DataType, Field}, +}; +use rerun::{ + DynamicArchetype, EntityPath, RecordingStream, Scalars, SerializedComponentColumn, SeriesLines, + SeriesPoints, TextDocument, TimeCell, + external::re_log, + lenses::{Lens, LensesSink, TransformedColumn, op}, + sink::GrpcSink, +}; + +fn lens_instruction() -> anyhow::Result { + Ok(Lens::new( + "/instructions".parse()?, + "com.Example.Instruction:text", + |array, entity_path| { + vec![TransformedColumn { + entity_path: entity_path.clone(), + column: SerializedComponentColumn { + descriptor: TextDocument::descriptor_text(), + list_array: array, + }, + is_static: false, + }] + }, + )) +} + +fn lens_destructure() -> anyhow::Result { + Ok(Lens::new( + "/nested".parse().unwrap(), + "com.Example.Nested:payload", + |array, entity_path| { + let list_array_a = op::extract_field(array.clone(), "a"); + let list_array_a = op::cast_component_batch(list_array_a, &DataType::Float64); + + let list_array_b = op::extract_field(array, "b"); + + vec![ + TransformedColumn::new( + entity_path.join(&EntityPath::parse_forgiving("a")), + SerializedComponentColumn { + descriptor: Scalars::descriptor_scalars(), + list_array: list_array_a, + }, + ), + TransformedColumn::new( + entity_path.join(&EntityPath::parse_forgiving("b")), + SerializedComponentColumn { + descriptor: Scalars::descriptor_scalars(), + list_array: list_array_b, + }, + ), + ] + }, + )) +} + +fn lens_flag() -> anyhow::Result { + Ok(Lens::new( + "/flag".parse()?, + "com.Example.Flag:flag", + |list_array, entity_path| { + let (_, offsets, values, nulls) = list_array.into_parts(); + let flag_array = values.as_any().downcast_ref::().unwrap(); + + let scalar_array: Float64Array = flag_array + .iter() + .map(|s| { + s.map(|v| match v { + "ACTIVE" => 1.0, + "INACTIVE" => 2.0, + _ => 0.0, + }) + }) + .collect(); + + let list_array = ListArray::new( + Arc::new(Field::new_list_field( + scalar_array.data_type().clone(), + true, + )), + offsets, + Arc::new(scalar_array), + nulls, + ); + + let series_points = SeriesPoints::new() + .with_marker_sizes([5.0]) + .columns_of_unit_batches() + .unwrap() + .next() + .unwrap(); + + let series_lines = SeriesLines::new() + .with_widths([3.0]) + .columns_of_unit_batches() + .unwrap() + .next() + .unwrap(); + + vec![ + TransformedColumn::new( + entity_path.clone(), + SerializedComponentColumn { + list_array, + descriptor: Scalars::descriptor_scalars(), + }, + ), + TransformedColumn::new_static(entity_path.clone(), series_points), + TransformedColumn::new_static(entity_path.clone(), series_lines), + ] + }, + )) +} + +fn main() -> anyhow::Result<()> { + re_log::setup_logging(); + + let lenses_sink = LensesSink::new(GrpcSink::default()) + .with_lens(lens_instruction()?) + .with_lens(lens_destructure()?) + .with_lens(lens_flag()?); + + let rec = rerun::RecordingStreamBuilder::new("rerun_example_lenses").spawn()?; + rec.set_sink(Box::new(lenses_sink)); + + log_instructions(&rec)?; + log_structs_with_scalars(&rec)?; + log_flag(&rec)?; + + Ok(()) +} + +fn log_flag(rec: &RecordingStream) -> anyhow::Result<()> { + let flags = ["ACTIVE", "ACTIVE", "INACTIVE", "UNKNOWN"]; + for x in 0..10i64 { + let flag = StringArray::from(vec![flags[x as usize % flags.len()]]); + rec.set_time("tick", TimeCell::from_sequence(x)); + rec.log( + "flag", + &DynamicArchetype::new("com.Example.Flag") + .with_component_from_data("flag", Arc::new(flag)), + )? + } + + Ok(()) +} + +fn log_instructions(rec: &RecordingStream) -> anyhow::Result<()> { + rec.set_time("tick", TimeCell::from_sequence(1)); + rec.log( + "instructions", + &DynamicArchetype::new("com.Example.Instruction").with_component_from_data( + "text", + Arc::new(arrow::array::StringArray::from(vec![ + "This is a nice instruction text.", + ])), + ), + )?; + + Ok(()) +} + +fn log_structs_with_scalars(rec: &RecordingStream) -> anyhow::Result<()> { + for x in 0..10i64 { + let a = Float32Array::from(vec![1.0 * x as f32, 2.0 + x as f32, 3.0 + x as f32]); + let b = Float64Array::from(vec![5.0 * x as f64, 6.0 + x as f64, 7.0 + x as f64]); + + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("a", DataType::Float32, false)), + Arc::new(a) as Arc, + ), + ( + Arc::new(Field::new("b", DataType::Float64, false)), + Arc::new(b) as Arc, + ), + ]); + rec.set_time("tick", TimeCell::from_sequence(x)); + rec.log( + "nested", + &DynamicArchetype::new("com.Example.Nested") + .with_component_from_data("payload", Arc::new(struct_array)), + )? + } + + Ok(()) +}