diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 8fce4bcec..121bc6bb8 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -108,11 +108,10 @@ impl Alert { ); let deployment_id = storage::StorageMetadata::global().deployment_id; let deployment_mode = storage::StorageMetadata::global().mode.to_string(); - let additional_labels = + let mut additional_labels = serde_json::to_value(rule).expect("rule is perfectly deserializable"); - let flatten_additional_labels = - utils::json::flatten::flatten_with_parent_prefix(additional_labels, "rule", "_") - .expect("can be flattened"); + utils::json::flatten::flatten_with_parent_prefix(&mut additional_labels, "rule", "_") + .expect("can be flattened"); Context::new( stream_name, AlertInfo::new( @@ -122,7 +121,7 @@ impl Alert { alert_state, ), DeploymentInfo::new(deployment_instance, deployment_id, deployment_mode), - flatten_additional_labels, + additional_labels, ) } } diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 2c70dca45..900219a01 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -196,7 +196,7 @@ pub async fn create_stream_if_not_exists( super::logstream::create_stream( stream_name.to_string(), "", - "", + None, "", "", Arc::new(Schema::empty()), diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 98ab11ab8..dad5df00e 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -49,6 +49,7 @@ use http::{HeaderName, HeaderValue}; use serde_json::Value; use std::collections::HashMap; use std::fs; +use std::num::NonZeroU32; use std::str::FromStr; use std::sync::Arc; use tracing::{error, warn}; @@ -471,7 +472,7 @@ fn remove_id_from_alerts(value: &mut Value) { pub async fn create_stream( stream_name: String, time_partition: &str, - time_partition_limit: &str, + time_partition_limit: Option, custom_partition: &str, static_schema_flag: &str, schema: Arc, @@ -511,7 +512,7 @@ pub async fn create_stream( stream_name.to_string(), created_at, time_partition.to_string(), - time_partition_limit.to_string(), + time_partition_limit, custom_partition.to_string(), static_schema_flag.to_string(), static_schema, @@ -561,7 +562,9 @@ pub async fn get_stream_info(req: HttpRequest) -> Result Result<&str, CreateStreamError> { +) -> Result { if !time_partition_limit.ends_with('d') { return Err(CreateStreamError::Custom { msg: "Missing 'd' suffix for duration value".to_string(), @@ -215,12 +215,12 @@ pub fn validate_time_partition_limit( }); } let days = &time_partition_limit[0..time_partition_limit.len() - 1]; - if days.parse::().is_err() { + let Ok(days) = days.parse::() else { return Err(CreateStreamError::Custom { msg: "Could not convert duration to an unsigned number".to_string(), status: StatusCode::BAD_REQUEST, }); - } + }; Ok(days) } @@ -288,7 +288,7 @@ pub fn validate_static_schema( pub async fn update_time_partition_limit_in_stream( stream_name: String, - time_partition_limit: &str, + time_partition_limit: NonZeroU32, ) -> Result<(), CreateStreamError> { let storage = CONFIG.storage().get_object_store(); if let Err(err) = storage @@ -299,7 +299,7 @@ pub async fn update_time_partition_limit_in_stream( } if metadata::STREAM_INFO - .update_time_partition_limit(&stream_name, time_partition_limit.to_string()) + .update_time_partition_limit(&stream_name, time_partition_limit) .is_err() { return Err(CreateStreamError::Custom { @@ -381,7 +381,7 @@ pub async fn update_custom_partition_in_stream( pub async fn create_stream( stream_name: String, time_partition: &str, - time_partition_limit: &str, + time_partition_limit: Option, custom_partition: &str, static_schema_flag: &str, schema: Arc, @@ -421,7 +421,7 @@ pub async fn create_stream( stream_name.to_string(), created_at, time_partition.to_string(), - time_partition_limit.to_string(), + time_partition_limit, custom_partition.to_string(), static_schema_flag.to_string(), static_schema, @@ -470,8 +470,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< let time_partition = stream_metadata.time_partition.as_deref().unwrap_or(""); let time_partition_limit = stream_metadata .time_partition_limit - .as_deref() - .unwrap_or(""); + .and_then(|limit| limit.parse().ok()); let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or(""); let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or(""); let stream_type = stream_metadata.stream_type.as_deref().unwrap_or(""); @@ -480,7 +479,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< stream_name.to_string(), stream_metadata.created_at, time_partition.to_string(), - time_partition_limit.to_string(), + time_partition_limit, custom_partition.to_string(), static_schema_flag.to_string(), static_schema, diff --git a/src/metadata.rs b/src/metadata.rs index 1fe01034c..5a5ff9bb1 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -23,6 +23,7 @@ use itertools::Itertools; use once_cell::sync::Lazy; use serde_json::Value; use std::collections::HashMap; +use std::num::NonZeroU32; use std::sync::{Arc, RwLock}; use self::error::stream_info::{CheckAlertError, LoadError, MetadataError}; @@ -53,7 +54,7 @@ pub struct LogStreamMetadata { pub created_at: String, pub first_event_at: Option, pub time_partition: Option, - pub time_partition_limit: Option, + pub time_partition_limit: Option, pub custom_partition: Option, pub static_schema_flag: Option, pub hot_tier_enabled: Option, @@ -113,11 +114,11 @@ impl StreamInfo { pub fn get_time_partition_limit( &self, stream_name: &str, - ) -> Result, MetadataError> { + ) -> Result, MetadataError> { let map = self.read().expect(LOCK_EXPECT); map.get(stream_name) .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) - .map(|metadata| metadata.time_partition_limit.clone()) + .map(|metadata| metadata.time_partition_limit) } pub fn get_custom_partition(&self, stream_name: &str) -> Result, MetadataError> { @@ -202,7 +203,7 @@ impl StreamInfo { pub fn update_time_partition_limit( &self, stream_name: &str, - time_partition_limit: String, + time_partition_limit: NonZeroU32, ) -> Result<(), MetadataError> { let mut map = self.write().expect(LOCK_EXPECT); map.get_mut(stream_name) @@ -244,7 +245,7 @@ impl StreamInfo { stream_name: String, created_at: String, time_partition: String, - time_partition_limit: String, + time_partition_limit: Option, custom_partition: String, static_schema_flag: String, static_schema: HashMap>, @@ -262,11 +263,7 @@ impl StreamInfo { } else { Some(time_partition) }, - time_partition_limit: if time_partition_limit.is_empty() { - None - } else { - Some(time_partition_limit) - }, + time_partition_limit, custom_partition: if custom_partition.is_empty() { None } else { @@ -320,7 +317,9 @@ impl StreamInfo { created_at: meta.created_at, first_event_at: meta.first_event_at, time_partition: meta.time_partition, - time_partition_limit: meta.time_partition_limit, + time_partition_limit: meta + .time_partition_limit + .and_then(|limit| limit.parse().ok()), custom_partition: meta.custom_partition, static_schema_flag: meta.static_schema_flag, hot_tier_enabled: meta.hot_tier_enabled, @@ -473,7 +472,9 @@ pub async fn load_stream_metadata_on_server_start( created_at: meta.created_at, first_event_at: meta.first_event_at, time_partition: meta.time_partition, - time_partition_limit: meta.time_partition_limit, + time_partition_limit: meta + .time_partition_limit + .and_then(|limit| limit.parse().ok()), custom_partition: meta.custom_partition, static_schema_flag: meta.static_schema_flag, hot_tier_enabled: meta.hot_tier_enabled, diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 34a1bb631..e7fb38e6e 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -50,6 +50,7 @@ use relative_path::RelativePathBuf; use tracing::error; use std::collections::BTreeMap; +use std::num::NonZeroU32; use std::{ collections::HashMap, fs, @@ -145,7 +146,7 @@ pub trait ObjectStorage: Send + Sync + 'static { &self, stream_name: &str, time_partition: &str, - time_partition_limit: &str, + time_partition_limit: Option, custom_partition: &str, static_schema_flag: &str, schema: Arc, @@ -162,11 +163,7 @@ pub trait ObjectStorage: Send + Sync + 'static { } else { format.time_partition = Some(time_partition.to_string()); } - if time_partition_limit.is_empty() { - format.time_partition_limit = None; - } else { - format.time_partition_limit = Some(time_partition_limit.to_string()); - } + format.time_partition_limit = time_partition_limit.map(|limit| limit.to_string()); if custom_partition.is_empty() { format.custom_partition = None; } else { @@ -190,14 +187,10 @@ pub trait ObjectStorage: Send + Sync + 'static { async fn update_time_partition_limit_in_stream( &self, stream_name: &str, - time_partition_limit: &str, + time_partition_limit: NonZeroU32, ) -> Result<(), ObjectStorageError> { let mut format = self.get_object_store_format(stream_name).await?; - if time_partition_limit.is_empty() { - format.time_partition_limit = None; - } else { - format.time_partition_limit = Some(time_partition_limit.to_string()); - } + format.time_partition_limit = Some(time_partition_limit.to_string()); let format_json = to_bytes(&format); self.put_object(&stream_json_path(stream_name), format_json) .await?; diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index 9399770a3..298954972 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -16,467 +16,433 @@ * */ -use anyhow::anyhow; +use std::collections::BTreeMap; +use std::num::NonZeroU32; + use chrono::{DateTime, Duration, Utc}; -use itertools::Itertools; use serde_json::map::Map; use serde_json::value::Value; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum JsonFlattenError { + #[error("Cannot flatten this JSON")] + CannotFlatten, + #[error("Ingestion failed as field {0} is not part of the log")] + FieldNotPartOfLog(String), + #[error("Ingestion failed as field {0} is empty or 'null'")] + FieldEmptyOrNull(String), + #[error("Ingestion failed as field {0} is an object")] + FieldIsObject(String), + #[error("Ingestion failed as field {0} is an array")] + FieldIsArray(String), + #[error("Ingestion failed as field {0} contains a period in the value")] + FieldContainsPeriod(String), + #[error("Ingestion failed as field {0} is not a string")] + FieldNotString(String), + #[error("Field {0} is not in the correct datetime format")] + InvalidDatetimeFormat(String), + #[error("Field {0} value is more than {1} days old")] + TimestampTooOld(String, i64), + #[error("Expected object in array of objects")] + ExpectedObjectInArray, + #[error("Found non-object element while flattening array of objects")] + NonObjectInArray, +} + +// Recursively flattens JSON objects and arrays, e.g. with the separator `.`, starting from the TOP +// `{"key": "value", "nested_key": {"key":"value"}}` becomes `{"key": "value", "nested_key.key": "value"}` pub fn flatten( - nested_value: Value, + nested_value: &mut Value, separator: &str, time_partition: Option<&String>, - time_partition_limit: Option<&String>, + time_partition_limit: Option, custom_partition: Option<&String>, validation_required: bool, -) -> Result { +) -> Result<(), JsonFlattenError> { match nested_value { Value::Object(nested_dict) => { if validation_required { - let validate_time_partition_result = validate_time_partition( - &Value::Object(nested_dict.clone()), + validate_time_partition(nested_dict, time_partition, time_partition_limit)?; + validate_custom_partition(nested_dict, custom_partition)?; + } + let mut map = Map::new(); + flatten_object(&mut map, None, nested_dict, separator)?; + *nested_dict = map; + } + Value::Array(arr) => { + for nested_value in arr { + // Recursively flatten each element, ONLY in the TOP array + flatten( + nested_value, + separator, time_partition, time_partition_limit, - ); - - let validate_custom_partition_result = validate_custom_partition( - &Value::Object(nested_dict.clone()), custom_partition, - ); - if validate_time_partition_result.is_ok() { - if validate_custom_partition_result.is_ok() { - let mut map = Map::new(); - flatten_object(&mut map, None, nested_dict, separator)?; - Ok(Value::Object(map)) - } else { - Err(anyhow!(validate_custom_partition_result.unwrap_err())) - } - } else { - Err(anyhow!(validate_time_partition_result.unwrap_err())) - } - } else { - let mut map = Map::new(); - flatten_object(&mut map, None, nested_dict, separator)?; - Ok(Value::Object(map)) - } - } - Value::Array(mut arr) => { - for _value in &mut arr { - let value: Value = _value.clone(); - if validation_required { - let validate_time_partition_result = - validate_time_partition(&value, time_partition, time_partition_limit); - let validate_custom_partition_result = - validate_custom_partition(&value, custom_partition); - if validate_time_partition_result.is_ok() { - if validate_custom_partition_result.is_ok() { - let value = std::mem::replace(_value, Value::Null); - let mut map = Map::new(); - let Value::Object(obj) = value else { - return Err(anyhow!("Expected object in array of objects")); - }; - flatten_object(&mut map, None, obj, separator)?; - *_value = Value::Object(map); - } else { - return Err(anyhow!(validate_custom_partition_result.unwrap_err())); - } - } else { - return Err(anyhow!(validate_time_partition_result.unwrap_err())); - } - } else { - let value = std::mem::replace(_value, Value::Null); - let mut map = Map::new(); - let Value::Object(obj) = value else { - return Err(anyhow!("Expected object in array of objects")); - }; - flatten_object(&mut map, None, obj, separator)?; - *_value = Value::Object(map); - } + validation_required, + )?; } - Ok(Value::Array(arr)) } - _ => Err(anyhow!("Cannot flatten this JSON")), + _ => return Err(JsonFlattenError::CannotFlatten), } + + Ok(()) } +// Validates the presence and content of custom partition fields, that it is +// not null, empty, an object , an array, or contain a `.` when serialized pub fn validate_custom_partition( - value: &Value, + value: &Map, custom_partition: Option<&String>, -) -> Result { - if custom_partition.is_none() { - return Ok(true); - } else { - let custom_partition = custom_partition.unwrap(); - let custom_partition_list = custom_partition.split(',').collect::>(); - for custom_partition_field in &custom_partition_list { - if value.get(custom_partition_field.trim()).is_none() { - return Err(anyhow!(format!( - "ingestion failed as field {} is not part of the log", - custom_partition_field - ))); - } else { - let custom_partition_value = value - .get(custom_partition_field.trim()) - .unwrap() - .to_string(); - if custom_partition_value.is_empty() - || custom_partition_value.eq_ignore_ascii_case("null") - { - return Err(anyhow!(format!( - "ingestion failed as field {} is empty", - custom_partition_field - ))); - } - if custom_partition_value.contains('.') { - return Err(anyhow!(format!( - "ingestion failed as field {} contains a period", - custom_partition_field - ))); - } +) -> Result<(), JsonFlattenError> { + let Some(custom_partition) = custom_partition else { + return Ok(()); + }; + let custom_partition_list = custom_partition.split(',').collect::>(); + + for field in custom_partition_list { + let trimmed_field = field.trim(); + let Some(field_value) = value.get(trimmed_field) else { + return Err(JsonFlattenError::FieldNotPartOfLog( + trimmed_field.to_owned(), + )); + }; + + // The field should not be null, empty, an object, an array or contain a `.` in the value + match field_value { + Value::Null => { + return Err(JsonFlattenError::FieldEmptyOrNull(trimmed_field.to_owned())) + } + Value::String(s) if s.is_empty() => { + return Err(JsonFlattenError::FieldEmptyOrNull(trimmed_field.to_owned())) + } + Value::Object(_) => { + return Err(JsonFlattenError::FieldIsObject(trimmed_field.to_owned())) } + Value::Array(_) => { + return Err(JsonFlattenError::FieldIsArray(trimmed_field.to_owned())) + } + Value::String(s) if s.contains('.') => { + return Err(JsonFlattenError::FieldContainsPeriod( + trimmed_field.to_owned(), + )) + } + Value::Number(n) if n.is_f64() => { + return Err(JsonFlattenError::FieldContainsPeriod( + trimmed_field.to_owned(), + )) + } + _ => {} } } - Ok(true) + Ok(()) } +// Validates time partitioning constraints, checking if a timestamp is a string +// that can be parsed as datetime within the configured time limit pub fn validate_time_partition( - value: &Value, + value: &Map, time_partition: Option<&String>, - time_partition_limit: Option<&String>, -) -> Result { - if time_partition.is_none() { - Ok(true) + time_partition_limit: Option, +) -> Result<(), JsonFlattenError> { + let Some(partition_key) = time_partition else { + return Ok(()); + }; + + let limit_days = time_partition_limit.map_or(30, |days| days.get() as i64); + + let Some(timestamp_value) = value.get(partition_key) else { + return Err(JsonFlattenError::FieldNotPartOfLog( + partition_key.to_owned(), + )); + }; + + let Value::String(timestamp_str) = timestamp_value else { + return Err(JsonFlattenError::FieldNotString(partition_key.to_owned())); + }; + let Ok(parsed_timestamp) = timestamp_str.parse::>() else { + return Err(JsonFlattenError::InvalidDatetimeFormat( + partition_key.to_owned(), + )); + }; + let cutoff_date = Utc::now().naive_utc() - Duration::days(limit_days); + if parsed_timestamp.naive_utc() >= cutoff_date { + Ok(()) } else { - let time_partition_limit: i64 = if let Some(time_partition_limit) = time_partition_limit { - time_partition_limit.parse().unwrap_or(30) - } else { - 30 - }; - let body_timestamp = value.get(time_partition.unwrap().to_string()); - if body_timestamp.is_some() && body_timestamp.unwrap().to_owned().as_str().is_some() { - if body_timestamp - .unwrap() - .to_owned() - .as_str() - .unwrap() - .parse::>() - .is_ok() - { - let parsed_timestamp = body_timestamp - .unwrap() - .to_owned() - .as_str() - .unwrap() - .parse::>() - .unwrap() - .naive_utc(); - - if parsed_timestamp >= Utc::now().naive_utc() - Duration::days(time_partition_limit) - { - Ok(true) - } else { - Err(anyhow!(format!( - "field {} value is more than {} days old", - time_partition.unwrap(), - time_partition_limit - ))) - } - } else { - Err(anyhow!(format!( - "field {} is not in the correct datetime format", - time_partition.unwrap() - ))) - } - } else { - Err(anyhow!(format!( - "ingestion failed as field {} is not part of the log", - time_partition.unwrap() - ))) - } + Err(JsonFlattenError::TimestampTooOld( + partition_key.to_owned(), + limit_days, + )) } } +// Flattens starting from only object types at TOP, e.g. with the parent_key `root` and separator `_` +// `{ "a": { "b": 1, c: { "d": 2 } } }` becomes `{"root_a_b":1,"root_a_c_d":2}` pub fn flatten_with_parent_prefix( - nested_value: Value, + nested_value: &mut Value, prefix: &str, separator: &str, -) -> Result { +) -> Result<(), JsonFlattenError> { + let Value::Object(nested_obj) = nested_value else { + return Err(JsonFlattenError::NonObjectInArray); + }; + let mut map = Map::new(); - if let Value::Object(nested_dict) = nested_value { - flatten_object(&mut map, Some(prefix), nested_dict, separator)?; - } else { - return Err(anyhow!("Must be an object")); - } - Ok(Value::Object(map)) + flatten_object(&mut map, Some(prefix), nested_obj, separator)?; + *nested_obj = map; + + Ok(()) } -pub fn flatten_object( - map: &mut Map, +// Flattens a nested JSON Object/Map into another target Map +fn flatten_object( + output_map: &mut Map, parent_key: Option<&str>, - nested_dict: Map, + nested_map: &mut Map, separator: &str, -) -> Result<(), anyhow::Error> { - for (key, value) in nested_dict.into_iter() { - let new_key = parent_key.map_or_else( - || key.clone(), - |parent_key| format!("{parent_key}{separator}{key}"), - ); - match value { - Value::Object(obj) => flatten_object(map, Some(&new_key), obj, separator)?, - Value::Array(arr) => { - // if value is object then decompose this list into lists - if arr.iter().any(|value| value.is_object()) { - flatten_array_objects(map, &new_key, arr, separator)?; - } else { - map.insert(new_key, Value::Array(arr)); - } +) -> Result<(), JsonFlattenError> { + for (key, mut value) in nested_map { + let new_key = match parent_key { + Some(parent) => format!("{parent}{separator}{key}"), + None => key.to_string(), + }; + + match &mut value { + Value::Object(obj) => { + flatten_object(output_map, Some(&new_key), obj, separator)?; + } + Value::Array(arr) if arr.iter().any(Value::is_object) => { + flatten_array_objects(output_map, &new_key, arr, separator)?; } - x => { - map.insert(new_key, x); + _ => { + output_map.insert(new_key, std::mem::take(value)); } } } Ok(()) } +// Flattens a nested JSON Array into the parent Map pub fn flatten_array_objects( - map: &mut Map, + output_map: &mut Map, parent_key: &str, - arr: Vec, + arr: &mut [Value], separator: &str, -) -> Result<(), anyhow::Error> { - let mut columns: Vec<(String, Vec)> = Vec::new(); - let mut len = 0; - for value in arr { - if let Value::Object(object) = value { - let mut flattened_object = Map::new(); - flatten_object(&mut flattened_object, None, object, separator)?; - let mut col_index = 0; - for (key, value) in flattened_object.into_iter().sorted_by(|a, b| a.0.cmp(&b.0)) { - loop { - if let Some((column_name, column)) = columns.get_mut(col_index) { - match (*column_name).cmp(&key) { - std::cmp::Ordering::Less => { - column.push(Value::Null); - col_index += 1; - continue; - } - std::cmp::Ordering::Equal => column.push(value), - std::cmp::Ordering::Greater => { - let mut list = vec![Value::Null; len]; - list.push(value); - columns.insert(col_index, (key, list)); - } - } - } else { - let mut list = vec![Value::Null; len]; - list.push(value); - columns.push((key, list)); - } - col_index += 1; - break; +) -> Result<(), JsonFlattenError> { + let mut columns: BTreeMap> = BTreeMap::new(); + + for (index, value) in arr.iter_mut().enumerate() { + match value { + Value::Object(nested_object) => { + let mut output_map = Map::new(); + flatten_object(&mut output_map, Some(parent_key), nested_object, separator)?; + for (key, value) in output_map { + let column = columns + .entry(key) + .or_insert_with(|| vec![Value::Null; index]); + column.push(value); } } - for (_, column) in &mut columns[col_index..] { - column.push(Value::Null) + Value::Null => { + for column in columns.values_mut() { + column.push(Value::Null); + } } - } else if value.is_null() { - for (_, column) in &mut columns { - column.push(Value::Null) + _ => return Err(JsonFlattenError::NonObjectInArray), + } + + // Ensure all columns are extended with null values if they weren't updated in this iteration + let max_len = index + 1; + for column in columns.values_mut() { + while column.len() < max_len { + column.push(Value::Null); } - } else { - return Err(anyhow!( - "Found non object element while flattening array of object(s)", - )); } - len += 1; } - for (key, arr) in columns { - let new_key = format!("{parent_key}{separator}{key}"); - map.insert(new_key, Value::Array(arr)); + // Update the main map with new keys and their corresponding arrays + for (key, values) in columns { + output_map.insert(key, Value::Array(values)); } Ok(()) } +/// Recursively flattens a JSON value. +/// - If the value is an array, it flattens all elements of the array. +/// - If the value is an object, it flattens all nested objects and arrays. +/// - Otherwise, it returns the value itself in a vector. +/// +/// Examples: +/// 1. `{"a": 1}` ~> `[{"a": 1}]` +/// 2. `[{"a": 1}, {"b": 2}]` ~> `[{"a": 1}, {"b": 2}]` +/// 3. `[{"a": [{"b": 1}, {"c": 2}]}]` ~> `[{"a": {"b": 1)}}, {"a": {"c": 2)}}]` +/// 3. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> `[{"a": {"b":1}, "d": {"e":4}}, {"a": {"c":2}, "d": {"e":4}}]` pub fn flatten_json(value: &Value) -> Vec { match value { - Value::Array(arr) => { - let mut results = Vec::new(); - for item in arr { - results.extend(flatten_json(item)); - } - results - } - Value::Object(map) => { - let mut results = vec![map.clone()]; - for (key, val) in map { - if matches!(val, Value::Array(_)) { - if let Value::Array(arr) = val { - let mut new_results = Vec::new(); - for item in arr { - let flattened_items = flatten_json(item); - for flattened_item in flattened_items { - for result in &results { - let mut new_obj = result.clone(); - new_obj.insert(key.clone(), flattened_item.clone()); - new_results.push(new_obj); - } - } - } - results = new_results; - } - } else if matches!(val, Value::Object(_)) { - let nested_results = flatten_json(val); - let mut new_results = Vec::new(); - for nested_result in nested_results { - for result in &results { + Value::Array(arr) => arr.iter().flat_map(flatten_json).collect(), + Value::Object(map) => map + .iter() + .fold(vec![Map::new()], |results, (key, val)| match val { + Value::Array(arr) => arr + .iter() + .flat_map(flatten_json) + .flat_map(|flattened_item| { + results.iter().map(move |result| { + let mut new_obj = result.clone(); + new_obj.insert(key.clone(), flattened_item.clone()); + new_obj + }) + }) + .collect(), + Value::Object(_) => flatten_json(val) + .iter() + .flat_map(|nested_result| { + results.iter().map(move |result| { let mut new_obj = result.clone(); new_obj.insert(key.clone(), nested_result.clone()); - new_results.push(new_obj); - } - } - results = new_results; - } - } - results.into_iter().map(Value::Object).collect() - } + new_obj + }) + }) + .collect(), + _ => results + .into_iter() + .map(|mut result| { + result.insert(key.clone(), val.clone()); + result + }) + .collect(), + }) + .into_iter() + .map(Value::Object) + .collect(), _ => vec![value.clone()], } } -pub fn convert_to_array(flattened: Vec) -> Result { +// Converts a Vector of values into a `Value::Array`, as long as all of them are objects +pub fn convert_to_array(flattened: Vec) -> Result { let mut result = Vec::new(); for item in flattened { let mut map = Map::new(); - if let Some(item) = item.as_object() { - for (key, value) in item { - map.insert(key.clone(), value.clone()); - } - result.push(Value::Object(map)); - } else { - return Err(anyhow!("Expected object in array of objects")); + let Some(item) = item.as_object() else { + return Err(JsonFlattenError::ExpectedObjectInArray); + }; + for (key, value) in item { + map.insert(key.clone(), value.clone()); } + result.push(Value::Object(map)); } Ok(Value::Array(result)) } + #[cfg(test)] mod tests { use crate::utils::json::flatten::flatten_array_objects; - use super::flatten; + use super::{flatten, JsonFlattenError}; use serde_json::{json, Map, Value}; #[test] fn flatten_single_key_string() { - let obj = json!({"key": "value"}); - assert_eq!( - obj.clone(), - flatten(obj, "_", None, None, None, false).unwrap() - ); + let mut obj = json!({"key": "value"}); + let expected = obj.clone(); + flatten(&mut obj, "_", None, None, None, false).unwrap(); + assert_eq!(obj, expected); } #[test] fn flatten_single_key_int() { - let obj = json!({"key": 1}); - assert_eq!( - obj.clone(), - flatten(obj, "_", None, None, None, false).unwrap() - ); + let mut obj = json!({"key": 1}); + let expected = obj.clone(); + flatten(&mut obj, "_", None, None, None, false).unwrap(); + assert_eq!(obj, expected); } #[test] fn flatten_multiple_key_value() { - let obj = json!({"key1": 1, "key2": "value2"}); - assert_eq!( - obj.clone(), - flatten(obj, "_", None, None, None, false).unwrap() - ); + let mut obj = json!({"key1": 1, "key2": "value2"}); + let expected = obj.clone(); + flatten(&mut obj, "_", None, None, None, false).unwrap(); + assert_eq!(obj, expected); } #[test] fn flatten_nested_single_key_value() { - let obj = json!({"key": "value", "nested_key": {"key":"value"}}); - assert_eq!( - json!({"key": "value", "nested_key.key": "value"}), - flatten(obj, ".", None, None, None, false).unwrap() - ); + let mut obj = json!({"key": "value", "nested_key": {"key":"value"}}); + let expected = json!({"key": "value", "nested_key.key": "value"}); + flatten(&mut obj, ".", None, None, None, false).unwrap(); + assert_eq!(obj, expected); } #[test] fn nested_multiple_key_value() { - let obj = json!({"key": "value", "nested_key": {"key1":"value1", "key2": "value2"}}); - assert_eq!( - json!({"key": "value", "nested_key.key1": "value1", "nested_key.key2": "value2"}), - flatten(obj, ".", None, None, None, false).unwrap() - ); + let mut obj = json!({"key": "value", "nested_key": {"key1":"value1", "key2": "value2"}}); + let expected = + json!({"key": "value", "nested_key.key1": "value1", "nested_key.key2": "value2"}); + flatten(&mut obj, ".", None, None, None, false).unwrap(); + assert_eq!(obj, expected); } #[test] fn nested_key_value_with_array() { - let obj = json!({"key": "value", "nested_key": {"key1":[1,2,3]}}); - assert_eq!( - json!({"key": "value", "nested_key.key1": [1,2,3]}), - flatten(obj, ".", None, None, None, false).unwrap() - ); + let mut obj = json!({"key": "value", "nested_key": {"key1":[1,2,3]}}); + let expected = json!({"key": "value", "nested_key.key1": [1,2,3]}); + flatten(&mut obj, ".", None, None, None, false).unwrap(); + assert_eq!(obj, expected); } #[test] fn nested_obj_array() { - let obj = json!({"key": [{"a": "value0"}, {"a": "value1"}]}); - assert_eq!( - json!({"key.a": ["value0", "value1"]}), - flatten(obj, ".", None, None, None, false).unwrap() - ); + let mut obj = json!({"key": [{"a": "value0"}, {"a": "value1"}]}); + let expected = json!({"key.a": ["value0", "value1"]}); + flatten(&mut obj, ".", None, None, None, false).unwrap(); + assert_eq!(obj, expected); } #[test] fn nested_obj_array_nulls() { - let obj = json!({"key": [{"a": "value0"}, {"a": "value1", "b": "value1"}]}); - assert_eq!( - json!({"key.a": ["value0", "value1"], "key.b": [null, "value1"]}), - flatten(obj, ".", None, None, None, false).unwrap() - ); + let mut obj = json!({"key": [{"a": "value0"}, {"a": "value1", "b": "value1"}]}); + let expected = json!({"key.a": ["value0", "value1"], "key.b": [null, "value1"]}); + flatten(&mut obj, ".", None, None, None, false).unwrap(); + assert_eq!(obj, expected); } #[test] fn nested_obj_array_nulls_reversed() { - let obj = json!({"key": [{"a": "value0", "b": "value0"}, {"a": "value1"}]}); - assert_eq!( - json!({"key.a": ["value0", "value1"], "key.b": ["value0", null]}), - flatten(obj, ".", None, None, None, false).unwrap() - ); + let mut obj = json!({"key": [{"a": "value0", "b": "value0"}, {"a": "value1"}]}); + let expected = json!({"key.a": ["value0", "value1"], "key.b": ["value0", null]}); + flatten(&mut obj, ".", None, None, None, false).unwrap(); + assert_eq!(obj, expected); } #[test] fn nested_obj_array_nested_obj() { - let obj = json!({"key": [{"a": {"p": 0}, "b": "value0"}, {"b": "value1"}]}); - assert_eq!( - json!({"key.a.p": [0, null], "key.b": ["value0", "value1"]}), - flatten(obj, ".", None, None, None, false).unwrap() - ); + let mut obj = json!({"key": [{"a": {"p": 0}, "b": "value0"}, {"b": "value1"}]}); + let expected = json!({"key.a.p": [0, null], "key.b": ["value0", "value1"]}); + flatten(&mut obj, ".", None, None, None, false).unwrap(); + assert_eq!(obj, expected); } #[test] fn nested_obj_array_nested_obj_array() { - let obj = json!({"key": [{"a": [{"p": "value0", "q": "value0"}, {"p": "value1", "q": null}], "b": "value0"}, {"b": "value1"}]}); - assert_eq!( - json!({"key.a.p": [["value0", "value1"], null], "key.a.q": [["value0", null], null], "key.b": ["value0", "value1"]}), - flatten(obj, ".", None, None, None, false).unwrap() - ); + let mut obj = json!({"key": [{"a": [{"p": "value0", "q": "value0"}, {"p": "value1", "q": null}], "b": "value0"}, {"b": "value1"}]}); + let expected = json!({"key.a.p": [["value0", "value1"], null], "key.a.q": [["value0", null], null], "key.b": ["value0", "value1"]}); + flatten(&mut obj, ".", None, None, None, false).unwrap(); + assert_eq!(obj, expected); } #[test] fn flatten_mixed_object() { - let obj = json!({"a": 42, "arr": ["1", {"key": "2"}, {"key": {"nested": "3"}}]}); - assert!(flatten(obj, ".", None, None, None, false).is_err()); + let mut obj = json!({"a": 42, "arr": ["1", {"key": "2"}, {"key": {"nested": "3"}}]}); + assert!(flatten(&mut obj, ".", None, None, None, false).is_err()); } #[test] fn flatten_array_nulls_at_start() { - let Value::Array(arr) = json!([ + let Value::Array(mut arr) = json!([ null, {"p": 2, "q": 2}, {"q": 3}, @@ -485,7 +451,7 @@ mod tests { }; let mut map = Map::new(); - flatten_array_objects(&mut map, "key", arr, ".").unwrap(); + flatten_array_objects(&mut map, "key", &mut arr, ".").unwrap(); assert_eq!(map.len(), 2); assert_eq!(map.get("key.p").unwrap(), &json!([null, 2, null])); @@ -494,12 +460,12 @@ mod tests { #[test] fn flatten_array_objects_nulls_at_end() { - let Value::Array(arr) = json!([{"a": 1, "b": 1}, {"a": 2}, null]) else { + let Value::Array(mut arr) = json!([{"a": 1, "b": 1}, {"a": 2}, null]) else { unreachable!() }; let mut map = Map::new(); - flatten_array_objects(&mut map, "key", arr, ".").unwrap(); + flatten_array_objects(&mut map, "key", &mut arr, ".").unwrap(); assert_eq!(map.len(), 2); assert_eq!(map.get("key.a").unwrap(), &json!([1, 2, null])); @@ -508,12 +474,12 @@ mod tests { #[test] fn flatten_array_objects_nulls_in_middle() { - let Value::Array(arr) = json!([{"a": 1, "b": 1}, null, {"a": 3, "c": 3}]) else { + let Value::Array(mut arr) = json!([{"a": 1, "b": 1}, null, {"a": 3, "c": 3}]) else { unreachable!() }; let mut map = Map::new(); - flatten_array_objects(&mut map, "key", arr, ".").unwrap(); + flatten_array_objects(&mut map, "key", &mut arr, ".").unwrap(); assert_eq!(map.len(), 3); assert_eq!(map.get("key.a").unwrap(), &json!([1, null, 3])); @@ -523,7 +489,7 @@ mod tests { #[test] fn flatten_array_test() { - let Value::Array(arr) = json!([ + let Value::Array(mut arr) = json!([ {"p": 1, "q": 1}, {"r": 2, "q": 2}, {"p": 3, "r": 3} @@ -532,7 +498,7 @@ mod tests { }; let mut map = Map::new(); - flatten_array_objects(&mut map, "key", arr, ".").unwrap(); + flatten_array_objects(&mut map, "key", &mut arr, ".").unwrap(); assert_eq!(map.len(), 3); assert_eq!(map.get("key.p").unwrap(), &json!([1, null, 3])); @@ -542,7 +508,7 @@ mod tests { #[test] fn flatten_array_nested_test() { - let Value::Array(arr) = json!([ + let Value::Array(mut arr) = json!([ {"p": 1, "q": [{"x": 1}, {"x": 2}]}, {"r": 2, "q": [{"x": 1}]}, {"p": 3, "r": 3} @@ -551,11 +517,85 @@ mod tests { }; let mut map = Map::new(); - flatten_array_objects(&mut map, "key", arr, ".").unwrap(); + flatten_array_objects(&mut map, "key", &mut arr, ".").unwrap(); assert_eq!(map.len(), 3); assert_eq!(map.get("key.p").unwrap(), &json!([1, null, 3])); assert_eq!(map.get("key.q.x").unwrap(), &json!([[1, 2], [1], null])); assert_eq!(map.get("key.r").unwrap(), &json!([null, 2, 3])); } + + #[test] + fn acceptable_value_custom_parition_test() { + let mut value = json!({ + "a": 1, + }); + assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok()); + + let mut value = json!({ + "a": true, + }); + assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok()); + + let mut value = json!({ + "a": "yes", + }); + assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok()); + + let mut value = json!({ + "a": -1, + }); + assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok()); + } + + #[test] + fn unacceptable_value_custom_partition_test() { + let mut value = json!({ + "a": null, + }); + matches!( + flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + JsonFlattenError::FieldEmptyOrNull(_) + ); + + let mut value = json!({ + "a": "", + }); + matches!( + flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + JsonFlattenError::FieldEmptyOrNull(_) + ); + + let mut value = json!({ + "a": {"b": 1}, + }); + matches!( + flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + JsonFlattenError::FieldIsObject(_) + ); + + let mut value = json!({ + "a": ["b", "c"], + }); + matches!( + flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + JsonFlattenError::FieldIsArray(_) + ); + + let mut value = json!({ + "a": "b.c", + }); + matches!( + flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + JsonFlattenError::FieldContainsPeriod(_) + ); + + let mut value = json!({ + "a": 1.0, + }); + matches!( + flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + JsonFlattenError::FieldContainsPeriod(_) + ); + } } diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 1130dd1a2..386e6347d 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -16,6 +16,8 @@ * */ +use std::num::NonZeroU32; + use serde_json; use serde_json::Value; @@ -24,26 +26,28 @@ pub mod flatten; pub fn flatten_json_body( body: &Value, time_partition: Option<&String>, - time_partition_limit: Option<&String>, + time_partition_limit: Option, custom_partition: Option<&String>, validation_required: bool, ) -> Result { - let nested_value = flatten::convert_to_array(flatten::flatten_json(body))?; + let mut nested_value = flatten::convert_to_array(flatten::flatten_json(body))?; flatten::flatten( - nested_value, + &mut nested_value, "_", time_partition, time_partition_limit, custom_partition, validation_required, - ) + )?; + + Ok(nested_value) } pub fn convert_array_to_object( body: &Value, time_partition: Option<&String>, - time_partition_limit: Option<&String>, + time_partition_limit: Option, custom_partition: Option<&String>, ) -> Result, anyhow::Error> { let data = flatten_json_body(