Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/store/re_sorbet/src/migrations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod make_list_arrays;
mod v0_0_1__to__v0_0_2;
mod v0_0_2__to__v0_1_0;
mod v0_1_0__to__v0_1_1;
mod v0_1_1__to__v0_1_2;

/// This trait needs to be implemented by any new migrations. It ensures that
/// all migrations adhere to the same contract.
Expand Down Expand Up @@ -128,6 +129,7 @@ pub fn migrate_record_batch(mut batch: RecordBatch) -> RecordBatch {
batch = maybe_apply::<v0_0_1__to__v0_0_2::Migration>(&batch_version, batch);
batch = maybe_apply::<v0_0_2__to__v0_1_0::Migration>(&batch_version, batch);
batch = maybe_apply::<v0_1_0__to__v0_1_1::Migration>(&batch_version, batch);
batch = maybe_apply::<v0_1_1__to__v0_1_2::Migration>(&batch_version, batch);
batch
}
}
Expand Down
151 changes: 151 additions & 0 deletions crates/store/re_sorbet/src/migrations/v0_1_1__to__v0_1_2.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
//! Breaking changes:
//! * `Blob` is encoded as `LargeBinary` (binary with 64-bit offsets) instead of `List[u8]`
use std::sync::Arc;

use arrow::{
array::{
Array, ArrayRef, AsArray as _, ListArray, RecordBatch, RecordBatchOptions, UInt8Array,
},
datatypes::{DataType, Field, FieldRef, Schema},
};

use re_log::ResultExt as _;

/// Migration of record batches from version `0.1.1` to version `0.1.2`.
///
/// The only change performed is the blob re-encoding done by `migrate_blobs`.
pub struct Migration;

impl super::Migration for Migration {
/// The version this migration starts from.
const SOURCE_VERSION: semver::Version = semver::Version::new(0, 1, 1);
/// The version this migration produces.
const TARGET_VERSION: semver::Version = semver::Version::new(0, 1, 2);

/// Migrates `batch` by delegating to `migrate_blobs`.
fn migrate(batch: RecordBatch) -> RecordBatch {
migrate_blobs(batch)
}
}

/// Re-encodes blob columns from `List[List[u8]]` to `List[LargeBinary]`.
///
/// A column counts as a blob column when its `rerun:component_type` metadata
/// names one of the blob-carrying components and its datatype is
/// `List<List<u8>>`. All other columns are passed through untouched.
///
/// If no column qualifies, `batch` is returned as-is. If a qualifying column
/// cannot be converted, a warning is logged and the column is kept unchanged.
/// If the resulting record batch cannot be assembled, the error is logged and
/// an empty batch with the new schema is returned.
fn migrate_blobs(batch: RecordBatch) -> RecordBatch {
    re_tracing::profile_function!();

    /// Returns `true` when `datatype` is `List<List<u8>>`.
    fn is_list_list_u8(datatype: &DataType) -> bool {
        match datatype {
            DataType::List(outer_item) => match outer_item.data_type() {
                DataType::List(inner_item) => inner_item.data_type() == &DataType::UInt8,
                _ => false,
            },
            _ => false,
        }
    }

    /// Returns `true` when `field` is tagged as a blob-carrying component
    /// and still uses the old `List<List<u8>>` encoding.
    fn is_blob_field(field: &Field) -> bool {
        let components_with_blobs = [
            "rerun.components.Blob",
            "rerun.components.ImageBuffer",
            "rerun.components.VideoSample",
        ];

        field
            .metadata()
            .get("rerun:component_type")
            .is_some_and(|component_type| {
                components_with_blobs.contains(&component_type.as_str())
                    && is_list_list_u8(field.data_type())
            })
    }

    let old_schema = batch.schema();

    // Fast path: nothing to do, hand the batch back untouched.
    let has_blob_column = old_schema
        .fields()
        .iter()
        .any(|field| is_blob_field(field));
    if !has_blob_column {
        return batch;
    }

    let column_count = batch.num_columns();
    let mut new_fields: Vec<FieldRef> = Vec::with_capacity(column_count);
    let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(column_count);

    for (field, column) in old_schema.fields().iter().zip(batch.columns()) {
        // `Some(..)` only for blob columns whose conversion succeeded.
        let converted = if is_blob_field(field) {
            let attempt = convert_list_list_u8_to_list_binary(column.as_ref());
            if attempt.is_none() {
                re_log::warn_once!("Failed to convert {} to Binary", field.name());
            }
            attempt
        } else {
            None
        };

        match converted {
            Some(binary_list) => {
                // Same name, nullability and metadata; only the datatype changes.
                let migrated_field = Field::new(
                    field.name(),
                    binary_list.data_type().clone(),
                    field.is_nullable(),
                )
                .with_metadata(field.metadata().clone());

                new_fields.push(Arc::new(migrated_field));
                new_columns.push(Arc::new(binary_list));

                re_log::debug_once!(
                    "Changed datatype of '{}' from List[u8] to Binary",
                    field.name()
                );
            }
            None => {
                // Not a blob column, or conversion failed: keep the original.
                new_fields.push(Arc::clone(field));
                new_columns.push(Arc::clone(column));
            }
        }
    }

    let new_schema = Arc::new(Schema::new_with_metadata(
        new_fields,
        old_schema.metadata.clone(),
    ));

    // The row count is passed explicitly rather than derived from the columns.
    let options = RecordBatchOptions::default().with_row_count(Some(batch.num_rows()));

    RecordBatch::try_new_with_options(Arc::clone(&new_schema), new_columns, &options)
        .ok_or_log_error()
        .unwrap_or_else(|| RecordBatch::new_empty(new_schema))
}

