Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Git LFS file not shown
106 changes: 35 additions & 71 deletions crates/utils/re_mcap/src/layers/ros2_reflection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@ use arrow::{
array::{
ArrayBuilder, ArrowPrimitiveType, BooleanBuilder, FixedSizeListBuilder, Float32Builder,
Float64Builder, Int8Builder, Int16Builder, Int32Builder, Int64Builder, ListBuilder,
NullBuilder, PrimitiveBuilder, StringBuilder, StructBuilder, UInt8Builder, UInt16Builder,
UInt32Builder, UInt64Builder,
PrimitiveBuilder, StringBuilder, StructBuilder, UInt8Builder, UInt16Builder, UInt32Builder,
UInt64Builder,
},
datatypes::{
DataType, Field, Fields, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
Int64Type, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
},
};
use cdr_encoding::CdrDeserializer;
use re_chunk::{
Chunk, ChunkId, EntityPath, TimeColumn, TimelineName, external::nohash_hasher::IntMap,
};
use re_chunk::{Chunk, ChunkId};
use re_ros_msg::{
MessageSchema,
deserialize::{MapResolver, MessageSeed, Value, primitive_array::PrimitiveArray},
Expand Down Expand Up @@ -53,9 +51,8 @@ pub fn decode_bytes(top: &MessageSchema, buf: &[u8]) -> anyhow::Result<Value> {
}

struct Ros2ReflectionMessageParser {
num_rows: usize,
message_schema: MessageSchema,
fields: Vec<(String, FixedSizeListBuilder<Box<dyn ArrayBuilder>>)>,
builder: FixedSizeListBuilder<MessageStructBuilder>,
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -109,22 +106,13 @@ impl ArrayBuilder for MessageStructBuilder {

impl Ros2ReflectionMessageParser {
fn new(num_rows: usize, message_schema: MessageSchema) -> anyhow::Result<Self> {
let mut fields = Vec::new();

// Build Arrow builders for each field in the message, preserving order
for field in &message_schema.spec.fields {
let name = field.name.clone();
let builder = arrow_builder_from_type(&field.ty, &message_schema.dependencies)?;
fields.push((
name.clone(),
FixedSizeListBuilder::with_capacity(builder, 1, num_rows),
));
}
let struct_builder =
struct_builder_from_message_spec(&message_schema.spec, &message_schema.dependencies)?;
let builder = FixedSizeListBuilder::with_capacity(struct_builder, 1, num_rows);

Ok(Self {
num_rows,
message_schema,
fields,
builder,
})
}
}
Expand All @@ -142,16 +130,29 @@ impl MessageParser for Ros2ReflectionMessageParser {
})?;

if let Value::Message(message_fields) = value {
// We always need to make sure to iterate over all our builders, adding null values whenever
// a field is missing from the message that we received.
for (field_name, builder) in &mut self.fields {
if let Some(field_value) = message_fields.get(field_name) {
append_value(builder.values(), field_value)?;
builder.append(true);
} else {
builder.append(false);
let message_struct_builder = self.builder.values();
let spec = &message_struct_builder.spec;

// Iterate over all struct fields based on the message spec order
for (i, spec_field) in spec.fields.iter().enumerate() {
if let Some(field_builder) = message_struct_builder
.builder
.field_builders_mut()
.get_mut(i)
{
if let Some(field_value) = message_fields.get(&spec_field.name) {
append_value(field_builder, field_value)?;
} else {
re_log::warn_once!(
"Field {} is missing from message content",
spec_field.name
);
}
}
}

message_struct_builder.builder.append(true);
self.builder.append(true);
} else {
return Err(anyhow::anyhow!("Expected message value, got {value:?}"));
}
Expand All @@ -165,65 +166,28 @@ impl MessageParser for Ros2ReflectionMessageParser {
let timelines = ctx.build_timelines();

let Self {
num_rows,
message_schema,
fields,
mut builder,
} = *self;

let archetype_name = message_schema.spec.name.clone().replace('/', ".");

if fields.is_empty() {
return create_empty_message_chunk(entity_path, timelines, num_rows, &archetype_name)
.map(|chunk| vec![chunk]);
}

let message_chunk = Chunk::from_auto_row_ids(
ChunkId::new(),
entity_path,
timelines,
fields
.into_iter()
.map(|(field_name, mut builder)| {
(
ComponentDescriptor::partial(field_name)
.with_builtin_archetype(archetype_name.clone()),
builder.finish().into(),
)
})
.collect(),
std::iter::once((
ComponentDescriptor::partial("message").with_builtin_archetype(archetype_name),
builder.finish().into(),
))
.collect(),
)
.map_err(|err| Error::Other(anyhow::anyhow!(err)))?;

Ok(vec![message_chunk])
}
}

fn create_empty_message_chunk(
entity_path: EntityPath,
timelines: IntMap<TimelineName, TimeColumn>,
num_rows: usize,
archetype_name: &str,
) -> anyhow::Result<Chunk> {
let mut empty_list = ListBuilder::with_capacity(NullBuilder::new(), num_rows);
for _ in 0..num_rows {
empty_list.values().append_null();
empty_list.append(true);
}

let chunk = Chunk::from_auto_row_ids(
ChunkId::new(),
entity_path,
timelines,
std::iter::once((
ComponentDescriptor::partial("empty").with_builtin_archetype(archetype_name),
empty_list.finish(),
))
.collect(),
)
.map_err(|err| Error::Other(anyhow::anyhow!(err)))?;
Ok(chunk)
}

fn downcast_builder<T: std::any::Any>(
builder: &mut dyn ArrayBuilder,
) -> Result<&mut T, Ros2ReflectionError> {
Expand Down
4 changes: 2 additions & 2 deletions tests/assets/rrd/snippets/howto/load_mcap.rrd
Git LFS file not shown