Skip to content

Commit 0e0a8f8

Browse files
reject event on key collision, log error on ingestion failure
1 parent 8a2a412 commit 0e0a8f8

File tree

2 files changed

+38
-13
lines changed

2 files changed

+38
-13
lines changed

src/event/format/json.rs

Lines changed: 36 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
2727
use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
2828
use itertools::Itertools;
2929
use serde_json::Value;
30-
use std::{collections::HashMap, sync::Arc};
30+
use std::{
31+
collections::{HashMap, HashSet},
32+
sync::Arc,
33+
};
3134
use tracing::error;
3235

3336
use super::EventFormat;
@@ -74,6 +77,10 @@ impl EventFormat for Event {
7477
_ => unreachable!("flatten would have failed beforehand"),
7578
};
7679

80+
// Rename JSON keys starting with '@' to '_' to match the schema
81+
// Reject event if renaming would cause a key collision
82+
let value_arr = rename_json_keys(value_arr)?;
83+
7784
// collect all the keys from all the json objects in the request body
7885
let fields =
7986
collect_keys(value_arr.iter()).expect("fields can be collected from array of objects");
@@ -118,9 +125,6 @@ impl EventFormat for Event {
118125
));
119126
}
120127

121-
// Rename JSON keys starting with '@' to '_' to match the schema
122-
let value_arr = rename_json_keys(value_arr);
123-
124128
Ok((value_arr, schema, is_first))
125129
}
126130

@@ -260,22 +264,44 @@ fn collect_keys<'a>(values: impl Iterator<Item = &'a Value>) -> Result<Vec<&'a s
260264
Ok(keys)
261265
}
262266

263-
/// Renames JSON keys to match the schema transformation using normalize_field_name
264-
fn rename_json_keys(values: Vec<Value>) -> Vec<Value> {
267+
/// Renames JSON keys to match the schema transformation using normalize_field_name.
268+
/// Returns an error if renaming would cause a key collision.
269+
fn rename_json_keys(values: Vec<Value>) -> Result<Vec<Value>, anyhow::Error> {
265270
values
266271
.into_iter()
267272
.map(|value| {
268273
if let Value::Object(map) = value {
274+
// Collect original keys to check for collisions
275+
let original_keys: HashSet<String> = map.keys().cloned().collect();
276+
277+
// Check for collisions before renaming
278+
for key in map.keys() {
279+
if key.starts_with('@') {
280+
let mut normalized_key = key.clone();
281+
super::normalize_field_name(&mut normalized_key);
282+
if original_keys.contains(&normalized_key) {
283+
return Err(anyhow!(
284+
"Key collision detected: '{}' and '{}' would both map to '{}'",
285+
key,
286+
normalized_key,
287+
normalized_key
288+
));
289+
}
290+
}
291+
}
292+
269293
let new_map: serde_json::Map<String, Value> = map
270294
.into_iter()
271295
.map(|(mut key, val)| {
272-
super::normalize_field_name(&mut key);
296+
if key.starts_with('@') {
297+
super::normalize_field_name(&mut key);
298+
}
273299
(key, val)
274300
})
275301
.collect();
276-
Value::Object(new_map)
302+
Ok(Value::Object(new_map))
277303
} else {
278-
value
304+
Ok(value)
279305
}
280306
})
281307
.collect()
@@ -291,10 +317,7 @@ fn fields_mismatch(
291317
if val.is_null() {
292318
continue;
293319
}
294-
// Normalize field name to match schema transformation
295-
let mut lookup_name = name.to_string();
296-
super::normalize_field_name(&mut lookup_name);
297-
let Some(field) = get_field(schema, &lookup_name) else {
320+
let Some(field) = get_field(schema, name) else {
298321
return true;
299322
};
300323
if !valid_type(field, val, schema_version, static_schema_flag) {

src/handlers/http/ingest.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use actix_web::{HttpRequest, HttpResponse, http::header::ContentType};
2424
use arrow_array::RecordBatch;
2525
use bytes::Bytes;
2626
use chrono::Utc;
27+
use tracing::error;
2728

2829
use crate::event::error::EventError;
2930
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
@@ -543,6 +544,7 @@ impl actix_web::ResponseError for PostError {
543544
}
544545

545546
fn error_response(&self) -> actix_web::HttpResponse<actix_web::body::BoxBody> {
547+
error!("{self}");
546548
match self {
547549
PostError::MetastoreError(metastore_error) => {
548550
actix_web::HttpResponse::build(metastore_error.status_code())

0 commit comments

Comments
 (0)