1
- use std:: collections:: BTreeMap ;
2
-
3
1
use arrow:: {
4
2
array:: {
5
3
ArrayBuilder , BinaryBuilder , BooleanBuilder , FixedSizeListBuilder , Float32Builder ,
@@ -20,7 +18,7 @@ use crate::{Error, LayerIdentifier, MessageLayer};
20
18
21
19
struct ProtobufMessageParser {
22
20
message_descriptor : MessageDescriptor ,
23
- fields : BTreeMap < String , FixedSizeListBuilder < Box < dyn ArrayBuilder > > > ,
21
+ builder : FixedSizeListBuilder < StructBuilder > ,
24
22
}
25
23
26
24
#[ derive( Debug , thiserror:: Error ) ]
@@ -47,9 +45,6 @@ enum ProtobufError {
47
45
#[ error( "unknown enum number {0}" ) ]
48
46
UnknownEnumNumber ( i32 ) ,
49
47
50
- #[ error( "unknown field name {0}" ) ]
51
- UnknownFieldName ( String ) ,
52
-
53
48
#[ error( "type {0} is not supported yet" ) ]
54
49
UnsupportedType ( & ' static str ) ,
55
50
@@ -59,19 +54,6 @@ enum ProtobufError {
59
54
60
55
impl ProtobufMessageParser {
61
56
fn new ( num_rows : usize , message_descriptor : MessageDescriptor ) -> Self {
62
- let mut fields = BTreeMap :: new ( ) ;
63
-
64
- // We recursively build up the Arrow builders for this particular message.
65
- for field_descr in message_descriptor. fields ( ) {
66
- let name = field_descr. name ( ) . to_owned ( ) ;
67
- let builder = arrow_builder_from_field ( & field_descr) ;
68
- fields. insert (
69
- name,
70
- FixedSizeListBuilder :: with_capacity ( builder, 1 , num_rows) ,
71
- ) ;
72
- re_log:: trace!( "Added Arrow builder for fields: {}" , field_descr. name( ) ) ;
73
- }
74
-
75
57
if message_descriptor. oneofs ( ) . len ( ) > 0 {
76
58
re_log:: warn_once!(
77
59
"`oneof` in schema {} is not supported yet." ,
@@ -84,9 +66,12 @@ impl ProtobufMessageParser {
84
66
) ;
85
67
}
86
68
69
+ let struct_builder = struct_builder_from_message ( & message_descriptor) ;
70
+ let builder = FixedSizeListBuilder :: with_capacity ( struct_builder, 1 , num_rows) ;
71
+
87
72
Self {
88
73
message_descriptor,
89
- fields ,
74
+ builder ,
90
75
}
91
76
}
92
77
}
@@ -103,22 +88,31 @@ impl MessageParser for ProtobufMessageParser {
103
88
} ,
104
89
) ?;
105
90
106
- // We always need to make sure to iterate over all our builders, adding null values whenever
107
- // a field is missing from the message that we received.
108
- for ( field, builder) in & mut self . fields {
109
- if let Some ( val) = dynamic_message. get_field_by_name ( field. as_str ( ) ) {
91
+ let struct_builder = self . builder . values ( ) ;
92
+
93
+ for ( ith_arrow_field, field_builder) in
94
+ struct_builder. field_builders_mut ( ) . iter_mut ( ) . enumerate ( )
95
+ {
96
+ // Protobuf fields are 1-indexed, so we need to map the i-th builder.
97
+ let protobuf_number = ith_arrow_field as u32 + 1 ;
98
+
99
+ if let Some ( val) = dynamic_message. get_field_by_number ( protobuf_number) {
110
100
let field = dynamic_message
111
101
. descriptor ( )
112
- . get_field_by_name ( field)
113
- . ok_or_else ( || ProtobufError :: UnknownFieldName ( field. to_owned ( ) ) ) ?;
114
- append_value ( builder. values ( ) , & field, val. as_ref ( ) ) ?;
115
- builder. append ( true ) ;
102
+ . get_field ( protobuf_number)
103
+ . ok_or_else ( || ProtobufError :: MissingField {
104
+ field : protobuf_number,
105
+ } ) ?;
106
+ append_value ( field_builder, & field, val. as_ref ( ) ) ?;
116
107
re_log:: trace!( "Field {}: Finished writing to builders" , field. full_name( ) ) ;
117
108
} else {
118
- builder . append ( false ) ;
109
+ append_null_to_builder ( field_builder ) ? ;
119
110
}
120
111
}
121
112
113
+ struct_builder. append ( true ) ;
114
+ self . builder . append ( true ) ;
115
+
122
116
Ok ( ( ) )
123
117
}
124
118
@@ -129,23 +123,19 @@ impl MessageParser for ProtobufMessageParser {
129
123
130
124
let Self {
131
125
message_descriptor,
132
- fields ,
126
+ mut builder ,
133
127
} = * self ;
134
128
135
129
let message_chunk = Chunk :: from_auto_row_ids (
136
130
ChunkId :: new ( ) ,
137
131
entity_path,
138
132
timelines,
139
- fields
140
- . into_iter ( )
141
- . map ( |( field, mut builder) | {
142
- (
143
- ComponentDescriptor :: partial ( field)
144
- . with_builtin_archetype ( message_descriptor. full_name ( ) ) ,
145
- builder. finish ( ) . into ( ) ,
146
- )
147
- } )
148
- . collect ( ) ,
133
+ std:: iter:: once ( (
134
+ ComponentDescriptor :: partial ( "message" )
135
+ . with_builtin_archetype ( message_descriptor. full_name ( ) ) ,
136
+ builder. finish ( ) . into ( ) ,
137
+ ) )
138
+ . collect ( ) ,
149
139
)
150
140
. map_err ( |err| Error :: Other ( anyhow:: anyhow!( err) ) ) ?;
151
141
@@ -166,6 +156,41 @@ fn downcast_err<'a, T: std::any::Any>(
166
156
} )
167
157
}
168
158
159
+ fn append_null_to_builder ( builder : & mut dyn ArrayBuilder ) -> Result < ( ) , ProtobufError > {
160
+ // Try to append null by downcasting to known builder types
161
+ if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < BooleanBuilder > ( ) {
162
+ b. append_null ( ) ;
163
+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < Int32Builder > ( ) {
164
+ b. append_null ( ) ;
165
+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < Int64Builder > ( ) {
166
+ b. append_null ( ) ;
167
+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < UInt32Builder > ( ) {
168
+ b. append_null ( ) ;
169
+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < UInt64Builder > ( ) {
170
+ b. append_null ( ) ;
171
+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < Float32Builder > ( ) {
172
+ b. append_null ( ) ;
173
+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < Float64Builder > ( ) {
174
+ b. append_null ( ) ;
175
+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < StringBuilder > ( ) {
176
+ b. append_null ( ) ;
177
+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < BinaryBuilder > ( ) {
178
+ b. append_null ( ) ;
179
+ } else if let Some ( b) = builder. as_any_mut ( ) . downcast_mut :: < StructBuilder > ( ) {
180
+ b. append_null ( ) ;
181
+ } else if let Some ( b) = builder
182
+ . as_any_mut ( )
183
+ . downcast_mut :: < ListBuilder < Box < dyn ArrayBuilder > > > ( )
184
+ {
185
+ b. append_null ( ) ;
186
+ } else {
187
+ return Err ( ProtobufError :: UnsupportedType (
188
+ "Unknown builder type for append_null" ,
189
+ ) ) ;
190
+ }
191
+ Ok ( ( ) )
192
+ }
193
+
169
194
fn append_value (
170
195
builder : & mut dyn ArrayBuilder ,
171
196
field : & FieldDescriptor ,
0 commit comments