Skip to content

Commit 6616a0e

Browse files
committed
Re-enable support for 64-bit blobs (e.g. 2GiB+ videos)
1 parent 88815d0 commit 6616a0e

File tree

20 files changed

+403
-221
lines changed

20 files changed

+403
-221
lines changed

crates/store/re_sorbet/src/migrations/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ mod make_list_arrays;
2222
mod v0_0_1__to__v0_0_2;
2323
mod v0_0_2__to__v0_1_0;
2424
mod v0_1_0__to__v0_1_1;
25+
mod v0_1_1__to__v0_1_2;
2526

2627
/// This trait needs to be implemented by any new migrations. It ensures that
2728
/// all migrations adhere to the same contract.
@@ -128,6 +129,7 @@ pub fn migrate_record_batch(mut batch: RecordBatch) -> RecordBatch {
128129
batch = maybe_apply::<v0_0_1__to__v0_0_2::Migration>(&batch_version, batch);
129130
batch = maybe_apply::<v0_0_2__to__v0_1_0::Migration>(&batch_version, batch);
130131
batch = maybe_apply::<v0_1_0__to__v0_1_1::Migration>(&batch_version, batch);
132+
batch = maybe_apply::<v0_1_1__to__v0_1_2::Migration>(&batch_version, batch);
131133
batch
132134
}
133135
}
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
//! Breaking changes:
2+
//! * `Blob` is encoded as `Binary` instead of `List[u8]`
3+
use std::sync::Arc;
4+
5+
use arrow::{
6+
array::{
7+
Array, ArrayRef, AsArray as _, ListArray, RecordBatch, RecordBatchOptions, UInt8Array,
8+
},
9+
datatypes::{DataType, Field, FieldRef, Schema},
10+
};
11+
12+
use re_log::ResultExt as _;
13+
14+
pub struct Migration;
15+
16+
impl super::Migration for Migration {
17+
const SOURCE_VERSION: semver::Version = semver::Version::new(0, 1, 1);
18+
const TARGET_VERSION: semver::Version = semver::Version::new(0, 1, 2);
19+
20+
fn migrate(batch: RecordBatch) -> RecordBatch {
21+
migrate_blobs(batch)
22+
}
23+
}
24+
25+
/// Change datatype from `List[u8]` to `Binary` for blobs
26+
fn migrate_blobs(batch: RecordBatch) -> RecordBatch {
27+
re_tracing::profile_function!();
28+
29+
/// Is this a `List<List<u8>>` ?
30+
fn is_list_list_u8(datatype: &DataType) -> bool {
31+
if let DataType::List(list_field) = datatype
32+
&& let DataType::List(innermost_field) = list_field.data_type()
33+
{
34+
innermost_field.data_type() == &DataType::UInt8
35+
} else {
36+
false
37+
}
38+
}
39+
40+
fn is_blob_field(field: &Field) -> bool {
41+
let components_with_blobs = [
42+
"rerun.components.Blob",
43+
"rerun.components.ImageBuffer",
44+
"rerun.components.VideoSample",
45+
];
46+
47+
if let Some(component_type) = field.metadata().get("rerun:component_type")
48+
&& components_with_blobs.contains(&component_type.as_str())
49+
{
50+
is_list_list_u8(field.data_type())
51+
} else {
52+
false
53+
}
54+
}
55+
56+
let needs_migration = batch
57+
.schema()
58+
.fields()
59+
.iter()
60+
.any(|field| is_blob_field(field));
61+
62+
if !needs_migration {
63+
return batch;
64+
}
65+
66+
let num_columns = batch.num_columns();
67+
let mut fields: Vec<FieldRef> = Vec::with_capacity(num_columns);
68+
let mut columns: Vec<ArrayRef> = Vec::with_capacity(num_columns);
69+
70+
for (field, array) in itertools::izip!(batch.schema().fields(), batch.columns()) {
71+
if is_blob_field(field) {
72+
if let Some(new_array) = convert_list_list_u8_to_list_binary(array.as_ref()) {
73+
let new_field = Field::new(
74+
field.name(),
75+
new_array.data_type().clone(),
76+
field.is_nullable(),
77+
)
78+
.with_metadata(field.metadata().clone());
79+
80+
fields.push(new_field.into());
81+
columns.push(Arc::new(new_array));
82+
83+
re_log::debug_once!(
84+
"Changed datatype of '{}' from List[u8] to Binary",
85+
field.name()
86+
);
87+
continue;
88+
} else {
89+
re_log::warn_once!("Failed to convert {} to Binary", field.name());
90+
}
91+
}
92+
93+
fields.push(field.clone());
94+
columns.push(array.clone());
95+
}
96+
97+
let schema = Arc::new(Schema::new_with_metadata(
98+
fields,
99+
batch.schema().metadata.clone(),
100+
));
101+
102+
RecordBatch::try_new_with_options(
103+
schema.clone(),
104+
columns,
105+
&RecordBatchOptions::default().with_row_count(Some(batch.num_rows())),
106+
)
107+
.ok_or_log_error()
108+
.unwrap_or_else(|| RecordBatch::new_empty(schema))
109+
}
110+
111+
/// `List[List[u8]]` -> `List[Binary]`
112+
fn convert_list_list_u8_to_list_binary(list_array: &dyn Array) -> Option<ListArray> {
113+
re_tracing::profile_function!();
114+
115+
// The outer `List[List[u8]]`
116+
let list_array = list_array.as_list_opt()?;
117+
118+
// The inner List[u8] array
119+
let inner_list_array: &ListArray = list_array.values().as_list_opt()?;
120+
121+
// The underlying u8 values
122+
let u8_array: &UInt8Array = inner_list_array.values().as_primitive_opt()?;
123+
124+
// We consistently use 64-bit offsets for binary data in order to keep our backwards-compatibility checks simpler.
125+
// Create the binary array reusing existing buffers
126+
let binary_array = arrow::array::LargeBinaryArray::try_new(
127+
arrow::buffer::OffsetBuffer::new(
128+
inner_list_array
129+
.offsets()
130+
.iter()
131+
.map(|&o| o as i64)
132+
.collect(),
133+
),
134+
u8_array.values().clone().into_inner(),
135+
inner_list_array.nulls().cloned(),
136+
)
137+
.ok()?;
138+
139+
// Create the outer list array with binary inner type
140+
let outer_list = ListArray::try_new(
141+
Arc::new(Field::new("item", DataType::LargeBinary, true)),
142+
list_array.offsets().clone(),
143+
Arc::new(binary_array),
144+
list_array.nulls().cloned(),
145+
)
146+
.ok()?;
147+
148+
debug_assert_eq!(list_array.len(), outer_list.len());
149+
150+
Some(outer_list)
151+
}

crates/store/re_sorbet/src/sorbet_schema.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl SorbetSchema {
4242
/// This is bumped everytime we require a migration, but notable it is
4343
/// decoupled from the Rerun version to avoid confusion as there will not
4444
/// be a new Sorbet version for each Rerun version.
45-
pub(crate) const METADATA_VERSION: semver::Version = semver::Version::new(0, 1, 1);
45+
pub(crate) const METADATA_VERSION: semver::Version = semver::Version::new(0, 1, 2);
4646
}
4747

4848
impl SorbetSchema {

crates/store/re_types/definitions/rerun/datatypes/blob.fbs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ table Blob (
1313
"attr.rust.repr": "transparent",
1414
"attr.rust.tuple_struct"
1515
) {
16-
data: [ubyte] (order: 100);
16+
data: [ubyte] (order: 100, "attr.rerun.override_type": "binary");
1717
}

crates/store/re_types/src/datatypes/blob.rs

Lines changed: 48 additions & 61 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)