diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 76b857f0e..b782d1f3c 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -118,6 +118,9 @@ impl EventFormat for Event { )); } + // Rename JSON keys starting with '@' to '_' to match the schema + let value_arr = rename_json_keys(value_arr); + Ok((value_arr, schema, is_first)) } @@ -257,6 +260,27 @@ fn collect_keys<'a>(values: impl Iterator) -> Result) -> Vec { + values + .into_iter() + .map(|value| { + if let Value::Object(map) = value { + let new_map: serde_json::Map = map + .into_iter() + .map(|(mut key, val)| { + super::normalize_field_name(&mut key); + (key, val) + }) + .collect(); + Value::Object(new_map) + } else { + value + } + }) + .collect() +} + fn fields_mismatch( schema: &[Arc], body: &Value, @@ -267,7 +291,10 @@ fn fields_mismatch( if val.is_null() { continue; } - let Some(field) = get_field(schema, name) else { + // Normalize field name to match schema transformation + let mut lookup_name = name.to_string(); + super::normalize_field_name(&mut lookup_name); + let Some(field) = get_field(schema, &lookup_name) else { return true; }; if !valid_type(field, val, schema_version, static_schema_flag) { diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 56d3a676d..4157627b6 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -57,6 +57,15 @@ static TIME_FIELD_NAME_PARTS: [&str; 11] = [ ]; type EventSchema = Vec>; +/// Normalizes a field name by replacing leading '@' with '_'. +/// Fields starting with '@' are renamed to start with '_'. +#[inline] +pub fn normalize_field_name(name: &mut String) { + if let Some(stripped) = name.strip_prefix('@') { + *name = format!("_{}", stripped); + } +} + /// Source of the logs, used to perform special processing for certain sources #[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)] pub enum LogSource { @@ -335,7 +344,9 @@ pub fn override_data_type( .fields() .iter() .map(|field| { - let field_name = field.name().as_str(); + // Normalize field names - replace '@' prefix with '_' + let mut field_name = field.name().to_string(); + normalize_field_name(&mut field_name); match (schema_version, map.get(field.name())) { // in V1 for new fields in json named "time"/"date" or such and having inferred // type string, that can be parsed as timestamp, use the timestamp type.