Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ impl EventFormat for Event {
));
}

// Rename JSON keys starting with '@' to '_' to match the schema
let value_arr = rename_json_keys(value_arr);
Comment on lines +121 to +122
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

cd src/event/format && wc -l json.rs

Repository: parseablehq/parseable

Length of output: 76


🏁 Script executed:

cd src/event/format && head -150 json.rs | tail -50

Repository: parseablehq/parseable

Length of output: 1820


🏁 Script executed:

cd src/event/format && sed -n '70,130p' json.rs

Repository: parseablehq/parseable

Length of output: 2711


🏁 Script executed:

rg -n "fn collect_keys" --type rust

Repository: parseablehq/parseable

Length of output: 184


🏁 Script executed:

rg -n "fn derive_arrow_schema" --type rust

Repository: parseablehq/parseable

Length of output: 117


🏁 Script executed:

rg -n "fn rename_json_keys" --type rust

Repository: parseablehq/parseable

Length of output: 149


🏁 Script executed:

sed -n '231,243p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 454


🏁 Script executed:

sed -n '244,263p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 691


🏁 Script executed:

sed -n '264,290p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 868


🏁 Script executed:

rg -n "fn normalize_field_name" --type rust -A 5

Repository: parseablehq/parseable

Length of output: 379


🏁 Script executed:

rg -n "stream_schema" src/event/format/json.rs --type rust -B 2 -A 2

Repository: parseablehq/parseable

Length of output: 1112


🏁 Script executed:

rg -n "stream_schema\|schema_schema" src/event/format/json.rs --type rust -B 3 -A 3 | head -60

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "HashMap.*Field\|stream_schema.*insert" src/event/format/json.rs --type rust -B 2 -A 2

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "pub fn into_event\|fn load_schema" --type rust | head -20

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

sed -n '55,80p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 1136


🏁 Script executed:

rg -n "impl.*Handler.*for.*JsonEvent" src/event/format/json.rs --type rust -B 2 -A 15

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "HashMap<String, Arc<Field>>" --type rust | head -20

Repository: parseablehq/parseable

Length of output: 1416


🏁 Script executed:

rg -n "load_schema\|get_schema\|create_stream_and_schema" --type rust -B 2 -A 5 | head -40

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

sed -n '869,890p' src/streams.rs

Repository: parseablehq/parseable

Length of output: 122


🏁 Script executed:

rg -n "get_schema_raw" src/parseable --type rust -B 3 -A 10 | head -60

Repository: parseablehq/parseable

Length of output: 961


🏁 Script executed:

rg -n "infer_json_schema_from_iterator\|update_field_type_in_schema" src/event/format --type rust -B 2 -A 5 | head -80

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

sed -n '82,110p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 1443


🏁 Script executed:

rg -n "schema.insert\|fields_to_map\|update.*schema" src/handlers/http/ingest.rs --type rust -B 3 -A 8 | head -80

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

sed -n '594,620p' src/handlers/http/ingest.rs

Repository: parseablehq/parseable

Length of output: 861


🏁 Script executed:

rg -n "Metadata\|serialize_schema\|schema.clone\|write_schema" src/metadata.rs --type rust -B 2 -A 5 | head -100

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

sed -n '75,115p' src/metadata.rs

Repository: parseablehq/parseable

Length of output: 1421


🏁 Script executed:

rg -n "write.*metadata\|save_schema\|update_schema" src/parseable --type rust -B 3 -A 8 | head -120

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "fn create.*stream\|LogStreamMetadata::new" src/parseable/streams.rs --type rust -B 5 -A 15 | head -100

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "into_recordbatch\|into_event" src/event/format/json.rs --type rust -B 3 -A 20 | head -80

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

sed -n '140,180p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 1427


🏁 Script executed:

rg -n "fn create_stream\|schema.*insert\|metadata.*write" src/handlers/http/ingest.rs --type rust -B 5 -A 15 | head -150

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "create_stream_and_schema" src --type rust -B 5 -A 20 | head -100

Repository: parseablehq/parseable

Length of output: 6112


🏁 Script executed:

rg -n "schema.*persist\|save.*field\|store.*schema" src/parseable --type rust -B 5 -A 10 | head -80

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "create_schema_from_metastore\|persist_schema\|set_schema" src/parseable --type rust -B 3 -A 8

Repository: parseablehq/parseable

Length of output: 47


🏁 Script executed:

rg -n "normalize_field_name" src --type rust -B 5 -A 5

Repository: parseablehq/parseable

Length of output: 3561


🏁 Script executed:

sed -n '1,60p' src/event/format/json.rs

Repository: parseablehq/parseable

Length of output: 1954


Normalize collected keys before schema lookup to avoid unnecessary schema inference.

For existing streams, the stored schema uses normalized field names (e.g., _foo instead of @foo). However, collect_keys extracts field names as-is from the incoming JSON. When derive_arrow_schema attempts to match these non-normalized keys against the stored schema, the lookup fails and triggers a full schema inference path.