/// Rebuilds a `List[List[u8]]` array as a `List[LargeBinary]` array.
///
/// Returns `None` if `list_array` is not a list of lists of `u8`, or if either
/// the binary array or the outer list array cannot be constructed.
fn convert_list_list_u8_to_list_binary(list_array: &dyn Array) -> Option<ListArray> {
    re_tracing::profile_function!();

    // Peel the layers: outer list -> inner list -> raw bytes.
    let outer: &ListArray = list_array.as_list_opt()?;
    let inner: &ListArray = outer.values().as_list_opt()?;
    let bytes: &UInt8Array = inner.values().as_primitive_opt()?;

    // We consistently use 64-bit offsets for binary data in order to keep our
    // backwards-compatibility checks simpler, so the inner 32-bit offsets are widened.
    let wide_offsets: arrow::buffer::ScalarBuffer<i64> =
        inner.offsets().iter().copied().map(i64::from).collect();

    // Each inner `List[u8]` entry becomes one binary value: the byte buffer and the
    // validity of the inner list are carried over, only the offsets are rebuilt.
    let binary_values = arrow::array::LargeBinaryArray::try_new(
        arrow::buffer::OffsetBuffer::new(wide_offsets),
        bytes.values().clone().into_inner(),
        inner.nulls().cloned(),
    )
    .ok()?;

    // The outer list keeps its own offsets and validity; only its item type changes.
    let item_field = Arc::new(Field::new("item", DataType::LargeBinary, true));
    let converted = ListArray::try_new(
        item_field,
        outer.offsets().clone(),
        Arc::new(binary_values),
        outer.nulls().cloned(),
    )
    .ok()?;

    debug_assert_eq!(outer.len(), converted.len());

    Some(converted)
}
2 changes: 1 addition & 1 deletion crates/store/re_sorbet/src/sorbet_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl SorbetSchema {
/// This is bumped every time we require a migration, but notably it is
/// decoupled from the Rerun version to avoid confusion as there will not
/// be a new Sorbet version for each Rerun version.
pub(crate) const METADATA_VERSION: semver::Version = semver::Version::new(0, 1, 1);
pub(crate) const METADATA_VERSION: semver::Version = semver::Version::new(0, 1, 2);
}

impl SorbetSchema {
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_types/definitions/rerun/datatypes/blob.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ table Blob (
"attr.rust.repr": "transparent",
"attr.rust.tuple_struct"
) {
data: [ubyte] (order: 100);
data: [ubyte] (order: 100, "attr.rerun.override_type": "binary");
}
109 changes: 48 additions & 61 deletions crates/store/re_types/src/datatypes/blob.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading