diff --git a/crates/store/re_sorbet/src/migrations/mod.rs b/crates/store/re_sorbet/src/migrations/mod.rs
index 8590ab7769ac..b17209c6945f 100644
--- a/crates/store/re_sorbet/src/migrations/mod.rs
+++ b/crates/store/re_sorbet/src/migrations/mod.rs
@@ -22,6 +22,7 @@ mod make_list_arrays;
 mod v0_0_1__to__v0_0_2;
 mod v0_0_2__to__v0_1_0;
 mod v0_1_0__to__v0_1_1;
+mod v0_1_1__to__v0_1_2;
 
 /// This trait needs to be implemented by any new migrations. It ensures that
 /// all migrations adhere to the same contract.
@@ -128,6 +129,7 @@ pub fn migrate_record_batch(mut batch: RecordBatch) -> RecordBatch {
         batch = maybe_apply::<v0_0_1__to__v0_0_2::Migration>(&batch_version, batch);
         batch = maybe_apply::<v0_0_2__to__v0_1_0::Migration>(&batch_version, batch);
         batch = maybe_apply::<v0_1_0__to__v0_1_1::Migration>(&batch_version, batch);
+        batch = maybe_apply::<v0_1_1__to__v0_1_2::Migration>(&batch_version, batch);
         batch
     }
 }
diff --git a/crates/store/re_sorbet/src/migrations/v0_1_1__to__v0_1_2.rs b/crates/store/re_sorbet/src/migrations/v0_1_1__to__v0_1_2.rs
new file mode 100644
index 000000000000..afbf29df5bdd
--- /dev/null
+++ b/crates/store/re_sorbet/src/migrations/v0_1_1__to__v0_1_2.rs
@@ -0,0 +1,151 @@
+//! Breaking changes:
+//! * `Blob` is encoded as `Binary` instead of `List[u8]`
+
+use std::sync::Arc;
+
+use arrow::{
+    array::{
+        Array, ArrayRef, AsArray as _, ListArray, RecordBatch, RecordBatchOptions, UInt8Array,
+    },
+    datatypes::{DataType, Field, FieldRef, Schema},
+};
+
+use re_log::ResultExt as _;
+
+pub struct Migration;
+
+impl super::Migration for Migration {
+    const SOURCE_VERSION: semver::Version = semver::Version::new(0, 1, 1);
+    const TARGET_VERSION: semver::Version = semver::Version::new(0, 1, 2);
+
+    fn migrate(batch: RecordBatch) -> RecordBatch {
+        migrate_blobs(batch)
+    }
+}
+
+/// Change datatype from `List[u8]` to `Binary` for blobs
+fn migrate_blobs(batch: RecordBatch) -> RecordBatch {
+    re_tracing::profile_function!();
+
+    /// Is this a `List<List<u8>>`?
+    fn is_list_list_u8(datatype: &DataType) -> bool {
+        if let DataType::List(list_field) = datatype
+            && let DataType::List(innermost_field) = list_field.data_type()
+        {
+            innermost_field.data_type() == &DataType::UInt8
+        } else {
+            false
+        }
+    }
+
+    fn is_blob_field(field: &Field) -> bool {
+        let components_with_blobs = [
+            "rerun.components.Blob",
+            "rerun.components.ImageBuffer",
+            "rerun.components.VideoSample",
+        ];
+
+        if let Some(component_type) = field.metadata().get("rerun:component_type")
+            && components_with_blobs.contains(&component_type.as_str())
+        {
+            is_list_list_u8(field.data_type())
+        } else {
+            false
+        }
+    }
+
+    let needs_migration = batch
+        .schema()
+        .fields()
+        .iter()
+        .any(|field| is_blob_field(field));
+
+    if !needs_migration {
+        return batch;
+    }
+
+    let num_columns = batch.num_columns();
+    let mut fields: Vec<FieldRef> = Vec::with_capacity(num_columns);
+    let mut columns: Vec<ArrayRef> = Vec::with_capacity(num_columns);
+
+    for (field, array) in itertools::izip!(batch.schema().fields(), batch.columns()) {
+        if is_blob_field(field) {
+            if let Some(new_array) = convert_list_list_u8_to_list_binary(array.as_ref()) {
+                let new_field = Field::new(
+                    field.name(),
+                    new_array.data_type().clone(),
+                    field.is_nullable(),
+                )
+                .with_metadata(field.metadata().clone());
+
+                fields.push(new_field.into());
+                columns.push(Arc::new(new_array));
+
+                re_log::debug_once!(
+                    "Changed datatype of '{}' from List[u8] to Binary",
+                    field.name()
+                );
+                continue;
+            } else {
+                re_log::warn_once!("Failed to convert {} to Binary", field.name());
+            }
+        }
+
+        fields.push(field.clone());
+        columns.push(array.clone());
+    }
+
+    let schema = Arc::new(Schema::new_with_metadata(
+        fields,
+        batch.schema().metadata.clone(),
+    ));
+
+    RecordBatch::try_new_with_options(
+        schema.clone(),
+        columns,
+        &RecordBatchOptions::default().with_row_count(Some(batch.num_rows())),
+    )
+    .ok_or_log_error()
+    .unwrap_or_else(|| RecordBatch::new_empty(schema))
+}
+
+/// `List[List[u8]]` -> `List[Binary]`
+fn convert_list_list_u8_to_list_binary(list_array: &dyn Array) -> Option<ListArray> {
+    re_tracing::profile_function!();
+
+    // The outer `List[List[u8]]`
+    let list_array = list_array.as_list_opt()?;
+
+    // The inner List[u8] array
+    let inner_list_array: &ListArray = list_array.values().as_list_opt()?;
+
+    // The underlying u8 values
+    let u8_array: &UInt8Array = inner_list_array.values().as_primitive_opt()?;
+
+    // We consistently use 64-bit offsets for binary data in order to keep our backwards-compatibility checks simpler.
+    // Create the binary array reusing existing buffers
+    let binary_array = arrow::array::LargeBinaryArray::try_new(
+        arrow::buffer::OffsetBuffer::new(
+            inner_list_array
+                .offsets()
+                .iter()
+                .map(|&o| o as i64)
+                .collect(),
+        ),
+        u8_array.values().clone().into_inner(),
+        inner_list_array.nulls().cloned(),
+    )
+    .ok()?;
+
+    // Create the outer list array with binary inner type
+    let outer_list = ListArray::try_new(
+        Arc::new(Field::new("item", DataType::LargeBinary, true)),
+        list_array.offsets().clone(),
+        Arc::new(binary_array),
+        list_array.nulls().cloned(),
+    )
+    .ok()?;
+
+    debug_assert_eq!(list_array.len(), outer_list.len());
+
+    Some(outer_list)
+}
diff --git a/crates/store/re_sorbet/src/sorbet_schema.rs b/crates/store/re_sorbet/src/sorbet_schema.rs
index eafd6d419de7..07f9316c4918 100644
--- a/crates/store/re_sorbet/src/sorbet_schema.rs
+++ b/crates/store/re_sorbet/src/sorbet_schema.rs
@@ -42,7 +42,7 @@ impl SorbetSchema {
     /// This is bumped everytime we require a migration, but notable it is
     /// decoupled from the Rerun version to avoid confusion as there will not
     /// be a new Sorbet version for each Rerun version.
-    pub(crate) const METADATA_VERSION: semver::Version = semver::Version::new(0, 1, 1);
+    pub(crate) const METADATA_VERSION: semver::Version = semver::Version::new(0, 1, 2);
 }
 
 impl SorbetSchema {
diff --git a/crates/store/re_types/definitions/rerun/datatypes/blob.fbs b/crates/store/re_types/definitions/rerun/datatypes/blob.fbs
index e3fa4f9bdb1c..d5334afdeb5c 100644
--- a/crates/store/re_types/definitions/rerun/datatypes/blob.fbs
+++ b/crates/store/re_types/definitions/rerun/datatypes/blob.fbs
@@ -13,5 +13,5 @@ table Blob (
   "attr.rust.repr": "transparent",
   "attr.rust.tuple_struct"
 ) {
-  data: [ubyte] (order: 100);
+  data: [ubyte] (order: 100, "attr.rerun.override_type": "binary");
 }
diff --git a/crates/store/re_types/src/datatypes/blob.rs b/crates/store/re_types/src/datatypes/blob.rs
index de7eba8815ac..5a5e0413d6df 100644
--- a/crates/store/re_types/src/datatypes/blob.rs
+++ b/crates/store/re_types/src/datatypes/blob.rs
@@ -24,7 +24,7 @@ use ::re_types_core::{DeserializationError, DeserializationResult};
 /// Ref-counted internally and therefore cheap to clone.
 #[derive(Clone, Debug, PartialEq)]
 #[repr(transparent)]
-pub struct Blob(pub ::arrow::buffer::ScalarBuffer<u8>);
+pub struct Blob(pub ::arrow::buffer::Buffer);
 
 ::re_types_core::macros::impl_into_cow!(Blob);
 
@@ -33,11 +33,7 @@ impl ::re_types_core::Loggable for Blob {
     fn arrow_datatype() -> arrow::datatypes::DataType {
         #![allow(clippy::wildcard_imports)]
         use arrow::datatypes::*;
-        DataType::List(std::sync::Arc::new(Field::new(
-            "item",
-            DataType::UInt8,
-            false,
-        )))
+        DataType::LargeBinary
     }
 
     fn to_arrow_opt<'a>(
@@ -64,28 +60,20 @@ impl ::re_types_core::Loggable for Blob {
             any_nones.then(|| somes.into())
         };
         {
-            let offsets = arrow::buffer::OffsetBuffer::<i32>::from_lengths(
+            let offsets = arrow::buffer::OffsetBuffer::from_lengths(
                 data0
                     .iter()
-                    .map(|opt| opt.as_ref().map_or(0, |datum| datum.len())),
+                    .map(|opt| opt.as_ref().map(|datum| datum.len()).unwrap_or_default()),
             );
-            let data0_inner_data: ScalarBuffer<_> = data0
-                .iter()
-                .flatten()
-                .map(|b| b as &[_])
-                .collect::<Vec<_>>()
-                .concat()
-                .into();
-            let data0_inner_validity: Option<arrow::buffer::NullBuffer> = None;
-            as_array_ref(ListArray::try_new(
-                std::sync::Arc::new(Field::new("item", DataType::UInt8, false)),
-                offsets,
-                as_array_ref(PrimitiveArray::<UInt8Type>::new(
-                    data0_inner_data,
-                    data0_inner_validity,
-                )),
-                data0_validity,
-            )?)
+
+            #[allow(clippy::unwrap_used)]
+            let capacity = offsets.last().copied().unwrap() as usize;
+            let mut buffer_builder = arrow::array::builder::BufferBuilder::<u8>::new(capacity);
+            for data in data0.iter().flatten() {
+                buffer_builder.append_slice(data);
+            }
+            let inner_data: arrow::buffer::Buffer = buffer_builder.finish();
+            as_array_ref(LargeBinaryArray::new(offsets, inner_data, data0_validity))
         }
     })
 }
 
