Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 108 additions & 49 deletions crates/utils/re_mcap/src/layers/protobuf.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::collections::BTreeMap;

use arrow::{
array::{
ArrayBuilder, BinaryBuilder, BooleanBuilder, FixedSizeListBuilder, Float32Builder,
Expand All @@ -20,7 +18,7 @@ use crate::{Error, LayerIdentifier, MessageLayer};

/// Parser state for turning protobuf messages of one message type into Arrow data.
struct ProtobufMessageParser {
/// Descriptor of the protobuf message type this parser handles.
message_descriptor: MessageDescriptor,
// NOTE(review): this text was captured from a diff view, so the two members below are
// presumably the old and the new form of the same storage (one list builder per field name
// vs. a single list-of-struct builder) — confirm against the merged file.
fields: BTreeMap<String, FixedSizeListBuilder<Box<dyn ArrayBuilder>>>,
builder: FixedSizeListBuilder<StructBuilder>,
}

#[derive(Debug, thiserror::Error)]
Expand All @@ -47,9 +45,6 @@ enum ProtobufError {
#[error("unknown enum number {0}")]
UnknownEnumNumber(i32),

#[error("unknown field name {0}")]
UnknownFieldName(String),

#[error("type {0} is not supported yet")]
UnsupportedType(&'static str),

Expand All @@ -59,19 +54,6 @@ enum ProtobufError {

impl ProtobufMessageParser {
fn new(num_rows: usize, message_descriptor: MessageDescriptor) -> Self {
let mut fields = BTreeMap::new();

// We recursively build up the Arrow builders for this particular message.
for field_descr in message_descriptor.fields() {
let name = field_descr.name().to_owned();
let builder = arrow_builder_from_field(&field_descr);
fields.insert(
name,
FixedSizeListBuilder::with_capacity(builder, 1, num_rows),
);
re_log::trace!("Added Arrow builder for fields: {}", field_descr.name());
}

if message_descriptor.oneofs().len() > 0 {
re_log::warn_once!(
"`oneof` in schema {} is not supported yet.",
Expand All @@ -84,9 +66,12 @@ impl ProtobufMessageParser {
);
}

let struct_builder = struct_builder_from_message(&message_descriptor);
let builder = FixedSizeListBuilder::with_capacity(struct_builder, 1, num_rows);

Self {
message_descriptor,
fields,
builder,
}
}
}
Expand All @@ -103,22 +88,31 @@ impl MessageParser for ProtobufMessageParser {
},
)?;

// We always need to make sure to iterate over all our builders, adding null values whenever
// a field is missing from the message that we received.
for (field, builder) in &mut self.fields {
if let Some(val) = dynamic_message.get_field_by_name(field.as_str()) {
let struct_builder = self.builder.values();

for (ith_arrow_field, field_builder) in
struct_builder.field_builders_mut().iter_mut().enumerate()
{
// Protobuf fields are 1-indexed, so we need to map the i-th builder.
let protobuf_number = ith_arrow_field as u32 + 1;

if let Some(val) = dynamic_message.get_field_by_number(protobuf_number) {
let field = dynamic_message
.descriptor()
.get_field_by_name(field)
.ok_or_else(|| ProtobufError::UnknownFieldName(field.to_owned()))?;
append_value(builder.values(), &field, val.as_ref())?;
builder.append(true);
.get_field(protobuf_number)
.ok_or_else(|| ProtobufError::MissingField {
field: protobuf_number,
})?;
append_value(field_builder, &field, val.as_ref())?;
re_log::trace!("Field {}: Finished writing to builders", field.full_name());
} else {
builder.append(false);
append_null_to_builder(field_builder)?;
}
}

struct_builder.append(true);
self.builder.append(true);

Ok(())
}

Expand All @@ -129,23 +123,20 @@ impl MessageParser for ProtobufMessageParser {

let Self {
message_descriptor,
fields,
mut builder,
} = *self;

let message_chunk = Chunk::from_auto_row_ids(
ChunkId::new(),
entity_path,
timelines,
fields
.into_iter()
.map(|(field, mut builder)| {
(
ComponentDescriptor::partial(field)
.with_builtin_archetype(message_descriptor.full_name()),
builder.finish().into(),
)
})
.collect(),
[(
ComponentDescriptor::partial("message")
.with_builtin_archetype(message_descriptor.full_name()),
builder.finish().into(),
)]
.into_iter()
.collect(),
)
.map_err(|err| Error::Other(anyhow::anyhow!(err)))?;

Expand All @@ -166,6 +157,36 @@ fn downcast_err<'a, T: std::any::Any>(
})
}

fn append_null_to_builder(builder: &mut dyn ArrayBuilder) -> Result<(), ProtobufError> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice if ArrayBuilder implemented append_null 🥲

// Try to append null by downcasting to known builder types
if let Some(b) = builder.as_any_mut().downcast_mut::<BooleanBuilder>() {
b.append_null();
} else if let Some(b) = builder.as_any_mut().downcast_mut::<Int32Builder>() {
b.append_null();
} else if let Some(b) = builder.as_any_mut().downcast_mut::<Int64Builder>() {
b.append_null();
} else if let Some(b) = builder.as_any_mut().downcast_mut::<UInt32Builder>() {
b.append_null();
} else if let Some(b) = builder.as_any_mut().downcast_mut::<UInt64Builder>() {
b.append_null();
} else if let Some(b) = builder.as_any_mut().downcast_mut::<Float32Builder>() {
b.append_null();
} else if let Some(b) = builder.as_any_mut().downcast_mut::<Float64Builder>() {
b.append_null();
} else if let Some(b) = builder.as_any_mut().downcast_mut::<StringBuilder>() {
b.append_null();
} else if let Some(b) = builder.as_any_mut().downcast_mut::<BinaryBuilder>() {
b.append_null();
} else if let Some(b) = builder.as_any_mut().downcast_mut::<StructBuilder>() {
b.append_null();
} else if let Some(b) = builder.as_any_mut().downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>() {
b.append_null();
} else {
return Err(ProtobufError::UnsupportedType("Unknown builder type for append_null"));
}
Ok(())
}

fn append_value(
builder: &mut dyn ArrayBuilder,
field: &FieldDescriptor,
Expand Down Expand Up @@ -245,7 +266,19 @@ fn append_value(
let value = enum_descriptor
.get_value(*x)
.ok_or_else(|| ProtobufError::UnknownEnumNumber(*x))?;
downcast_err::<StringBuilder>(builder, val)?.append_value(value.name());

let struct_builder = downcast_err::<StructBuilder>(builder, val)?;
let field_builders = struct_builder.field_builders_mut();

// First field is "name" (String)
downcast_err::<StringBuilder>(field_builders[0].as_mut(), val)?
.append_value(value.name());

// Second field is "value" (Int32)
downcast_err::<Int32Builder>(field_builders[1].as_mut(), val)?
.append_value(*x);

struct_builder.append(true);
}
}

Expand Down Expand Up @@ -287,11 +320,18 @@ fn arrow_builder_from_field(descr: &FieldDescriptor) -> Box<dyn ArrayBuilder> {
Box::new(struct_builder_from_message(&message_descriptor)) as Box<dyn ArrayBuilder>
}
Kind::Enum(_) => {
// TODO(grtlr): It would be great to improve our `enum` support. Using `Utf8`
// means a lot of excess memory / storage usage. Ideally we would use something
// like `StringDictionary`, but it's not clear right now how this works with
// `dyn ArrayBuilder` and sharing entries across lists.
Box::new(StringBuilder::new())
// Create a struct with "name" (String) and "value" (Int32) fields.
// We can't use `DictionaryArray` because `concat` does not re-key, and there
// could be protobuf schema evolution with different enum values across chunks.
let fields = Fields::from(vec![
Field::new("name", DataType::Utf8, false),
Field::new("value", DataType::Int32, false),
]);
let field_builders: Vec<Box<dyn ArrayBuilder>> = vec![
Box::new(StringBuilder::new()),
Box::new(Int32Builder::new()),
];
Box::new(StructBuilder::new(fields, field_builders))
}
};

Expand All @@ -303,7 +343,21 @@ fn arrow_builder_from_field(descr: &FieldDescriptor) -> Box<dyn ArrayBuilder> {
}

/// Builds the Arrow [`Field`] for the protobuf field described by `descr`.
///
/// The field is named after `descr.name()`, gets its data type from `datatype_from`, and is
/// marked nullable. Fields of enum kind additionally carry an `ARROW:extension:name`
/// metadata entry.
fn arrow_field_from(descr: &FieldDescriptor) -> Field {
// NOTE(review): this text was captured from a diff view; the next line looks like the
// pre-change body that the statements below replace — confirm against the merged file.
Field::new(descr.name(), datatype_from(descr), true)
let mut field = Field::new(descr.name(), datatype_from(descr), true);

// Add extension metadata for enum types
if matches!(descr.kind(), Kind::Enum(_)) {
// `with_metadata` is given a map with a single entry: the extension-name key and value.
field = field.with_metadata(
[(
"ARROW:extension:name".to_owned(),
"rerun.datatypes.ProtobufEnum".to_owned(),
)]
.into_iter()
.collect(),
);
}

field
}

fn datatype_from(descr: &FieldDescriptor) -> DataType {
Expand All @@ -325,8 +379,13 @@ fn datatype_from(descr: &FieldDescriptor) -> DataType {
DataType::Struct(fields)
}
Kind::Enum(_) => {
// TODO(grtlr): Explanation see above.
DataType::Utf8
// Struct with "name" (String) and "value" (Int32) fields.
// See comment in arrow_builder_from_field for why we use a struct.
let fields = Fields::from(vec![
Field::new("name", DataType::Utf8, false),
Field::new("value", DataType::Int32, false),
]);
DataType::Struct(fields)
}
};

Expand Down
Loading
Loading