This causes unnecessary overhead on every event containing "@" prefixed fields, since:

  1. collect_keys extracts "@foo"
  2. derive_arrow_schema tries to find "@foo" in schema (which has "_foo")
  3. Lookup fails, schema inference is triggered
  4. Keys are only normalized later at line 121 via rename_json_keys

Normalize the collected keys before passing them to derive_arrow_schema to avoid repeated schema inference for existing streams.

🤖 Prompt for AI Agents
In @src/event/format/json.rs around lines 121 - 122, collect_keys currently
returns raw field names (e.g., "@foo") which are only normalized later by
rename_json_keys, causing derive_arrow_schema to miss matches and re-infer
schemas; fix by normalizing the collected keys immediately after collect_keys
and before calling derive_arrow_schema (i.e., pass
rename_json_keys(collect_keys(...)) into derive_arrow_schema or call
rename_json_keys on the keys array returned by collect_keys), ensuring functions
rename_json_keys, collect_keys, and derive_arrow_schema are updated accordingly
so schema lookup uses the normalized names.


Ok((value_arr, schema, is_first))
}

Expand Down Expand Up @@ -257,6 +260,27 @@ fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a s
Ok(keys)
}

/// Applies `normalize_field_name` to every top-level key of each JSON object
/// so the event values line up with the stored (normalized) schema.
///
/// Non-object values are passed through untouched. Note: if two keys
/// normalize to the same name (e.g. "@foo" and "_foo"), the later entry
/// overwrites the earlier one — same semantics as collecting into a map.
fn rename_json_keys(values: Vec<Value>) -> Vec<Value> {
    let normalize_object = |map: serde_json::Map<String, Value>| {
        let mut renamed = serde_json::Map::with_capacity(map.len());
        for (mut key, val) in map {
            super::normalize_field_name(&mut key);
            renamed.insert(key, val);
        }
        Value::Object(renamed)
    };
    values
        .into_iter()
        .map(|value| match value {
            Value::Object(map) => normalize_object(map),
            other => other,
        })
        .collect()
}
Comment on lines +263 to +282
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Implementation is correct, but consider the key-collision edge case.

The function correctly normalizes JSON object keys. However, if an input object contains both "@foo" and "_foo", they will collide after normalization, potentially causing data loss (the second insertion would overwrite the first).

While this edge case may be rare, consider whether validation or error handling is needed:

🛡️ Potential collision detection
 fn rename_json_keys(values: Vec<Value>) -> Vec<Value> {
     values
         .into_iter()
         .map(|value| {
             if let Value::Object(map) = value {
+                let mut seen_keys = std::collections::HashSet::new();
                 let new_map: serde_json::Map<String, Value> = map
                     .into_iter()
                     .map(|(mut key, val)| {
+                        let original_key = key.clone();
                         super::normalize_field_name(&mut key);
+                        if !seen_keys.insert(key.clone()) {
+                            tracing::warn!("Key collision detected: '{}' normalizes to existing key '{}'", original_key, key);
+                        }
                         (key, val)
                     })
                     .collect();
                 Value::Object(new_map)
             } else {
                 value
             }
         })
         .collect()
 }


fn fields_mismatch(
schema: &[Arc<Field>],
body: &Value,
Expand All @@ -267,7 +291,10 @@ fn fields_mismatch(
if val.is_null() {
continue;
}
let Some(field) = get_field(schema, name) else {
// Normalize field name to match schema transformation
let mut lookup_name = name.to_string();
super::normalize_field_name(&mut lookup_name);
let Some(field) = get_field(schema, &lookup_name) else {
return true;
};
if !valid_type(field, val, schema_version, static_schema_flag) {
Expand Down
13 changes: 12 additions & 1 deletion src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ static TIME_FIELD_NAME_PARTS: [&str; 11] = [
];
type EventSchema = Vec<Arc<Field>>;

/// Normalizes a field name by replacing a leading '@' with '_'.
///
/// Stored schemas use '_'-prefixed names, so incoming keys such as "@foo"
/// must become "_foo" before any schema lookup. The rewrite happens in
/// place via `replace_range`: both '@' and '_' are single-byte ASCII, so
/// the swap keeps the string valid UTF-8 at the same length and avoids the
/// per-call allocation that a `format!`-based rebuild would incur on the
/// hot ingest path. Names not starting with '@' are left untouched.
#[inline]
pub fn normalize_field_name(name: &mut String) {
    if name.starts_with('@') {
        // Same-length ASCII replacement: no reallocation, no tail shift.
        name.replace_range(0..1, "_");
    }
}

/// Source of the logs, used to perform special processing for certain sources
#[derive(Default, Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
pub enum LogSource {
Expand Down Expand Up @@ -335,7 +344,9 @@ pub fn override_data_type(
.fields()
.iter()
.map(|field| {
let field_name = field.name().as_str();
// Normalize field names - replace '@' prefix with '_'
let mut field_name = field.name().to_string();
normalize_field_name(&mut field_name);
match (schema_version, map.get(field.name())) {
// in V1 for new fields in json named "time"/"date" or such and having inferred
// type string, that can be parsed as timestamp, use the timestamp type.
Expand Down
Loading