@@ -100,53 +88,52 @@ impl ::re_types_core::Loggable for Blob {
         use ::re_types_core::{arrow_zip_validity::ZipValidity, Loggable as _, ResultExt as _};
         use arrow::{array::*, buffer::*, datatypes::*};
         Ok({
-            let arrow_data = arrow_data
-                .as_any()
-                .downcast_ref::<ListArray>()
-                .ok_or_else(|| {
-                    let expected = Self::arrow_datatype();
-                    let actual = arrow_data.data_type().clone();
-                    DeserializationError::datatype_mismatch(expected, actual)
-                })
-                .with_context("rerun.datatypes.Blob#data")?;
-            if arrow_data.is_empty() {
-                Vec::new()
-            } else {
-                let arrow_data_inner = {
-                    let arrow_data_inner = &**arrow_data.values();
-                    arrow_data_inner
-                        .as_any()
-                        .downcast_ref::<UInt8Array>()
-                        .ok_or_else(|| {
-                            let expected = DataType::UInt8;
-                            let actual = arrow_data_inner.data_type().clone();
-                            DeserializationError::datatype_mismatch(expected, actual)
-                        })
-                        .with_context("rerun.datatypes.Blob#data")?
-                        .values()
-                };
+            fn extract_from_binary<O>(
+                arrow_data: &arrow::array::GenericByteArray<GenericBinaryType<O>>,
+            ) -> DeserializationResult<Vec<Option<Buffer>>>
+            where
+                O: ::arrow::array::OffsetSizeTrait,
+            {
+                use ::arrow::array::Array as _;
+                use ::re_types_core::arrow_zip_validity::ZipValidity;
+                let arrow_data_buf = arrow_data.values();
                 let offsets = arrow_data.offsets();
                 ZipValidity::new_with_validity(offsets.windows(2), arrow_data.nulls())
                     .map(|elem| {
                         elem.map(|window| {
-                            let start = window[0] as usize;
-                            let end = window[1] as usize;
-                            if arrow_data_inner.len() < end {
+                            let start = window[0].as_usize();
+                            let end = window[1].as_usize();
+                            let len = end - start;
+                            if arrow_data_buf.len() < end {
                                 return Err(DeserializationError::offset_slice_oob(
                                     (start, end),
-                                    arrow_data_inner.len(),
+                                    arrow_data_buf.len(),
                                 ));
                             }
                             #[allow(unsafe_code, clippy::undocumented_unsafe_blocks)]
-                            let data = arrow_data_inner.clone().slice(start, end - start);
+                            let data = arrow_data_buf.slice_with_length(start, len);
                             Ok(data)
                         })
                         .transpose()
                     })
-                    .collect::<DeserializationResult<Vec<Option<_>>>>()?
+                    .collect::<DeserializationResult<Vec<Option<Buffer>>>>()
+            }
+            if let Some(arrow_data) = arrow_data.as_any().downcast_ref::<BinaryArray>() {
+                extract_from_binary(arrow_data)
+                    .with_context("rerun.datatypes.Blob#data")?
+                    .into_iter()
+            } else if let Some(arrow_data) = arrow_data.as_any().downcast_ref::<LargeBinaryArray>()
+            {
+                extract_from_binary(arrow_data)
+                    .with_context("rerun.datatypes.Blob#data")?
+                    .into_iter()
+            } else {
+                let expected = Self::arrow_datatype();
+                let actual = arrow_data.data_type().clone();
+                return Err(DeserializationError::datatype_mismatch(expected, actual))
+                    .with_context("rerun.datatypes.Blob#data");
             }
-            .into_iter()
         }
         .map(|v| v.ok_or_else(DeserializationError::missing_data))
         .map(|res| res.map(|v| Some(Self(v))))
@@ -156,14 +143,14 @@ impl ::re_types_core::Loggable for Blob {
     }
 }
 
-impl From<::arrow::buffer::ScalarBuffer<u8>> for Blob {
+impl From<::arrow::buffer::Buffer> for Blob {
     #[inline]
-    fn from(data: ::arrow::buffer::ScalarBuffer<u8>) -> Self {
+    fn from(data: ::arrow::buffer::Buffer) -> Self {
         Self(data)
     }
 }
 
-impl From<Blob> for ::arrow::buffer::ScalarBuffer<u8> {
+impl From<Blob> for ::arrow::buffer::Buffer {
     #[inline]
     fn from(value: Blob) -> Self {
         value.0
@@ -178,6 +165,6 @@ impl ::re_byte_size::SizeBytes for Blob {
 
     #[inline]
     fn is_pod() -> bool {
-        <::arrow::buffer::ScalarBuffer<u8>>::is_pod()
+        <::arrow::buffer::Buffer>::is_pod()
     }
 }
diff --git a/crates/store/re_types/src/datatypes/blob_ext.rs b/crates/store/re_types/src/datatypes/blob_ext.rs
index 5e0e8e5f81b8..17d816fca4d8 100644
--- a/crates/store/re_types/src/datatypes/blob_ext.rs
+++ b/crates/store/re_types/src/datatypes/blob_ext.rs
@@ -1,3 +1,5 @@
+use arrow::{array::Array as _, buffer::ScalarBuffer};
+
 use super::Blob;
 
 impl Blob {
@@ -10,32 +12,45 @@ impl Blob {
     /// Panics iff `offset + length` is larger than `len`.
     #[inline]
     pub fn sliced(self, range: std::ops::Range<usize>) -> Self {
-        self.0.slice(range.start, range.len()).into()
+        self.0.slice_with_length(range.start, range.len()).into()
     }
 
     /// Returns the bytes of a serialized blob batch without copying it.
     ///
-    /// Returns `None` if the serialized component batch didn't have the expected shape.
+    /// Returns `None` if the serialized component batch didn't have the expected type or shape.
     pub fn serialized_blob_as_slice(
         serialized_blob: &re_types_core::SerializedComponentBatch,
     ) -> Option<&[u8]> {
-        let blob_list_array = serialized_blob
-            .array
-            .as_any()
-            .downcast_ref::<arrow::array::ListArray>()?;
-        let blob_data = blob_list_array
-            .values()
+        Self::binary_array_as_slice(&serialized_blob.array)
+    }
+
+    /// Returns the bytes of a serialized blob batch without copying it.
+    ///
+    /// Returns `None` if the serialized component batch didn't have the expected type or shape.
+    pub fn binary_array_as_slice(array: &std::sync::Arc<dyn Array>) -> Option<&[u8]> {
+        if let Some(blob_data) = array.as_any().downcast_ref::<arrow::array::BinaryArray>() {
+            if blob_data.len() == 1 {
+                return Some(blob_data.value(0));
+            }
+        }
+
+        if let Some(blob_data) = array
             .as_any()
-            .downcast_ref::<arrow::array::PrimitiveArray<arrow::datatypes::UInt8Type>>()?;
+            .downcast_ref::<arrow::array::LargeBinaryArray>()
+        {
+            if blob_data.len() == 1 {
+                return Some(blob_data.value(0));
+            }
+        }
 
-        Some(blob_data.values().inner().as_slice())
+        None
     }
 }
 
 impl Eq for Blob {}
 
-impl From<arrow::buffer::Buffer> for Blob {
-    fn from(buffer: arrow::buffer::Buffer) -> Self {
+impl From<ScalarBuffer<u8>> for Blob {
+    fn from(buffer: ScalarBuffer<u8>) -> Self {
         Self(buffer.into())
     }
 }
@@ -53,10 +68,10 @@ impl From<&[u8]> for Blob {
 
 impl std::ops::Deref for Blob {
-    type Target = arrow::buffer::ScalarBuffer<u8>;
+    type Target = arrow::buffer::Buffer;
 
     #[inline]
-    fn deref(&self) -> &arrow::buffer::ScalarBuffer<u8> {
+    fn deref(&self) -> &arrow::buffer::Buffer {
         &self.0
     }
 }
diff --git a/crates/store/re_types/src/image.rs b/crates/store/re_types/src/image.rs
index fc7787841de9..e170d34147c0 100644
--- a/crates/store/re_types/src/image.rs
+++ b/crates/store/re_types/src/image.rs
@@ -150,28 +150,20 @@ where
 
 /// Converts it to what is useful for the image API.
 pub fn blob_and_datatype_from_tensor(tensor_buffer: TensorBuffer) -> (Blob, ChannelDatatype) {
     match tensor_buffer {
-        TensorBuffer::U8(buffer) => (Blob(buffer), ChannelDatatype::U8),
-        TensorBuffer::U16(buffer) => (Blob(cast_to_u8(&buffer)), ChannelDatatype::U16),
-        TensorBuffer::U32(buffer) => (Blob(cast_to_u8(&buffer)), ChannelDatatype::U32),
-        TensorBuffer::U64(buffer) => (Blob(cast_to_u8(&buffer)), ChannelDatatype::U64),
-        TensorBuffer::I8(buffer) => (Blob(cast_to_u8(&buffer)), ChannelDatatype::I8),
-        TensorBuffer::I16(buffer) => (Blob(cast_to_u8(&buffer)), ChannelDatatype::I16),
-        TensorBuffer::I32(buffer) => (Blob(cast_to_u8(&buffer)), ChannelDatatype::I32),
-        TensorBuffer::I64(buffer) => (Blob(cast_to_u8(&buffer)), ChannelDatatype::I64),
-        TensorBuffer::F16(buffer) => (Blob(cast_to_u8(&buffer)), ChannelDatatype::F16),
-        TensorBuffer::F32(buffer) => (Blob(cast_to_u8(&buffer)), ChannelDatatype::F32),
-        TensorBuffer::F64(buffer) => (Blob(cast_to_u8(&buffer)), ChannelDatatype::F64),
+        TensorBuffer::U8(buffer) => (Blob(buffer.inner().clone()), ChannelDatatype::U8),
+        TensorBuffer::U16(buffer) => (Blob(buffer.inner().clone()), ChannelDatatype::U16),
+        TensorBuffer::U32(buffer) => (Blob(buffer.inner().clone()), ChannelDatatype::U32),
+        TensorBuffer::U64(buffer) => (Blob(buffer.inner().clone()), ChannelDatatype::U64),
+        TensorBuffer::I8(buffer) => (Blob(buffer.inner().clone()), ChannelDatatype::I8),
+        TensorBuffer::I16(buffer) => (Blob(buffer.inner().clone()), ChannelDatatype::I16),
+        TensorBuffer::I32(buffer) => (Blob(buffer.inner().clone()), ChannelDatatype::I32),
+        TensorBuffer::I64(buffer) => (Blob(buffer.inner().clone()), ChannelDatatype::I64),
+        TensorBuffer::F16(buffer) => (Blob(buffer.inner().clone()), ChannelDatatype::F16),
+        TensorBuffer::F32(buffer) => (Blob(buffer.inner().clone()), ChannelDatatype::F32),
+        TensorBuffer::F64(buffer) => (Blob(buffer.inner().clone()), ChannelDatatype::F64),
     }
 }
 
-/// Reinterpret POD (plain-old-data) types to `u8`.
-#[inline]
-pub fn cast_to_u8<T: arrow::datatypes::ArrowNativeType>(
-    buffer: &arrow::buffer::ScalarBuffer<T>,
-) -> ScalarBuffer<u8> {
-    arrow::buffer::ScalarBuffer::new(buffer.inner().clone(), 0, buffer.inner().len())
-}
-
 // ----------------------------------------------------------------------------
 
 /// Types that implement this can be used as image channel types.
diff --git a/crates/utils/re_mcap/src/layers/raw.rs b/crates/utils/re_mcap/src/layers/raw.rs
index 4e0af00f2d23..421a436e4349 100644
--- a/crates/utils/re_mcap/src/layers/raw.rs
+++ b/crates/utils/re_mcap/src/layers/raw.rs
@@ -1,4 +1,4 @@
-use arrow::array::{ListBuilder, UInt8Builder};
+use arrow::array::LargeBinaryBuilder;
 use re_chunk::{ChunkId, external::arrow::array::FixedSizeListBuilder};
 use re_types::{
     Component as _, ComponentDescriptor, components, reflection::ComponentDescriptorExt as _,
@@ -6,11 +6,11 @@ use re_types::{
 
 use crate::{
     Error, LayerIdentifier, MessageLayer,
-    parsers::{MessageParser, ParserContext, util::blob_list_builder},
+    parsers::{MessageParser, ParserContext, util::fixed_size_list_builder},
 };
 
 struct RawMcapMessageParser {
-    data: FixedSizeListBuilder<ListBuilder<UInt8Builder>>,
+    data: FixedSizeListBuilder<LargeBinaryBuilder>,
 }
 
 impl RawMcapMessageParser {
@@ -18,7 +18,7 @@ impl RawMcapMessageParser {
     fn new(num_rows: usize) -> Self {
         Self {
-            data: blob_list_builder(num_rows),
+            data: fixed_size_list_builder(1, num_rows),
         }
     }
 }
@@ -30,8 +30,7 @@ impl MessageParser for RawMcapMessageParser {
         msg: &::mcap::Message<'_>,
     ) -> anyhow::Result<()> {
         re_tracing::profile_function!();
-        self.data.values().values().append_slice(&msg.data);
-        self.data.values().append(true);
+        self.data.values().append_value(&msg.data);
         self.data.append(true);
         Ok(())
     }
diff --git a/crates/utils/re_mcap/src/parsers/mod.rs b/crates/utils/re_mcap/src/parsers/mod.rs
index 55ebb144e617..174edd2c3d5d 100644
--- a/crates/utils/re_mcap/src/parsers/mod.rs
+++ b/crates/utils/re_mcap/src/parsers/mod.rs
@@ -7,32 +7,12 @@ pub use decode::{ChannelId, MessageParser, ParserContext};
 
 /// Defines utility functions shared across parsers.
 pub(crate) mod util {
-    use arrow::{
-        array::{FixedSizeListBuilder, ListBuilder, UInt8Builder},
-        datatypes::{DataType, Field},
-    };
-    use re_types::{Loggable as _, components};
-    use std::sync::Arc;
+    use arrow::array::{ArrayBuilder, FixedSizeListBuilder};
 
-    pub(crate) fn fixed_size_list_builder<T: Default + arrow::array::ArrayBuilder>(
+    pub(crate) fn fixed_size_list_builder<T: Default + ArrayBuilder>(
         value_length: i32,
         capacity: usize,
-    ) -> arrow::array::FixedSizeListBuilder<T> {
-        arrow::array::FixedSizeListBuilder::with_capacity(
-            Default::default(),
-            value_length,
-            capacity,
-        )
-    }
-
-    pub(crate) fn blob_list_builder(
-        capacity: usize,
-    ) -> FixedSizeListBuilder<ListBuilder<UInt8Builder>> {
-        let list_builder = ListBuilder::<UInt8Builder>::default()
-            .with_field(Arc::new(Field::new_list_field(DataType::UInt8, false)));
-
-        FixedSizeListBuilder::with_capacity(list_builder, 1, capacity).with_field(Arc::new(
-            Field::new_list_field(components::Blob::arrow_datatype(), false),
-        ))
+    ) -> FixedSizeListBuilder<T> {
+        FixedSizeListBuilder::with_capacity(Default::default(), value_length, capacity)
     }
 }
diff --git a/crates/utils/re_mcap/src/parsers/ros2msg/sensor_msgs/point_cloud_2.rs b/crates/utils/re_mcap/src/parsers/ros2msg/sensor_msgs/point_cloud_2.rs
index 4943c36bac05..77dcd76256f3 100644
--- a/crates/utils/re_mcap/src/parsers/ros2msg/sensor_msgs/point_cloud_2.rs
+++ b/crates/utils/re_mcap/src/parsers/ros2msg/sensor_msgs/point_cloud_2.rs
@@ -3,8 +3,8 @@ use std::io::Cursor;
 use super::super::definitions::sensor_msgs::{self, PointField, PointFieldDatatype};
 use arrow::{
     array::{
-        BooleanBuilder, FixedSizeListBuilder, ListBuilder, StringBuilder, StructBuilder,
-        UInt8Builder, UInt32Builder,
+        BooleanBuilder, FixedSizeListBuilder, LargeBinaryBuilder, ListBuilder, StringBuilder,
+        StructBuilder, UInt8Builder, UInt32Builder,
     },
     datatypes::{DataType, Field, Fields},
 };
@@ -22,7 +22,7 @@ use crate::{
     parsers::{
         cdr,
         decode::{MessageParser, ParserContext},
-        util::{blob_list_builder, fixed_size_list_builder},
+        util::fixed_size_list_builder,
     },
 };
 
@@ -35,7 +35,7 @@ pub struct PointCloud2MessageParser {
     is_bigendian: FixedSizeListBuilder<BooleanBuilder>,
     point_step: FixedSizeListBuilder<UInt32Builder>,
     row_step: FixedSizeListBuilder<UInt32Builder>,
-    data: FixedSizeListBuilder<ListBuilder<UInt8Builder>>,
+    data: FixedSizeListBuilder<LargeBinaryBuilder>,
     is_dense: FixedSizeListBuilder<BooleanBuilder>,
 
     // We lazily create this, only if we can interpret the point cloud semantically.
@@ -75,7 +75,7 @@ impl PointCloud2MessageParser {
             is_bigendian: fixed_size_list_builder(1, num_rows),
             point_step: fixed_size_list_builder(1, num_rows),
             row_step: fixed_size_list_builder(1, num_rows),
-            data: blob_list_builder(num_rows),
+            data: fixed_size_list_builder(1, num_rows),
             is_dense: fixed_size_list_builder(1, num_rows),
 
             points_3ds: None,
@@ -257,7 +257,7 @@ impl MessageParser for PointCloud2MessageParser {
         point_step.values().append_slice(&[point_cloud.point_step]);
         row_step.values().append_slice(&[point_cloud.row_step]);
 
-        data.values().values().append_slice(&point_cloud.data);
+        data.values().append_value(&point_cloud.data);
         is_dense.values().append_slice(&[point_cloud.is_dense]);
 
         height.append(true);
@@ -267,7 +267,6 @@ impl MessageParser for PointCloud2MessageParser {
         row_step.append(true);
         is_dense.append(true);
 
-        data.values().append(true);
         data.append(true);
 
         Ok(())
diff --git a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/VideoSample_placeholder.png b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/VideoSample_placeholder.png
index 5d1858b7254e..e6461d640f11 100644
--- a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/VideoSample_placeholder.png
+++ b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/VideoSample_placeholder.png
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:b052a99437a77e4c3ddb6613b3d944fe90e872b50b2be52d0c1a7041ee926b35
-size 2843
+oid sha256:c78265faf1086880f4307cd13a33b32e0f5dfdca11927480ea7dfbd720b1469b
+size 3056
diff --git a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/custom_large_blob_any_value_large_blob.png b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/custom_large_blob_any_value_large_blob.png
deleted file mode 100644
index 03bb1a182256..000000000000
--- a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/custom_large_blob_any_value_large_blob.png
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:14c8dec3337ae04ad74e88a1a1dc98c5503bb43fabc620f51f8c92a5190afdfc
-size 3670
diff --git a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/custom_small_array_any_value_small_array.png b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/custom_small_array_any_value_small_array.png
index 3850596436a3..11a813d082c4 100644
--- a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/custom_small_array_any_value_small_array.png
+++ b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/custom_small_array_any_value_small_array.png
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:38bf2e72b7b08aaf3c27a15b096cfee9e443cbfa72d3336b3cce381f7a5eba99
-size 3151
+oid sha256:a3b6c910752fc5348ca3a6f122f00f3a08d2c6152daee1ac666dcb21f09843e1
+size 4059
diff --git a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/one_large_blob_any_value_one_large_blob.png b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/one_large_blob_any_value_one_large_blob.png
new file mode 100644
index 000000000000..0f356869535c
--- /dev/null
+++ b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/one_large_blob_any_value_one_large_blob.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:17ca10070dbf6c6b2cd17e75dae941909ab73068fe185c775967bd76eda2f4fc
+size 3649
diff --git a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/one_small_blob_any_value_one_small_blob.png b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/one_small_blob_any_value_one_small_blob.png
new file mode 100644
index 000000000000..c5fa581351bd
--- /dev/null
+++ b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/one_small_blob_any_value_one_small_blob.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:bab838f7d573b54f6324c448bd170ec144e4c8e6c325ce01499c2bf357d95ec7
+size 3047
diff --git a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/two_large_blobs_any_value_two_large_blobs.png b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/two_large_blobs_any_value_two_large_blobs.png
new file mode 100644
index 000000000000..4abc594e4066
--- /dev/null
+++ b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_narrow/two_large_blobs_any_value_two_large_blobs.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:52d93f776a5de0e6976a0dcdf5acc30e77d7ff55aa610f51ee7a4111aee4de4a
+size 4105
diff --git a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/VideoSample_placeholder.png b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/VideoSample_placeholder.png
index 7bca27a85996..c429f0e19217 100644
--- a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/VideoSample_placeholder.png
+++ b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/VideoSample_placeholder.png
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:e2dfb7e6848faae48ec178407bcc37524bfc98a03ce4c5b6a44a45616db27f83
-size 3158
+oid sha256:7b5fe5fde8351c6b31046d97f6f79e6fdacd8000c695a75fdf3878e1c7d3b1d0
+size 3373
diff --git a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/custom_large_blob_any_value_large_blob.png b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/custom_large_blob_any_value_large_blob.png
deleted file mode 100644
index 9ac0c0c64efa..000000000000
--- a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/custom_large_blob_any_value_large_blob.png
+++ /dev/null
@@ -1,3 +0,0 @@
-version https://git-lfs.github.com/spec/v1
-oid sha256:da0df8f8be85731d04abf18c7196ae70c7944ac802d0a8ae2011cf05ed2f7d27
-size 3981
diff --git a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/custom_small_array_any_value_small_array.png b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/custom_small_array_any_value_small_array.png
index 94b28772b908..db966c4dfaf5 100644
--- a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/custom_small_array_any_value_small_array.png
+++ b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/custom_small_array_any_value_small_array.png
@@ -1,3 +1,3 @@
 version https://git-lfs.github.com/spec/v1
-oid sha256:b12caf4b4d09cb8de8d296a6290eefa8793279e3c09ca4c0be67500e8332e200
-size 3459
+oid sha256:93951d4e2fadf80af35cb07dcd0d776febdbdc8c1b8bdb9492f50e00c1b37548
+size 4970
diff --git a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/one_large_blob_any_value_one_large_blob.png b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/one_large_blob_any_value_one_large_blob.png
new file mode 100644
index 000000000000..771b136fba6d
--- /dev/null
+++ b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/one_large_blob_any_value_one_large_blob.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:ba4368804019097ac9fa6d609ef2be3df3ba1d457a50b91ed61449f4da45e200
+size 3961
diff --git a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/one_small_blob_any_value_one_small_blob.png b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/one_small_blob_any_value_one_small_blob.png
new file mode 100644
index 000000000000..5ae075deb66a
--- /dev/null
+++ b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/one_small_blob_any_value_one_small_blob.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:ea569e1d409fdfba25ed45e74c98326f6f077b076a895cc0f1d552db1e0b1edb
+size 3364
diff --git a/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/two_large_blobs_any_value_two_large_blobs.png b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/two_large_blobs_any_value_two_large_blobs.png
new file mode 100644
index 000000000000..d76c9ef66a28
--- /dev/null
+++ b/crates/viewer/re_component_ui/tests/snapshots/all_components_list_item_wide/two_large_blobs_any_value_two_large_blobs.png
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:4a65d6a53d4cefc87611b9dcb0418981d67bd672f2addf1eebef0f74224005f2
+size 5081
diff --git a/crates/viewer/re_component_ui/tests/test_all_components_ui.rs b/crates/viewer/re_component_ui/tests/test_all_components_ui.rs
index fad880740dbc..acd8bd190710 100644
--- a/crates/viewer/re_component_ui/tests/test_all_components_ui.rs
+++ b/crates/viewer/re_component_ui/tests/test_all_components_ui.rs
@@ -103,9 +103,19 @@ fn test_cases(reflection: &Reflection) -> Vec<TestCase> {
             "any_value_small_array",
         ),
         TestCase::from_arrow(
-            ComponentType::from("custom_large_blob"),
-            arrow::array::UInt8Array::from(vec![42; 3001]),
-            "any_value_large_blob",
+            ComponentType::from("one_small_blob"),
+            arrow::array::BinaryArray::from_vec(vec![&[1, 2, 3]]),
+            "any_value_one_small_blob",
+        ),
+        TestCase::from_arrow(
+            ComponentType::from("one_large_blob"),
+            arrow::array::LargeBinaryArray::from_vec(vec![&vec![42_u8; 3001]]),
+            "any_value_one_large_blob",
+        ),
+        TestCase::from_arrow(
+            ComponentType::from("two_large_blobs"),
+            arrow::array::LargeBinaryArray::from_vec(vec![&vec![42_u8; 3001], &vec![69_u8; 6001]]),
+            "any_value_two_large_blobs",
         ),
         TestCase::from_arrow(
             ComponentType::from("custom_struct_array"),
diff --git a/crates/viewer/re_ui/src/arrow_ui.rs b/crates/viewer/re_ui/src/arrow_ui.rs
index 95bee335e45b..4087bf5b37aa 100644
--- a/crates/viewer/re_ui/src/arrow_ui.rs
+++ b/crates/viewer/re_ui/src/arrow_ui.rs
@@ -96,9 +96,7 @@ pub fn arrow_ui(ui: &mut egui::Ui, ui_layout: UiLayout, array: &dyn arrow::array
array: &dyn arrow::array } else { let instance_count_str = re_format::format_uint(instance_count); - let string = if array.data_type() == &DataType::UInt8 { - re_format::format_bytes(instance_count as _) - } else if let Some(dtype) = simple_datatype_string(array.data_type()) { + let string = if let Some(dtype) = simple_datatype_string(array.data_type()) { format!("{instance_count_str} items of {dtype}") } else if let DataType::Struct(fields) = array.data_type() { format!( diff --git a/crates/viewer/re_ui/tests/snapshots/arrow_ui.png b/crates/viewer/re_ui/tests/snapshots/arrow_ui.png index 1ea7aa482181..c80399ae9fcb 100644 --- a/crates/viewer/re_ui/tests/snapshots/arrow_ui.png +++ b/crates/viewer/re_ui/tests/snapshots/arrow_ui.png @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:61e49c24e3291cc1cd0a917b5b26b32951c1cd095d9131030ed13f9c3c76b4cb -size 49877 +oid sha256:b7099b0b334f607ef84e645af30cb138f524affa8b5cf90c21a33bc15eabe1fd +size 49286 diff --git a/crates/viewer/re_viewer_context/src/cache/video_stream_cache.rs b/crates/viewer/re_viewer_context/src/cache/video_stream_cache.rs index 093df81e374d..b004521dc8dc 100644 --- a/crates/viewer/re_viewer_context/src/cache/video_stream_cache.rs +++ b/crates/viewer/re_viewer_context/src/cache/video_stream_cache.rs @@ -292,6 +292,46 @@ fn read_samples_from_chunk( ) -> Result<(), VideoStreamProcessingError> { re_tracing::profile_function!(); + let sample_descr = VideoStream::descriptor_sample(); + let Some(raw_array) = chunk.raw_component_array(&sample_descr) else { + // This chunk doesn't have any video chunks. + return Ok(()); + }; + + if let Some(binary_array) = raw_array.downcast_array_ref::() { + read_sample_from_binary_array(timeline, chunk, video_descr, chunk_buffers, binary_array); + Ok(()) + } else if let Some(binary_array) = + raw_array.downcast_array_ref::() + { + read_sample_from_binary_array(timeline, chunk, video_descr, chunk_buffers, binary_array); + Ok(()) + } else { + Err(VideoStreamProcessingError::InvalidVideoSampleType( + raw_array.data_type().clone(), + )) + } +} + +fn read_sample_from_binary_array( + timeline: TimelineName, + chunk: &re_chunk::Chunk, + video_descr: &mut re_video::VideoDataDescription, + chunk_buffers: &mut StableIndexDeque, + binary_array: &arrow::array::GenericByteArray>, +) { + // The underlying data within a chunk is logically a Vec>, + // where the inner Vec always has a len=1, because we're dealing with a "mono-component" + // (each VideoStream has exactly one VideoSample instance per time)`. + // + // Because of how arrow works, the bytes of all the blobs are actually sequential in memory (yay!) in a single buffer, + // what you call values below (could use a better name btw). + // + // We want to figure out the byte offsets of each blob within the arrow buffer that holds all the blobs, + // i.e. get out a Vec. + + let sample_descr = VideoStream::descriptor_sample(); + let re_video::VideoDataDescription { codec, samples, @@ -300,12 +340,6 @@ fn read_samples_from_chunk( .. } = video_descr; - let sample_descr = VideoStream::descriptor_sample(); - let Some(raw_array) = chunk.raw_component_array(&sample_descr) else { - // This chunk doesn't have any video chunks. - return Ok(()); - }; - let mut previous_max_presentation_timestamp = samples .back() .map_or(re_video::Time::MIN, |s| s.presentation_timestamp); @@ -322,41 +356,21 @@ fn read_samples_from_chunk( re_log::warn_once!( "Out of order logging on video streams is not supported. Ignoring any out of order samples." 
                 );
-                return Ok(());
+                return;
             }
         }
         None => {
             // This chunk doesn't have any data on this timeline.
-            return Ok(());
+            return;
         }
     }
 
     // Make sure our index is sorted by the timeline we're interested in.
     let chunk = chunk.sorted_by_timeline_if_unsorted(&timeline);
 
-    // The underlying data within a chunk is logically a Vec<Vec<Blob>>,
-    // where the inner Vec always has a len=1, because we're dealing with a "mono-component"
-    // (each VideoStream has exactly one VideoSample instance per time)`.
-    //
-    // Because of how arrow works, the bytes of all the blobs are actually sequential in memory (yay!) in a single buffer,
-    // what you call values below (could use a better name btw).
-    //
-    // We want to figure out the byte offsets of each blob within the arrow buffer that holds all the blobs,
-    // i.e. get out a Vec<Span>.
-    let inner_list_array = raw_array
-        .downcast_array_ref::<arrow::array::ListArray>()
-        .ok_or(VideoStreamProcessingError::InvalidVideoSampleType(
-            raw_array.data_type().clone(),
-        ))?;
-    let values = inner_list_array
-        .values()
-        .downcast_array_ref::<PrimitiveArray<UInt8Type>>()
-        .ok_or(VideoStreamProcessingError::InvalidVideoSampleType(
-            raw_array.data_type().clone(),
-        ))?;
-    let values = values.values().inner();
+    let buffer = binary_array.values();
 
-    let offsets = inner_list_array.offsets();
+    let offsets = binary_array.offsets();
     let lengths = offsets.lengths().collect::<Vec<_>>();
 
     let buffer_index = chunk_buffers.next_index();
@@ -380,8 +394,8 @@ fn read_samples_from_chunk(
         }
 
         let sample_idx = sample_base_idx + start;
-        let byte_span = Span { start:offsets[start] as usize, len: lengths[start] };
-        let sample_bytes = &values[byte_span.range()];
+        let byte_span = Span { start: offsets[start].as_usize(), len: lengths[start] };
+        let sample_bytes = &buffer[byte_span.range()];
 
         // Note that the conversion of this time value is already handled by `VideoDataDescription::timescale`:
         // For sequence time we use a scale of 1, for nanoseconds time we use a scale of 1_000_000_000.
@@ -459,7 +473,7 @@ fn read_samples_from_chunk(
 
     // Any new samples actually added? Early out if not.
     if sample_base_idx == samples.next_index() {
-        return Ok(());
+        return;
     }
 
     // Fill out durations for all new samples plus the first existing sample for which we didn't know the duration yet.
@@ -486,7 +500,7 @@ fn read_samples_from_chunk(
     }
 
     chunk_buffers.push_back(SampleBuffer {
-        buffer: values.clone(),
+        buffer: buffer.clone(),
         source_chunk_id: chunk.id(),
         sample_index_range: sample_base_idx..samples.next_index(),
     });
@@ -499,8 +513,6 @@ fn read_samples_from_chunk(
             chunk.entity_path()
         );
     }
-
-    Ok(())
 }
 
 impl Cache for VideoStreamCache {
diff --git a/docs/content/reference/migration/migration-0-25.md b/docs/content/reference/migration/migration-0-25.md
index c43258d825ff..d9a826b1e085 100644
--- a/docs/content/reference/migration/migration-0-25.md
+++ b/docs/content/reference/migration/migration-0-25.md
@@ -29,3 +29,14 @@ Previously this could only be configured for gRPC sinks, and it was configured o
 
 In the C++ and Python APIs, negative timeouts used to have special meaning. Now they are no longer permitted.
 The Python flush calls now raises an error if the flushing did not complete successfully.
+
+
+## Changed arrow encoding of blobs
+We used to encode blobs as `List<u8>`, which was rather unidiomatic.
+Now they are instead encoded as `Binary`.
+Old data will be migrated on ingestion (zero-copy).
+
+Affects the following components:
+- [`Blob`](https://rerun.io/docs/reference/types/components/blob)
+- [`ImageBuffer`](https://rerun.io/docs/reference/types/components/image_buffer)
+- [`VideoSample`](https://rerun.io/docs/reference/types/components/video_sample)
diff --git a/docs/content/reference/types/components/blob.md b/docs/content/reference/types/components/blob.md
index b9fe1a1087f4..75b7067222cc 100644
--- a/docs/content/reference/types/components/blob.md
+++ b/docs/content/reference/types/components/blob.md
@@ -11,7 +11,7 @@ A binary blob of data.
 
 ## Arrow datatype
 ```
-List<uint8>
+binary
 ```
 
 ## API reference links
diff --git a/docs/content/reference/types/components/image_buffer.md b/docs/content/reference/types/components/image_buffer.md
index 601f857b54eb..acef29d4c395 100644
--- a/docs/content/reference/types/components/image_buffer.md
+++ b/docs/content/reference/types/components/image_buffer.md
@@ -13,7 +13,7 @@ To interpret the contents of this buffer, see, [`components.ImageFormat`](https:
 
 ## Arrow datatype
 ```
-List<uint8>
+binary
 ```
 
 ## API reference links
diff --git a/docs/content/reference/types/components/video_sample.md b/docs/content/reference/types/components/video_sample.md
index 244719f15c61..18fba252a0f5 100644
--- a/docs/content/reference/types/components/video_sample.md
+++ b/docs/content/reference/types/components/video_sample.md
@@ -16,7 +16,7 @@ Keyframes may require additional data, for details see [`components.VideoCodec`]
 
 ## Arrow datatype
 ```
-List<uint8>
+binary
 ```
 
 ## API reference links
diff --git a/docs/content/reference/types/datatypes/blob.md b/docs/content/reference/types/datatypes/blob.md
index 389ccec848e1..864daa7810c2 100644
--- a/docs/content/reference/types/datatypes/blob.md
+++ b/docs/content/reference/types/datatypes/blob.md
@@ -8,7 +8,7 @@ A binary blob of data.
 
 ## Arrow datatype
 ```
-List<uint8>
+binary
 ```
 
 ## API reference links
diff --git a/rerun_cpp/src/rerun/archetypes/asset_video_ext.cpp b/rerun_cpp/src/rerun/archetypes/asset_video_ext.cpp
index ed206eb844a1..5f5edb766dcd 100644
--- a/rerun_cpp/src/rerun/archetypes/asset_video_ext.cpp
+++ b/rerun_cpp/src/rerun/archetypes/asset_video_ext.cpp
@@ -93,16 +93,35 @@ namespace rerun::archetypes {
         if (!blob.has_value()) {
            return std::vector<int64_t>();
         }
-        auto blob_list_array = std::dynamic_pointer_cast<arrow::ListArray>(blob.value().array);
-        if (!blob_list_array) {
-            return Error(ErrorCode::InvalidComponent, "Blob array is not a primitive array");
-        }
-        auto blob_array =
-            std::dynamic_pointer_cast<arrow::UInt8Array>(blob_list_array->values());
-        if (!blob_array) {
-            return Error(ErrorCode::InvalidComponent, "Blob array is not a primitive array");
+
+        auto& array = blob.value().array;
+
+        int64_t num_bytes;
+        const uint8_t* bytes;
+
+        if (auto binary_array = std::dynamic_pointer_cast<arrow::BinaryArray>(array)) {
+            if (binary_array->length() != 1) {
+                return Error(
+                    ErrorCode::InvalidComponent,
+                    "Video blob array should be a single video file"
+                );
+            }
+
+            int32_t num_bytes_i32;
+            bytes = binary_array->GetValue(0, &num_bytes_i32);
+            num_bytes = static_cast<int64_t>(num_bytes_i32);
+        } else if (auto large_binary_array = std::dynamic_pointer_cast<arrow::LargeBinaryArray>(array)) {
+            if (large_binary_array->length() != 1) {
+                return Error(
+                    ErrorCode::InvalidComponent,
+                    "Video blob array should be a single video file"
+                );
+            }
+
+            bytes = large_binary_array->GetValue(0, &num_bytes);
+        } else {
+            return Error(ErrorCode::InvalidComponent, "Video blob array is not a binary array");
         }
-        auto blob_array_data = blob_array->values();
 
         rr_string media_type_c = detail::to_rr_string(std::nullopt);
         if (media_type.has_value()) {
@@ -117,8 +136,8 @@
         rr_error status = {};
         rr_video_asset_read_frame_timestamps_nanos(
-            blob_array_data->data(),
-            static_cast<uint64_t>(blob_array_data->size()),
+            bytes,
+            static_cast<uint64_t>(num_bytes),
             media_type_c,
             &frame_timestamps,
             &alloc_timestamps,
diff --git a/rerun_cpp/src/rerun/datatypes/blob.cpp b/rerun_cpp/src/rerun/datatypes/blob.cpp
index 10b05e46fcdf..29e27d67f8f5 100644
--- a/rerun_cpp/src/rerun/datatypes/blob.cpp
+++ b/rerun_cpp/src/rerun/datatypes/blob.cpp
@@ -10,7 +10,7 @@ namespace rerun::datatypes {}
 
 namespace rerun {
     const std::shared_ptr<arrow::DataType>& Loggable<datatypes::Blob>::arrow_datatype() {
-        static const auto datatype = arrow::list(arrow::field("item", arrow::uint8(), false));
+        static const auto datatype = arrow::large_binary();
         return datatype;
     }
 
@@ -24,7 +24,7 @@ namespace rerun {
         ARROW_ASSIGN_OR_RAISE(auto builder, arrow::MakeBuilder(datatype, pool))
         if (instances && num_instances > 0) {
             RR_RETURN_NOT_OK(Loggable<datatypes::Blob>::fill_arrow_array_builder(
-                static_cast<arrow::ListBuilder*>(builder.get()),
+                static_cast<arrow::LargeBinaryBuilder*>(builder.get()),
                 instances,
                 num_instances
             ));
@@ -35,7 +35,7 @@ namespace rerun {
     }
 
     rerun::Error Loggable<datatypes::Blob>::fill_arrow_array_builder(
-        arrow::ListBuilder* builder, const datatypes::Blob* elements, size_t num_elements
+        arrow::LargeBinaryBuilder* builder, const datatypes::Blob* elements, size_t num_elements
     ) {
         if (builder == nullptr) {
             return rerun::Error(ErrorCode::UnexpectedNullArgument, "Passed array builder is null.");
@@ -47,17 +47,11 @@ namespace rerun {
             );
         }
 
-        auto value_builder = static_cast<arrow::UInt8Builder*>(builder->value_builder());
         ARROW_RETURN_NOT_OK(builder->Reserve(static_cast<int64_t>(num_elements)));
-        ARROW_RETURN_NOT_OK(value_builder->Reserve(static_cast<int64_t>(num_elements * 2)));
+
         for (size_t elem_idx = 0; elem_idx < num_elements; elem_idx += 1) {
-            const auto& element = elements[elem_idx];
-            ARROW_RETURN_NOT_OK(builder->Append());
-            ARROW_RETURN_NOT_OK(value_builder->AppendValues(
-                element.data.data(),
-                static_cast<int64_t>(element.data.size()),
-                nullptr
+            ARROW_RETURN_NOT_OK(builder->Append(
+                elements[elem_idx].data.data(),
+                static_cast<int64_t>(elements[elem_idx].data.size())
             ));
         }
diff --git a/rerun_cpp/src/rerun/datatypes/blob.hpp b/rerun_cpp/src/rerun/datatypes/blob.hpp
index 7008b940d6dd..4f99b67da379 100644
--- a/rerun_cpp/src/rerun/datatypes/blob.hpp
+++ b/rerun_cpp/src/rerun/datatypes/blob.hpp
@@ -13,7 +13,7 @@
 namespace arrow {
     class Array;
     class DataType;
-    class ListBuilder;
+    class LargeBinaryBuilder;
 } // namespace arrow
 
 namespace rerun::datatypes {
@@ -60,7 +60,7 @@ namespace rerun {
 
         /// Fills an arrow array builder with an array of this type.
         static rerun::Error fill_arrow_array_builder(
-            arrow::ListBuilder* builder, const datatypes::Blob* elements, size_t num_elements
+            arrow::LargeBinaryBuilder* builder, const datatypes::Blob* elements, size_t num_elements
         );
     };
 } // namespace rerun
diff --git a/rerun_py/rerun_sdk/rerun/archetypes/image_ext.py b/rerun_py/rerun_sdk/rerun/archetypes/image_ext.py
index 863040edb2d7..c3721489ca04 100644
--- a/rerun_py/rerun_sdk/rerun/archetypes/image_ext.py
+++ b/rerun_py/rerun_sdk/rerun/archetypes/image_ext.py
@@ -272,13 +272,19 @@ def as_pil_image(self: Any) -> PILImage.Image:
                 f"Converting image with pixel_format {image_format.pixel_format} into PIL is not yet supported"
             )
 
-        buf = self.buffer.as_arrow_array().values.to_numpy().view(image_format.channel_datatype.to_np_dtype())
+        buffer = self.buffer.as_arrow_array()
+
+        if len(buffer) != 1:
+            raise ValueError(f"Expected exactly 1 buffer, got {len(buffer)}")
+
+        blob_bytes = buffer[0].as_py()
+        array = np.frombuffer(blob_bytes, dtype=image_format.channel_datatype.to_np_dtype())
 
         # Note: np array shape is always (height, width, channels)
         if image_format.color_model == ColorModel.L:
-            image = buf.reshape(image_format.height, image_format.width)
+            image = array.reshape(image_format.height, image_format.width)  # type: ignore[assignment]
         else:
-            image = buf.reshape(image_format.height, image_format.width, image_format.color_model.num_channels())
+            image = array.reshape(image_format.height, image_format.width, image_format.color_model.num_channels())  # type: ignore[assignment]
 
         # PIL assumes L or RGB[A]:
         if image_format.color_model == ColorModel.BGR:
diff --git a/rerun_py/rerun_sdk/rerun/datatypes/blob.py b/rerun_py/rerun_sdk/rerun/datatypes/blob.py
index 68a950808bc0..f0af43821af4 100644
--- a/rerun_py/rerun_sdk/rerun/datatypes/blob.py
+++ b/rerun_py/rerun_sdk/rerun/datatypes/blob.py
@@ -16,9 +16,6 @@
 from .._baseclasses import (
     BaseBatch,
 )
-from .._converters import (
-    to_np_uint8,
-)
 from .blob_ext import BlobExt
 
 __all__ = ["Blob", "BlobArrayLike", "BlobBatch", "BlobLike"]
@@ -34,15 +31,7 @@ def __init__(self: Any, data: BlobLike) -> None:
         # You can define your own __init__ function as a member of BlobExt in blob_ext.py
         self.__attrs_init__(data=data)
 
-    data: npt.NDArray[np.uint8] = field(converter=to_np_uint8)
-
-    def __array__(self, dtype: npt.DTypeLike = None, copy: bool | None = None) -> npt.NDArray[Any]:
-        # You can define your own __array__ function as a member of BlobExt in blob_ext.py
-        return np.asarray(self.data, dtype=dtype, copy=copy)
-
-    def __len__(self) -> int:
-        # You can define your own __len__ function as a member of BlobExt in blob_ext.py
-        return len(self.data)
+    data: bytes = field(converter=bytes)
 
 if TYPE_CHECKING:
@@ -54,7 +43,7 @@ def __len__(self) -> int:
 
 class BlobBatch(BaseBatch[BlobArrayLike]):
-    _ARROW_DATATYPE = pa.list_(pa.field("item", pa.uint8(), nullable=False, metadata={}))
+    _ARROW_DATATYPE = pa.large_binary()
 
     @staticmethod
     def _native_to_pa_array(data: BlobArrayLike, data_type: pa.DataType) -> pa.Array:
diff --git a/rerun_py/rerun_sdk/rerun/datatypes/blob_ext.py b/rerun_py/rerun_sdk/rerun/datatypes/blob_ext.py
index 8390cb0768d0..ff1fc5455f4f 100644
--- a/rerun_py/rerun_sdk/rerun/datatypes/blob_ext.py
+++ b/rerun_py/rerun_sdk/rerun/datatypes/blob_ext.py
@@ -25,51 +25,42 @@ def native_to_pa_array_override(data: BlobArrayLike, data_type: pa.DataType) ->
         if isinstance(data, BlobBatch):
             return data.as_arrow_array()
 
-        # pure-numpy fast path
+        # numpy fast path:
         elif isinstance(data, np.ndarray):
             if len(data) == 0:
-                inners = []
+                return pa.array([], type=pa.large_binary())
             elif data.ndim == 1:
-                inners = [pa.array(np.array(data, dtype=np.uint8).flatten())]
+                return pa.array([np.array(data, dtype=np.uint8).tobytes()], type=pa.large_binary())
             else:
-                o = 0
-                offsets = [o] + [o := next_offset(o, arr) for arr in data]
-                inner = pa.array(np.array(data, dtype=np.uint8).flatten())
-                return pa.ListArray.from_arrays(offsets, inner, type=data_type)
+                return pa.array([np.array(arr, dtype=np.uint8).tobytes() for arr in data], type=pa.large_binary())
 
-        # pure-object
         elif isinstance(data, Blob):
-            inners = [pa.array(np.array(data.data, dtype=np.uint8).flatten())]
+            return pa.array([data.data], type=pa.large_binary())
 
-        # pure-bytes
         elif isinstance(data, bytes):
-            inners = [pa.array(np.frombuffer(data, dtype=np.uint8))]
+            return pa.array([data], type=pa.large_binary())
 
         elif hasattr(data, "read"):
-            inners = [pa.array(np.frombuffer(data.read(), dtype=np.uint8))]
+            return pa.array([data.read()], type=pa.large_binary())
 
-        # sequences
         elif isinstance(data, Sequence):
             if len(data) == 0:
-                inners = []
+                return pa.array([], type=pa.large_binary())
             elif isinstance(data[0], Blob):
-                inners = [pa.array(np.array(datum.data, dtype=np.uint8).flatten()) for datum in data]  # type: ignore[union-attr]
+                return pa.array(
+                    [
+                        np.array(
+                            datum.data,  # type: ignore[union-attr]
+                            dtype=np.uint8,
+                        ).tobytes()
+                        for datum in data
+                    ],
+                    type=pa.large_binary(),
+                )
             elif isinstance(data[0], bytes):
-                inners = [pa.array(np.frombuffer(datum, dtype=np.uint8)) for datum in data]  # type: ignore[arg-type]
+                return pa.array(list(data), type=pa.large_binary())  # type: ignore[arg-type]
             else:
-                inners = [pa.array(np.array(datum, dtype=np.uint8).flatten()) for datum in data]
+                return pa.array([np.array(datum, dtype=np.uint8).tobytes() for datum in data], type=pa.large_binary())
 
         else:
-            inners = [pa.array(np.array(data.data, dtype=np.uint8).flatten())]
-
-        if len(inners) == 0:
-            offsets = pa.array([0], type=pa.int32())
-            inner = np.array([], dtype=np.uint8).flatten()
-            return pa.ListArray.from_arrays(offsets, inner, type=data_type)
-
-        o = 0
-        offsets = [o] + [o := next_offset(o, inner) for inner in inners]
-
-        inner = pa.concat_arrays(inners)
-
-        return pa.ListArray.from_arrays(offsets, inner, type=data_type)
+            return pa.array([np.array(data.data, dtype=np.uint8).tobytes()], type=pa.large_binary())
diff --git a/rerun_py/src/video.rs b/rerun_py/src/video.rs
index 511b36781d59..9448b76e622d 100644
--- a/rerun_py/src/video.rs
+++ b/rerun_py/src/video.rs
@@ -2,7 +2,7 @@
 
 use pyo3::{Bound, PyAny, PyResult, exceptions::PyRuntimeError, pyfunction};
 
-use re_arrow_util::ArrowArrayDowncastRef as _;
+use re_chunk::ArrowArray as _;
 use re_video::VideoLoadError;
 
 use crate::arrow::array_to_rust;
@@ -20,19 +20,13 @@ pub fn asset_video_read_frame_timestamps_nanos(
     media_type: Option<&str>,
 ) -> PyResult<Vec<i64>> {
     let video_bytes_arrow_array = array_to_rust(video_bytes_arrow_array)?;
-
-    let video_bytes_arrow_uint8_array = video_bytes_arrow_array
-        .downcast_array_ref::<arrow::array::ListArray>()
-        .and_then(|arr| arr.values().downcast_array_ref::<arrow::array::UInt8Array>())
-        .ok_or_else(|| {
-            PyRuntimeError::new_err(format!(
-                "Expected arrow array to be a list with a single uint8 array, instead it has the datatype {:?}",
-                video_bytes_arrow_array.data_type()
-            ))
-        })?;
-
-    let video_bytes = video_bytes_arrow_uint8_array.values().as_ref();
-
+    let video_bytes = binary_array_as_slice(&video_bytes_arrow_array).ok_or_else(|| {
+        PyRuntimeError::new_err(format!(
+            "Expected video bytes to be a single BinaryArray, instead it has the datatype {:?} x {}",
+            video_bytes_arrow_array.data_type(),
+            video_bytes_arrow_array.len(),
+        ))
+    })?;
     let Some(media_type) =
         media_type.or_else(|| infer::Infer::new().get(video_bytes).map(|v| v.mime_type()))
     else {
@@ -49,3 +43,22 @@ pub fn asset_video_read_frame_timestamps_nanos(
         .collect(),
     )
 }
+
+fn binary_array_as_slice(array: &std::sync::Arc<dyn arrow::array::Array>) -> Option<&[u8]> {
+    if let Some(blob_data) = array.as_any().downcast_ref::<arrow::array::BinaryArray>() {
+        if blob_data.len() == 1 {
+            return Some(blob_data.value(0));
+        }
+    }
+
+    if let Some(blob_data) = array
+        .as_any()
+        .downcast_ref::<arrow::array::LargeBinaryArray>()
+    {
+        if blob_data.len() == 1 {
+            return Some(blob_data.value(0));
+        }
+    }
+
+    None
+}