@@ -27,7 +27,10 @@ use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
2727use datafusion:: arrow:: util:: bit_util:: round_upto_multiple_of_64;
2828use itertools:: Itertools ;
2929use serde_json:: Value ;
30- use std:: { collections:: HashMap , sync:: Arc } ;
30+ use std:: {
31+ collections:: { HashMap , HashSet } ,
32+ sync:: Arc ,
33+ } ;
3134use tracing:: error;
3235
3336use super :: EventFormat ;
@@ -74,6 +77,10 @@ impl EventFormat for Event {
7477 _ => unreachable ! ( "flatten would have failed beforehand" ) ,
7578 } ;
7679
80+ // Rename JSON keys starting with '@' to '_' to match the schema
81+ // Reject event if renaming would cause a key collision
82+ let value_arr = rename_json_keys ( value_arr) ?;
83+
7784 // collect all the keys from all the json objects in the request body
7885 let fields =
7986 collect_keys ( value_arr. iter ( ) ) . expect ( "fields can be collected from array of objects" ) ;
@@ -257,6 +264,49 @@ fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a s
257264 Ok ( keys)
258265}
259266
267+ /// Renames JSON keys to match the schema transformation using normalize_field_name.
268+ /// Returns an error if renaming would cause a key collision.
269+ fn rename_json_keys ( values : Vec < Value > ) -> Result < Vec < Value > , anyhow:: Error > {
270+ values
271+ . into_iter ( )
272+ . map ( |value| {
273+ if let Value :: Object ( map) = value {
274+ // Collect original keys to check for collisions
275+ let original_keys: HashSet < String > = map. keys ( ) . cloned ( ) . collect ( ) ;
276+
277+ // Check for collisions before renaming
278+ for key in map. keys ( ) {
279+ if key. starts_with ( '@' ) {
280+ let mut normalized_key = key. clone ( ) ;
281+ super :: normalize_field_name ( & mut normalized_key) ;
282+ if original_keys. contains ( & normalized_key) {
283+ return Err ( anyhow ! (
284+ "Key collision detected: '{}' and '{}' would both map to '{}'" ,
285+ key,
286+ normalized_key,
287+ normalized_key
288+ ) ) ;
289+ }
290+ }
291+ }
292+
293+ let new_map: serde_json:: Map < String , Value > = map
294+ . into_iter ( )
295+ . map ( |( mut key, val) | {
296+ if key. starts_with ( '@' ) {
297+ super :: normalize_field_name ( & mut key) ;
298+ }
299+ ( key, val)
300+ } )
301+ . collect ( ) ;
302+ Ok ( Value :: Object ( new_map) )
303+ } else {
304+ Ok ( value)
305+ }
306+ } )
307+ . collect ( )
308+ }
309+
260310fn fields_mismatch (
261311 schema : & [ Arc < Field > ] ,
262312 body : & Value ,
0 commit comments