diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
index 69d22c2e5..4ea637c00 100644
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ b/src/handlers/http/modal/utils/ingest_utils.rs
@@ -23,13 +23,12 @@ use anyhow::anyhow;
 use arrow_schema::Field;
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, Utc};
-use itertools::Itertools;
 use serde_json::Value;
 
 use crate::{
     event::{
+        self,
         format::{self, EventFormat},
-        Event,
     },
     handlers::{
         http::{ingest::PostError, kinesis},
@@ -73,61 +72,174 @@ pub async fn push_logs(
     let custom_partition = STREAM_INFO.get_custom_partition(stream_name)?;
     let schema_version = STREAM_INFO.get_schema_version(stream_name)?;
     let body_val: Value = serde_json::from_slice(body)?;
-    let data = convert_array_to_object(
-        body_val,
-        time_partition.as_ref(),
-        time_partition_limit,
-        custom_partition.as_ref(),
-        schema_version,
-    )?;
-    for value in data {
-        let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
-        let parsed_timestamp = match time_partition.as_ref() {
-            Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
-            _ => Utc::now().naive_utc(),
-        };
-        let custom_partition_values = match custom_partition.as_ref() {
-            Some(custom_partition) => {
-                let custom_partitions = custom_partition.split(',').collect_vec();
-                get_custom_partition_values(&value, &custom_partitions)
+    let size: usize = body.len();
+    let mut parsed_timestamp = Utc::now().naive_utc();
+    if time_partition.is_none() {
+        if custom_partition.is_none() {
+            let size = size as u64;
+            create_process_record_batch(
+                stream_name,
+                req,
+                body_val,
+                static_schema_flag.as_ref(),
+                None,
+                parsed_timestamp,
+                &HashMap::new(),
+                size,
+                schema_version,
+            )
+            .await?;
+        } else {
+            let data = convert_array_to_object(
+                body_val,
+                None,
+                None,
+                custom_partition.as_ref(),
+                schema_version,
+            )?;
+            let custom_partition = custom_partition.unwrap();
+            let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
+
+            for value in data {
+                let custom_partition_values =
+                    get_custom_partition_values(&value, &custom_partition_list);
+
+                let size = value.to_string().into_bytes().len() as u64;
+                create_process_record_batch(
+                    stream_name,
+                    req,
+                    value,
+                    static_schema_flag.as_ref(),
+                    None,
+                    parsed_timestamp,
+                    &custom_partition_values,
+                    size,
+                    schema_version,
+                )
+                .await?;
             }
-            None => HashMap::new(),
-        };
-        let schema = STREAM_INFO
-            .read()
-            .unwrap()
-            .get(stream_name)
-            .ok_or(PostError::StreamNotFound(stream_name.to_owned()))?
-            .schema
-            .clone();
-        let (rb, is_first_event) = into_event_batch(
-            req,
-            &value,
-            schema,
-            static_schema_flag.as_ref(),
+        }
+    } else if custom_partition.is_none() {
+        let data = convert_array_to_object(
+            body_val,
+            time_partition.as_ref(),
+            time_partition_limit,
+            None,
+            schema_version,
+        )?;
+        for value in data {
+            parsed_timestamp = get_parsed_timestamp(&value, time_partition.as_ref().unwrap())?;
+            let size = value.to_string().into_bytes().len() as u64;
+            create_process_record_batch(
+                stream_name,
+                req,
+                value,
+                static_schema_flag.as_ref(),
+                time_partition.as_ref(),
+                parsed_timestamp,
+                &HashMap::new(),
+                size,
+                schema_version,
+            )
+            .await?;
+        }
+    } else {
+        let data = convert_array_to_object(
+            body_val,
             time_partition.as_ref(),
+            time_partition_limit,
+            custom_partition.as_ref(),
             schema_version,
         )?;
+        let custom_partition = custom_partition.unwrap();
+        let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
 
-        Event {
-            rb,
-            stream_name: stream_name.to_owned(),
-            origin_format: "json",
-            origin_size,
-            is_first_event,
-            parsed_timestamp,
-            time_partition: time_partition.clone(),
-            custom_partition_values,
-            stream_type: StreamType::UserDefined,
+        for value in data {
+            let custom_partition_values =
+                get_custom_partition_values(&value, &custom_partition_list);
+
+            parsed_timestamp = get_parsed_timestamp(&value, time_partition.as_ref().unwrap())?;
+            let size = value.to_string().into_bytes().len() as u64;
+            create_process_record_batch(
+                stream_name,
+                req,
+                value,
+                static_schema_flag.as_ref(),
+                time_partition.as_ref(),
+                parsed_timestamp,
+                &custom_partition_values,
+                size,
+                schema_version,
+            )
+            .await?;
         }
-        .process()
-        .await?;
     }
 
     Ok(())
 }
 
+#[allow(clippy::too_many_arguments)]
+pub async fn create_process_record_batch(
+    stream_name: &str,
+    req: &HttpRequest,
+    value: Value,
+    static_schema_flag: Option<&String>,
+    time_partition: Option<&String>,
+    parsed_timestamp: NaiveDateTime,
+    custom_partition_values: &HashMap<String, String>,
+    origin_size: u64,
+    schema_version: SchemaVersion,
+) -> Result<(), PostError> {
+    let (rb, is_first_event) = get_stream_schema(
+        stream_name,
+        req,
+        &value,
+        static_schema_flag,
+        time_partition,
+        schema_version,
+    )?;
+    event::Event {
+        rb,
+        stream_name: stream_name.to_owned(),
+        origin_format: "json",
+        origin_size,
+        is_first_event,
+        parsed_timestamp,
+        time_partition: time_partition.cloned(),
+        custom_partition_values: custom_partition_values.clone(),
+        stream_type: StreamType::UserDefined,
+    }
+    .process()
+    .await?;
+
+    Ok(())
+}
+
+pub fn get_stream_schema(
+    stream_name: &str,
+    req: &HttpRequest,
+    body: &Value,
+    static_schema_flag: Option<&String>,
+    time_partition: Option<&String>,
+    schema_version: SchemaVersion,
+) -> Result<(arrow_array::RecordBatch, bool), PostError> {
+    let hash_map = STREAM_INFO.read().unwrap();
+    let schema = hash_map
+        .get(stream_name)
+        .ok_or(PostError::StreamNotFound(stream_name.to_owned()))?
+        .schema
+        .clone();
+    into_event_batch(
+        req,
+        body,
+        schema,
+        static_schema_flag,
+        time_partition,
+        schema_version,
+    )
+}
+
 pub fn into_event_batch(
     req: &HttpRequest,
     body: &Value,
diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs
index afd17ace6..58809618c 100644
--- a/src/utils/json/flatten.rs
+++ b/src/utils/json/flatten.rs
@@ -273,59 +273,88 @@ pub fn flatten_array_objects(
 /// Recursively flattens a JSON value.
 /// - If the value is an array, it flattens all elements of the array.
 /// - If the value is an object, it flattens all nested objects and arrays.
+/// - If the JSON value is heavily nested (with more than 4 levels of hierarchy), returns an error
 /// - Otherwise, it returns the value itself in a vector.
 ///
 /// Examples:
 /// 1. `{"a": 1}` ~> `[{"a": 1}]`
 /// 2. `[{"a": 1}, {"b": 2}]` ~> `[{"a": 1}, {"b": 2}]`
 /// 3. `[{"a": [{"b": 1}, {"c": 2}]}]` ~> `[{"a": {"b": 1}}, {"a": {"c": 2}}]`
-/// 3. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> `[{"a": {"b":1}, "d": {"e":4}}, {"a": {"c":2}, "d": {"e":4}}]`
-fn flattening_helper(value: &Value) -> Vec<Value> {
+/// 4. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> `[{"a": {"b":1}, "d": {"e":4}}, {"a": {"c":2}, "d": {"e":4}}]`
+/// 5. `{"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}` ~> returns an error - heavily nested, cannot flatten this JSON
+pub fn generic_flattening(value: &Value) -> Result<Vec<Value>, JsonFlattenError> {
     match value {
-        Value::Array(arr) => arr.iter().flat_map(flattening_helper).collect(),
-        Value::Object(map) => map
+        Value::Array(arr) => Ok(arr
             .iter()
-            .fold(vec![Map::new()], |results, (key, val)| match val {
-                Value::Array(arr) => arr
-                    .iter()
-                    .flat_map(flattening_helper)
-                    .flat_map(|flattened_item| {
-                        results.iter().map(move |result| {
-                            let mut new_obj = result.clone();
-                            new_obj.insert(key.clone(), flattened_item.clone());
-                            new_obj
+            .flat_map(|flatten_item| generic_flattening(flatten_item).unwrap_or_default())
+            .collect()),
+        Value::Object(map) => {
+            let results = map
+                .iter()
+                .fold(vec![Map::new()], |results, (key, val)| match val {
+                    Value::Array(arr) => arr
+                        .iter()
+                        .flat_map(|flatten_item| {
+                            generic_flattening(flatten_item).unwrap_or_default()
+                        })
+                        .flat_map(|flattened_item| {
+                            results.iter().map(move |result| {
+                                let mut new_obj = result.clone();
+                                new_obj.insert(key.clone(), flattened_item.clone());
+                                new_obj
+                            })
+                        })
+                        .collect(),
+                    Value::Object(_) => generic_flattening(val)
+                        .unwrap_or_default()
+                        .iter()
+                        .flat_map(|nested_result| {
+                            results.iter().map(move |result| {
+                                let mut new_obj = result.clone();
+                                new_obj.insert(key.clone(), nested_result.clone());
+                                new_obj
+                            })
                         })
-                    })
-                    .collect(),
-                Value::Object(_) => flattening_helper(val)
-                    .iter()
-                    .flat_map(|nested_result| {
-                        results.iter().map(move |result| {
-                            let mut new_obj = result.clone();
-                            new_obj.insert(key.clone(), nested_result.clone());
-                            new_obj
+                        .collect(),
+                    _ => results
+                        .into_iter()
+                        .map(|mut result| {
+                            result.insert(key.clone(), val.clone());
+                            result
                         })
-                    })
-                    .collect(),
-                _ => results
-                    .into_iter()
-                    .map(|mut result| {
-                        result.insert(key.clone(), val.clone());
-                        result
-                    })
-                    .collect(),
-            })
-            .into_iter()
-            .map(Value::Object)
-            .collect(),
-        _ => vec![value.clone()],
+                        .collect(),
+                });
+
+            Ok(results.into_iter().map(Value::Object).collect())
+        }
+        _ => Ok(vec![value.clone()]),
+    }
+}
+
+/// Recursively checks the level of nesting of the serde Value;
+/// if the Value has more than 4 levels of hierarchy, returns true.
+/// Examples:
+/// 1. `{"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}` ~> returns true
+/// 2. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> returns false
+pub fn has_more_than_four_levels(value: &Value, current_level: usize) -> bool {
+    if current_level > 4 {
+        return true;
+    }
+    match value {
+        Value::Array(arr) => arr
+            .iter()
+            .any(|item| has_more_than_four_levels(item, current_level)),
+        Value::Object(map) => map
+            .values()
+            .any(|val| has_more_than_four_levels(val, current_level + 1)),
+        _ => false,
     }
 }
 
 // Converts a Vector of values into a `Value::Array`, as long as all of them are objects
-pub fn generic_flattening(json: Value) -> Result<Value, JsonFlattenError> {
-    let mut flattened = Vec::new();
-    for item in flattening_helper(&json) {
+pub fn convert_to_array(flattened: Vec<Value>) -> Result<Value, JsonFlattenError> {
+    let mut result = Vec::new();
+    for item in flattened {
         let mut map = Map::new();
         let Some(item) = item.as_object() else {
             return Err(JsonFlattenError::ExpectedObjectInArray);
@@ -333,15 +362,16 @@ pub fn generic_flattening(json: Value) -> Result<Value, JsonFlattenError> {
         for (key, value) in item {
             map.insert(key.clone(), value.clone());
         }
-        flattened.push(Value::Object(map));
+        result.push(Value::Object(map));
     }
-
-    Ok(Value::Array(flattened))
+    Ok(Value::Array(result))
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::utils::json::flatten::flatten_array_objects;
+    use crate::utils::json::flatten::{
+        flatten_array_objects, generic_flattening, has_more_than_four_levels,
+    };
 
     use super::{flatten, JsonFlattenError};
     use serde_json::{json, Map, Value};
@@ -599,4 +629,23 @@ mod tests {
             JsonFlattenError::FieldContainsPeriod(_)
         );
     }
+
+    #[test]
+    fn unacceptable_levels_of_nested_json() {
+        let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
+        assert!(has_more_than_four_levels(&value, 1));
+    }
+
+    #[test]
+    fn acceptable_levels_of_nested_json() {
+        let value = json!({"a":{"b":{"e":["a","b"]}}});
+        assert!(!has_more_than_four_levels(&value, 1));
+    }
+
+    #[test]
+    fn flatten_json() {
+        let value = json!({"a":{"b":{"e":["a","b"]}}});
+        let expected = vec![json!({"a":{"b":{"e":"a"}}}), json!({"a":{"b":{"e":"b"}}})];
+        assert_eq!(generic_flattening(&value).unwrap(), expected);
+    }
 }
diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs
index 1ef31c804..0d3ac1e79 100644
--- a/src/utils/json/mod.rs
+++ b/src/utils/json/mod.rs
@@ -18,6 +18,7 @@
 
 use std::num::NonZeroU32;
 
+use flatten::{convert_to_array, generic_flattening, has_more_than_four_levels};
 use serde_json;
 use serde_json::Value;
 
@@ -25,6 +26,9 @@ use crate::metadata::SchemaVersion;
 
 pub mod flatten;
 
+/// Calls `generic_flattening`, which returns a `Vec<Value>` or an error;
+/// when a `Vec<Value>` is returned, converts it into a `Value::Array`.
+/// The depth check ensures recursive flattening is not attempted on heavily nested JSON.
 pub fn flatten_json_body(
     body: Value,
     time_partition: Option<&String>,
@@ -33,12 +37,14 @@ pub fn flatten_json_body(
     schema_version: SchemaVersion,
     validation_required: bool,
 ) -> Result<Value, anyhow::Error> {
-    let mut nested_value = if schema_version == SchemaVersion::V1 {
-        flatten::generic_flattening(body)?
-    } else {
-        body
-    };
-
+    // Flatten the JSON body only if the schema version is V1 and the body
+    // is nested no more than 4 levels deep
+    let mut nested_value =
+        if schema_version == SchemaVersion::V0 || has_more_than_four_levels(&body, 1) {
+            body
+        } else {
+            let flattened_json = generic_flattening(&body)?;
+            convert_to_array(flattened_json)?
+        };
     flatten::flatten(
         &mut nested_value,
         "_",
         time_partition,
         time_partition_limit,
         custom_partition,
         validation_required,
     )?;
-
     Ok(nested_value)
 }
 
@@ -93,3 +98,45 @@ pub fn convert_to_string(value: &Value) -> Value {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::flatten_json_body;
+    use serde_json::json;
+
+    #[test]
+    fn hierarchical_json_flattening_success() {
+        let value = json!({"a":{"b":{"e":["a","b"]}}});
+        let expected = json!([{"a_b_e": "a"}, {"a_b_e": "b"}]);
+        assert_eq!(
+            flatten_json_body(
+                value,
+                None,
+                None,
+                None,
+                crate::metadata::SchemaVersion::V1,
+                false
+            )
+            .unwrap(),
+            expected
+        );
+    }
+
+    #[test]
+    fn hierarchical_json_flattening_failure() {
+        let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
+        let expected = json!({"a_b_c_d_e": ["a","b"]});
+        assert_eq!(
+            flatten_json_body(
+                value,
+                None,
+                None,
+                None,
+                crate::metadata::SchemaVersion::V1,
+                false
+            )
+            .unwrap(),
+            expected
+        );
+    }
+}
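
As a quick illustration of the fan-out that doc example 4 above describes, here is a sketch using the helpers introduced in this patch (illustrative only, not part of the diff):

    use serde_json::json;
    use crate::utils::json::flatten::{convert_to_array, generic_flattening, has_more_than_four_levels};

    // {"a": [...]} fans out into one object per array element, with the
    // non-array key "d" duplicated into each result:
    let value = json!({"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}});
    assert!(!has_more_than_four_levels(&value, 1));
    let fanned_out = generic_flattening(&value).unwrap();
    assert_eq!(
        convert_to_array(fanned_out).unwrap(),
        json!([{"a": {"b": 1}, "d": {"e": 4}}, {"a": {"c": 2}, "d": {"e": 4}}])
    );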
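
One subtlety of the depth guard worth noting: in has_more_than_four_levels, arrays recurse with the same current_level, so only object nesting counts toward the limit. A small sketch (again illustrative, not part of the patch):

    use serde_json::json;
    use crate::utils::json::flatten::has_more_than_four_levels;

    // Arrays alone never trip the guard, no matter how deep:
    assert!(!has_more_than_four_levels(&json!([[[[["deep"]]]]]), 1));
    // Wrapping an acceptable object in an array adds no levels either:
    assert!(!has_more_than_four_levels(&json!([{"a": {"b": {"e": ["a", "b"]}}}]), 1));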