@@ -154,34 +154,39 @@ impl EventFormat for Event {
154
154
collect_keys ( flattened. iter ( ) ) . expect ( "fields can be collected from array of objects" ) ;
155
155
156
156
let mut is_first = false ;
157
- let schema = match derive_arrow_schema ( stored_schema, fields) {
158
- Some ( schema) => schema,
159
- _ => {
160
- let mut infer_schema = infer_json_schema_from_iterator ( flattened. iter ( ) . map ( Ok ) )
161
- . map_err ( |err| {
162
- anyhow ! ( "Could not infer schema for this event due to err {:?}" , err)
163
- } ) ?;
164
- let new_infer_schema = super :: update_field_type_in_schema (
165
- Arc :: new ( infer_schema) ,
166
- Some ( stored_schema) ,
167
- time_partition,
168
- Some ( & flattened) ,
169
- schema_version,
170
- ) ;
171
- infer_schema = Schema :: new ( new_infer_schema. fields ( ) . clone ( ) ) ;
172
- Schema :: try_merge ( vec ! [
173
- Schema :: new( stored_schema. values( ) . cloned( ) . collect:: <Fields >( ) ) ,
174
- infer_schema. clone( ) ,
175
- ] ) . map_err ( |err| anyhow ! ( "Could not merge schema of this event with that of the existing stream. {:?}" , err) ) ?;
176
- is_first = true ;
177
- infer_schema
178
- . fields
179
- . iter ( )
180
- . filter ( |field| !field. data_type ( ) . is_null ( ) )
181
- . cloned ( )
182
- . sorted_by ( |a, b| a. name ( ) . cmp ( b. name ( ) ) )
183
- . collect ( )
184
- }
157
+ let schema = if let Some ( schema) = derive_arrow_schema ( stored_schema, fields) {
158
+ schema
159
+ } else {
160
+ let mut infer_schema = infer_json_schema_from_iterator ( flattened. iter ( ) . map ( Ok ) )
161
+ . map_err ( |err| {
162
+ anyhow ! ( "Could not infer schema for this event due to err {:?}" , err)
163
+ } ) ?;
164
+ let new_infer_schema = super :: update_field_type_in_schema (
165
+ Arc :: new ( infer_schema) ,
166
+ Some ( stored_schema) ,
167
+ time_partition,
168
+ Some ( & flattened) ,
169
+ schema_version,
170
+ ) ;
171
+ infer_schema = Schema :: new ( new_infer_schema. fields ( ) . clone ( ) ) ;
172
+ Schema :: try_merge ( vec ! [
173
+ Schema :: new( stored_schema. values( ) . cloned( ) . collect:: <Fields >( ) ) ,
174
+ infer_schema. clone( ) ,
175
+ ] )
176
+ . map_err ( |err| {
177
+ anyhow ! (
178
+ "Could not merge schema of this event with that of the existing stream. {:?}" ,
179
+ err
180
+ )
181
+ } ) ?;
182
+ is_first = true ;
183
+ infer_schema
184
+ . fields
185
+ . iter ( )
186
+ . filter ( |field| !field. data_type ( ) . is_null ( ) )
187
+ . cloned ( )
188
+ . sorted_by ( |a, b| a. name ( ) . cmp ( b. name ( ) ) )
189
+ . collect ( )
185
190
} ;
186
191
187
192
if flattened
0 commit comments