1
+ use std:: sync:: Arc ;
2
+
1
3
use anyhow:: Context as _;
2
4
use arrow:: {
3
5
array:: {
4
- ArrayBuilder , ArrowPrimitiveType , BinaryBuilder , BooleanBuilder , FixedSizeListBuilder ,
5
- Float32Builder , Float64Builder , Int8Builder , Int16Builder , Int32Builder , Int64Builder ,
6
- ListBuilder , PrimitiveBuilder , StringBuilder , StructBuilder , UInt8Builder , UInt16Builder ,
7
- UInt32Builder , UInt64Builder ,
6
+ ArrayBuilder , ArrowPrimitiveType , BooleanBuilder , FixedSizeListBuilder , Float32Builder ,
7
+ Float64Builder , Int8Builder , Int16Builder , Int32Builder , Int64Builder , ListBuilder ,
8
+ PrimitiveBuilder , StringBuilder , StructBuilder , UInt8Builder , UInt16Builder , UInt32Builder ,
9
+ UInt64Builder ,
8
10
} ,
9
11
datatypes:: {
10
12
DataType , Field , Fields , Float32Type , Float64Type , Int8Type , Int16Type , Int32Type ,
11
13
Int64Type , UInt8Type , UInt16Type , UInt32Type , UInt64Type ,
12
14
} ,
13
15
} ;
14
16
use cdr_encoding:: CdrDeserializer ;
15
- use re_chunk:: { Chunk , ChunkId } ;
17
+ use re_chunk:: {
18
+ Chunk , ChunkId , EntityPath , TimeColumn , TimelineName , external:: nohash_hasher:: IntMap ,
19
+ } ;
16
20
use re_ros_msg:: {
17
21
MessageSchema ,
18
22
deserialize:: { MapResolver , MessageSeed , Value , primitive_array:: PrimitiveArray } ,
@@ -68,24 +72,24 @@ pub enum Ros2ReflectionError {
68
72
}
69
73
70
74
impl Ros2ReflectionMessageParser {
71
- fn new ( num_rows : usize , message_schema : MessageSchema ) -> Self {
75
+ fn new ( num_rows : usize , message_schema : MessageSchema ) -> anyhow :: Result < Self > {
72
76
let mut fields = Vec :: new ( ) ;
73
77
74
78
// Build Arrow builders for each field in the message, preserving order
75
79
for field in & message_schema. spec . fields {
76
80
let name = field. name . clone ( ) ;
77
- let builder = arrow_builder_from_type ( & field. ty , & message_schema. dependencies ) ;
81
+ let builder = arrow_builder_from_type ( & field. ty , & message_schema. dependencies ) ? ;
78
82
fields. push ( (
79
83
name. clone ( ) ,
80
84
FixedSizeListBuilder :: with_capacity ( builder, 1 , num_rows) ,
81
85
) ) ;
82
86
}
83
87
84
- Self {
88
+ Ok ( Self {
85
89
num_rows,
86
90
message_schema,
87
91
fields,
88
- }
92
+ } )
89
93
}
90
94
}
91
95
@@ -133,26 +137,8 @@ impl MessageParser for Ros2ReflectionMessageParser {
133
137
let archetype_name = message_schema. spec . name . clone ( ) . replace ( '/' , "." ) ;
134
138
135
139
if fields. is_empty ( ) {
136
- // Create a list array with `num_rows` entries, where each entry is an empty list
137
- let empty_list = arrow:: array:: ListArray :: new_null (
138
- std:: sync:: Arc :: new ( Field :: new ( "empty" , DataType :: Null , true ) ) ,
139
- num_rows,
140
- ) ;
141
-
142
- let chunk = Chunk :: from_auto_row_ids (
143
- ChunkId :: new ( ) ,
144
- entity_path,
145
- timelines,
146
- std:: iter:: once ( (
147
- ComponentDescriptor :: partial ( "empty" )
148
- . with_builtin_archetype ( archetype_name. clone ( ) ) ,
149
- empty_list,
150
- ) )
151
- . collect ( ) ,
152
- )
153
- . map_err ( |err| Error :: Other ( anyhow:: anyhow!( err) ) ) ?;
154
-
155
- return Ok ( vec ! [ chunk] ) ;
140
+ return create_empty_message_chunk ( entity_path, timelines, num_rows, & archetype_name)
141
+ . map ( |chunk| vec ! [ chunk] ) ;
156
142
}
157
143
158
144
let message_chunk = Chunk :: from_auto_row_ids (
@@ -176,7 +162,36 @@ impl MessageParser for Ros2ReflectionMessageParser {
176
162
}
177
163
}
178
164
179
- fn downcast_err < T : std:: any:: Any > (
165
+ fn create_empty_message_chunk (
166
+ entity_path : EntityPath ,
167
+ timelines : IntMap < TimelineName , TimeColumn > ,
168
+ num_rows : usize ,
169
+ archetype_name : & str ,
170
+ ) -> Result < Chunk , anyhow:: Error > {
171
+ let empty_list = arrow:: array:: ListArray :: new_null (
172
+ std:: sync:: Arc :: new ( Field :: new (
173
+ "empty" ,
174
+ DataType :: FixedSizeList ( Arc :: new ( Field :: new ( "item" , DataType :: Null , true ) ) , 1 ) ,
175
+ true ,
176
+ ) ) ,
177
+ num_rows,
178
+ ) ;
179
+
180
+ let chunk = Chunk :: from_auto_row_ids (
181
+ ChunkId :: new ( ) ,
182
+ entity_path,
183
+ timelines,
184
+ std:: iter:: once ( (
185
+ ComponentDescriptor :: partial ( "empty" ) . with_builtin_archetype ( archetype_name) ,
186
+ empty_list,
187
+ ) )
188
+ . collect ( ) ,
189
+ )
190
+ . map_err ( |err| Error :: Other ( anyhow:: anyhow!( err) ) ) ?;
191
+ Ok ( chunk)
192
+ }
193
+
194
+ fn downcast_builder < T : std:: any:: Any > (
180
195
builder : & mut dyn ArrayBuilder ,
181
196
) -> Result < & mut T , Ros2ReflectionError > {
182
197
builder. as_any_mut ( ) . downcast_mut :: < T > ( ) . ok_or_else ( || {
@@ -193,8 +208,8 @@ where
193
208
T : ArrowPrimitiveType ,
194
209
PrimitiveBuilder < T > : ' static ,
195
210
{
196
- let list_builder = downcast_err :: < ListBuilder < Box < dyn ArrayBuilder > > > ( builder) ?;
197
- let values_builder = downcast_err :: < PrimitiveBuilder < T > > ( list_builder. values ( ) ) ?;
211
+ let list_builder = downcast_builder :: < ListBuilder < Box < dyn ArrayBuilder > > > ( builder) ?;
212
+ let values_builder = downcast_builder :: < PrimitiveBuilder < T > > ( list_builder. values ( ) ) ?;
198
213
values_builder. append_slice ( vec) ;
199
214
list_builder. append ( true ) ;
200
215
Ok ( ( ) )
@@ -207,8 +222,8 @@ fn append_primitive_array(
207
222
match prim_array {
208
223
PrimitiveArray :: Bool ( vec) => {
209
224
// `Bool` is a special case since Arrow doesn't have a primitive boolean array
210
- let list_builder = downcast_err :: < ListBuilder < Box < dyn ArrayBuilder > > > ( builder) ?;
211
- let values_builder = downcast_err :: < BooleanBuilder > ( list_builder. values ( ) ) ?;
225
+ let list_builder = downcast_builder :: < ListBuilder < Box < dyn ArrayBuilder > > > ( builder) ?;
226
+ let values_builder = downcast_builder :: < BooleanBuilder > ( list_builder. values ( ) ) ?;
212
227
values_builder. append_slice ( vec) ;
213
228
list_builder. append ( true ) ;
214
229
Ok ( ( ) )
@@ -224,8 +239,8 @@ fn append_primitive_array(
224
239
PrimitiveArray :: F32 ( vec) => append_slice_to_list :: < Float32Type > ( builder, vec) ,
225
240
PrimitiveArray :: F64 ( vec) => append_slice_to_list :: < Float64Type > ( builder, vec) ,
226
241
PrimitiveArray :: String ( items) => {
227
- let list_builder = downcast_err :: < ListBuilder < Box < dyn ArrayBuilder > > > ( builder) ?;
228
- let values_builder = downcast_err :: < StringBuilder > ( list_builder. values ( ) ) ?;
242
+ let list_builder = downcast_builder :: < ListBuilder < Box < dyn ArrayBuilder > > > ( builder) ?;
243
+ let values_builder = downcast_builder :: < StringBuilder > ( list_builder. values ( ) ) ?;
229
244
for item in items {
230
245
values_builder. append_value ( item) ;
231
246
}
@@ -241,22 +256,22 @@ fn append_value(
241
256
schema : & MessageSchema ,
242
257
) -> Result < ( ) , Ros2ReflectionError > {
243
258
match val {
244
- Value :: Bool ( x) => downcast_err :: < BooleanBuilder > ( builder) ?. append_value ( * x) ,
245
- Value :: I8 ( x) => downcast_err :: < Int8Builder > ( builder) ?. append_value ( * x) ,
246
- Value :: U8 ( x) => downcast_err :: < UInt8Builder > ( builder) ?. append_value ( * x) ,
247
- Value :: I16 ( x) => downcast_err :: < Int16Builder > ( builder) ?. append_value ( * x) ,
248
- Value :: U16 ( x) => downcast_err :: < UInt16Builder > ( builder) ?. append_value ( * x) ,
249
- Value :: I32 ( x) => downcast_err :: < Int32Builder > ( builder) ?. append_value ( * x) ,
250
- Value :: U32 ( x) => downcast_err :: < UInt32Builder > ( builder) ?. append_value ( * x) ,
251
- Value :: I64 ( x) => downcast_err :: < Int64Builder > ( builder) ?. append_value ( * x) ,
252
- Value :: U64 ( x) => downcast_err :: < UInt64Builder > ( builder) ?. append_value ( * x) ,
253
- Value :: F32 ( x) => downcast_err :: < Float32Builder > ( builder) ?. append_value ( * x) ,
254
- Value :: F64 ( x) => downcast_err :: < Float64Builder > ( builder) ?. append_value ( * x) ,
259
+ Value :: Bool ( x) => downcast_builder :: < BooleanBuilder > ( builder) ?. append_value ( * x) ,
260
+ Value :: I8 ( x) => downcast_builder :: < Int8Builder > ( builder) ?. append_value ( * x) ,
261
+ Value :: U8 ( x) => downcast_builder :: < UInt8Builder > ( builder) ?. append_value ( * x) ,
262
+ Value :: I16 ( x) => downcast_builder :: < Int16Builder > ( builder) ?. append_value ( * x) ,
263
+ Value :: U16 ( x) => downcast_builder :: < UInt16Builder > ( builder) ?. append_value ( * x) ,
264
+ Value :: I32 ( x) => downcast_builder :: < Int32Builder > ( builder) ?. append_value ( * x) ,
265
+ Value :: U32 ( x) => downcast_builder :: < UInt32Builder > ( builder) ?. append_value ( * x) ,
266
+ Value :: I64 ( x) => downcast_builder :: < Int64Builder > ( builder) ?. append_value ( * x) ,
267
+ Value :: U64 ( x) => downcast_builder :: < UInt64Builder > ( builder) ?. append_value ( * x) ,
268
+ Value :: F32 ( x) => downcast_builder :: < Float32Builder > ( builder) ?. append_value ( * x) ,
269
+ Value :: F64 ( x) => downcast_builder :: < Float64Builder > ( builder) ?. append_value ( * x) ,
255
270
Value :: String ( x) => {
256
- downcast_err :: < StringBuilder > ( builder) ?. append_value ( x. clone ( ) ) ;
271
+ downcast_builder :: < StringBuilder > ( builder) ?. append_value ( x. clone ( ) ) ;
257
272
}
258
273
Value :: Message ( message_fields) => {
259
- let struct_builder = downcast_err :: < StructBuilder > ( builder) ?;
274
+ let struct_builder = downcast_builder :: < StructBuilder > ( builder) ?;
260
275
261
276
// For nested messages, we need to find the matching specification from dependencies
262
277
// Since we don't have type information here, we'll try to match by field names
@@ -287,7 +302,7 @@ fn append_value(
287
302
struct_builder. append ( true ) ;
288
303
}
289
304
Value :: Array ( vec) | Value :: Sequence ( vec) => {
290
- let list_builder = downcast_err :: < ListBuilder < Box < dyn ArrayBuilder > > > ( builder) ?;
305
+ let list_builder = downcast_builder :: < ListBuilder < Box < dyn ArrayBuilder > > > ( builder) ?;
291
306
292
307
for val in vec {
293
308
append_value ( list_builder. values ( ) , val, schema) ?;
@@ -318,29 +333,29 @@ fn find_matching_message_spec<'a>(
318
333
fn struct_builder_from_message_spec (
319
334
spec : & MessageSpecification ,
320
335
dependencies : & [ MessageSpecification ] ,
321
- ) -> StructBuilder {
336
+ ) -> anyhow :: Result < StructBuilder > {
322
337
let fields = spec
323
338
. fields
324
339
. iter ( )
325
340
. map ( |f| {
326
- (
327
- arrow_field_from_type ( & f. ty , & f. name , dependencies) ,
328
- arrow_builder_from_type ( & f. ty , dependencies) ,
329
- )
341
+ Ok ( (
342
+ arrow_field_from_type ( & f. ty , & f. name , dependencies) ? ,
343
+ arrow_builder_from_type ( & f. ty , dependencies) ? ,
344
+ ) )
330
345
} )
331
- . collect :: < Vec < _ > > ( ) ;
346
+ . collect :: < anyhow :: Result < Vec < _ > > > ( ) ? ;
332
347
333
348
let ( fields, field_builders) : ( Vec < Field > , Vec < Box < dyn ArrayBuilder > > ) =
334
349
fields. into_iter ( ) . unzip ( ) ;
335
350
336
- StructBuilder :: new ( fields, field_builders)
351
+ Ok ( StructBuilder :: new ( fields, field_builders) )
337
352
}
338
353
339
354
fn arrow_builder_from_type (
340
355
ty : & Type ,
341
356
dependencies : & [ MessageSpecification ] ,
342
- ) -> Box < dyn ArrayBuilder > {
343
- match ty {
357
+ ) -> anyhow :: Result < Box < dyn ArrayBuilder > > {
358
+ Ok ( match ty {
344
359
Type :: BuiltIn ( p) => match p {
345
360
BuiltInType :: Bool => Box :: new ( BooleanBuilder :: new ( ) ) ,
346
361
BuiltInType :: Byte | BuiltInType :: UInt8 => Box :: new ( UInt8Builder :: new ( ) ) ,
@@ -357,25 +372,30 @@ fn arrow_builder_from_type(
357
372
} ,
358
373
Type :: Complex ( complex_type) => {
359
374
// Look up the message spec in dependencies
360
- if let Some ( spec) = resolve_complex_type ( complex_type, dependencies) {
361
- Box :: new ( struct_builder_from_message_spec ( spec, dependencies) )
362
- } else {
363
- re_log:: warn_once!( "Could not resolve complex type: {:?}" , complex_type) ;
364
- Box :: new ( BinaryBuilder :: new ( ) ) // Fallback to binary
365
- }
375
+ let spec = resolve_complex_type ( complex_type, dependencies) . ok_or_else ( || {
376
+ anyhow:: anyhow!( "Could not resolve complex type: {:?}" , complex_type)
377
+ } ) ?;
378
+ Box :: new ( struct_builder_from_message_spec ( spec, dependencies) ?)
366
379
}
367
380
Type :: Array { ty, .. } => {
368
- Box :: new ( ListBuilder :: new ( arrow_builder_from_type ( ty, dependencies) ) )
381
+ Box :: new ( ListBuilder :: new ( arrow_builder_from_type ( ty, dependencies) ? ) )
369
382
}
370
- }
383
+ } )
371
384
}
372
385
373
- fn arrow_field_from_type ( ty : & Type , name : & str , dependencies : & [ MessageSpecification ] ) -> Field {
374
- Field :: new ( name, datatype_from_type ( ty, dependencies) , true )
386
+ fn arrow_field_from_type (
387
+ ty : & Type ,
388
+ name : & str ,
389
+ dependencies : & [ MessageSpecification ] ,
390
+ ) -> anyhow:: Result < Field > {
391
+ datatype_from_type ( ty, dependencies) . map ( |data_type| Field :: new ( name, data_type, true ) )
375
392
}
376
393
377
- fn datatype_from_type ( ty : & Type , dependencies : & [ MessageSpecification ] ) -> DataType {
378
- match ty {
394
+ fn datatype_from_type (
395
+ ty : & Type ,
396
+ dependencies : & [ MessageSpecification ] ,
397
+ ) -> anyhow:: Result < DataType > {
398
+ Ok ( match ty {
379
399
Type :: BuiltIn ( p) => match p {
380
400
BuiltInType :: Bool => DataType :: Boolean ,
381
401
BuiltInType :: Byte | BuiltInType :: UInt8 => DataType :: UInt8 ,
@@ -391,23 +411,22 @@ fn datatype_from_type(ty: &Type, dependencies: &[MessageSpecification]) -> DataT
391
411
BuiltInType :: String ( _) | BuiltInType :: WString ( _) => DataType :: Utf8 , // No wstring in Arrow
392
412
} ,
393
413
Type :: Complex ( complex_type) => {
394
- if let Some ( spec) = resolve_complex_type ( complex_type, dependencies) {
395
- let fields = spec
396
- . fields
397
- . iter ( )
398
- . map ( |f| arrow_field_from_type ( & f. ty , & f. name , dependencies) )
399
- . collect :: < Fields > ( ) ;
400
- DataType :: Struct ( fields)
401
- } else {
402
- DataType :: Binary // Fallback
403
- }
414
+ let spec = resolve_complex_type ( complex_type, dependencies) . ok_or_else ( || {
415
+ anyhow:: anyhow!( "Could not resolve complex type: {:?}" , complex_type)
416
+ } ) ?;
417
+ let fields = spec
418
+ . fields
419
+ . iter ( )
420
+ . map ( |f| arrow_field_from_type ( & f. ty , & f. name , dependencies) )
421
+ . collect :: < anyhow:: Result < Fields > > ( ) ?;
422
+ DataType :: Struct ( fields)
404
423
}
405
424
Type :: Array { ty, size } => match size {
406
425
ArraySize :: Fixed ( _) | ArraySize :: Bounded ( _) | ArraySize :: Unbounded => {
407
- DataType :: new_list ( datatype_from_type ( ty, dependencies) , true )
426
+ DataType :: new_list ( datatype_from_type ( ty, dependencies) ? , true )
408
427
}
409
428
} ,
410
- }
429
+ } )
411
430
}
412
431
413
432
fn resolve_complex_type < ' a > (
@@ -461,7 +480,12 @@ impl MessageLayer for McapRos2ReflectionLayer {
461
480
let found = self
462
481
. schemas_per_topic
463
482
. insert ( channel. topic . clone ( ) , message_schema) ;
464
- debug_assert ! ( found. is_none( ) ) ;
483
+
484
+ debug_assert ! (
485
+ found. is_none( ) ,
486
+ "Duplicate schema for topic {}" ,
487
+ channel. topic
488
+ ) ;
465
489
}
466
490
467
491
Ok ( ( ) )
@@ -493,9 +517,8 @@ impl MessageLayer for McapRos2ReflectionLayer {
493
517
num_rows : usize ,
494
518
) -> Option < Box < dyn MessageParser > > {
495
519
let message_schema = self . schemas_per_topic . get ( & channel. topic ) ?;
496
- Some ( Box :: new ( Ros2ReflectionMessageParser :: new (
497
- num_rows,
498
- message_schema. clone ( ) ,
499
- ) ) )
520
+ Some ( Box :: new (
521
+ Ros2ReflectionMessageParser :: new ( num_rows, message_schema. clone ( ) ) . ok ( ) ?,
522
+ ) )
500
523
}
501
524
}
0 commit comments