@@ -59,12 +59,10 @@ impl EventFormat for Event {
59
59
// also extract the arrow schema, tags and metadata from the incoming json
60
60
fn to_data (
61
61
self ,
62
- schema : & HashMap < String , Arc < Field > > ,
62
+ stored_schema : & HashMap < String , Arc < Field > > ,
63
63
time_partition : Option < & String > ,
64
64
schema_version : SchemaVersion ,
65
65
) -> Result < ( Self :: Data , Vec < Arc < Field > > , bool ) , anyhow:: Error > {
66
- let stream_schema = schema;
67
-
68
66
// incoming event may be a single json or a json array
69
67
// but Data (type defined above) is a vector of json values
70
68
// hence we need to convert the incoming event to a vector of json values
@@ -79,23 +77,23 @@ impl EventFormat for Event {
79
77
collect_keys ( value_arr. iter ( ) ) . expect ( "fields can be collected from array of objects" ) ;
80
78
81
79
let mut is_first = false ;
82
- let schema = match derive_arrow_schema ( stream_schema , fields) {
83
- Ok ( schema) => schema,
84
- Err ( _ ) => {
80
+ let schema = match derive_arrow_schema ( stored_schema , fields) {
81
+ Some ( schema) => schema,
82
+ _ => {
85
83
let mut infer_schema = infer_json_schema_from_iterator ( value_arr. iter ( ) . map ( Ok ) )
86
84
. map_err ( |err| {
87
85
anyhow ! ( "Could not infer schema for this event due to err {:?}" , err)
88
86
} ) ?;
89
87
let new_infer_schema = super :: update_field_type_in_schema (
90
88
Arc :: new ( infer_schema) ,
91
- Some ( stream_schema ) ,
89
+ Some ( stored_schema ) ,
92
90
time_partition,
93
91
Some ( & value_arr) ,
94
92
schema_version,
95
93
) ;
96
94
infer_schema = Schema :: new ( new_infer_schema. fields ( ) . clone ( ) ) ;
97
95
Schema :: try_merge ( vec ! [
98
- Schema :: new( stream_schema . values( ) . cloned( ) . collect:: <Fields >( ) ) ,
96
+ Schema :: new( stored_schema . values( ) . cloned( ) . collect:: <Fields >( ) ) ,
99
97
infer_schema. clone( ) ,
100
98
] ) . map_err ( |err| anyhow ! ( "Could not merge schema of this event with that of the existing stream. {:?}" , err) ) ?;
101
99
is_first = true ;
@@ -221,51 +219,44 @@ fn extract_and_parse_time(
221
219
222
220
// Returns arrow schema with the fields that are present in the request body
223
221
// This schema is an input to convert the request body to arrow record batch
222
+ // Returns None if even one of the fields in the json is new and not seen before
224
223
fn derive_arrow_schema (
225
224
schema : & HashMap < String , Arc < Field > > ,
226
- fields : Vec < & str > ,
227
- ) -> Result < Vec < Arc < Field > > , ( ) > {
225
+ fields : HashSet < & str > ,
226
+ ) -> Option < Vec < Arc < Field > > > {
228
227
let mut res = Vec :: with_capacity ( fields. len ( ) ) ;
229
- let fields = fields. into_iter ( ) . map ( |field_name| schema. get ( field_name) ) ;
230
- for field in fields {
231
- let Some ( field) = field else { return Err ( ( ) ) } ;
228
+ for field_name in fields {
229
+ let field = schema. get ( field_name) ?;
232
230
res. push ( field. clone ( ) )
233
231
}
234
- Ok ( res)
232
+
233
+ Some ( res)
235
234
}
236
235
237
- fn collect_keys < ' a > ( values : impl Iterator < Item = & ' a Value > ) -> Result < Vec < & ' a str > , ( ) > {
238
- let mut keys = Vec :: new ( ) ;
236
+ // Returns a list of keys that are present in the given iterable of JSON objects
237
+ // Returns None if even one of the value is not an Object
238
+ fn collect_keys < ' a > ( values : impl Iterator < Item = & ' a Value > ) -> Option < HashSet < & ' a str > > {
239
+ let mut keys = HashSet :: new ( ) ;
239
240
for value in values {
240
- if let Some ( obj) = value. as_object ( ) {
241
- for key in obj. keys ( ) {
242
- match keys. binary_search ( & key. as_str ( ) ) {
243
- Ok ( _) => ( ) ,
244
- Err ( pos) => {
245
- keys. insert ( pos, key. as_str ( ) ) ;
246
- }
247
- }
248
- }
249
- } else {
250
- return Err ( ( ) ) ;
241
+ let obj = value. as_object ( ) ?;
242
+ for key in obj. keys ( ) {
243
+ keys. insert ( key. as_str ( ) ) ;
251
244
}
252
245
}
253
- Ok ( keys)
246
+
247
+ Some ( keys)
254
248
}
255
249
250
+ // Returns true when the field doesn't exist in schema or has an invalid type
256
251
fn fields_mismatch ( schema : & [ Arc < Field > ] , body : & Value , schema_version : SchemaVersion ) -> bool {
257
- for ( name, val) in body. as_object ( ) . expect ( "body is of object variant" ) {
258
- if val. is_null ( ) {
259
- continue ;
260
- }
261
- let Some ( field) = get_field ( schema, name) else {
262
- return true ;
263
- } ;
264
- if !valid_type ( field. data_type ( ) , val, schema_version) {
265
- return true ;
266
- }
267
- }
268
- false
252
+ body. as_object ( )
253
+ . expect ( "body is of object variant" )
254
+ . iter ( )
255
+ . any ( |( key, value) | {
256
+ !value. is_null ( )
257
+ && get_field ( schema, key)
258
+ . is_none_or ( |field| !valid_type ( field. data_type ( ) , value, schema_version) )
259
+ } )
269
260
}
270
261
271
262
fn valid_type ( data_type : & DataType , value : & Value , schema_version : SchemaVersion ) -> bool {
0 commit comments