Skip to content

Commit 6dff3e1

Browse files
fix: hierarchical json flattening restriction
Get the level of hierarchy from the JSON and perform generic flattening only if the level of nesting is <= 4.
1 parent 603b095 commit 6dff3e1

File tree

4 files changed

+135
-44
lines changed

4 files changed

+135
-44
lines changed

src/cli.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,7 +529,9 @@ impl FromArgMatches for Cli {
529529
self.kafka_host = m.get_one::<String>(Self::KAFKA_HOST).cloned();
530530
self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
531531
self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
532-
self.kafka_security_protocol = m.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL).cloned();
532+
self.kafka_security_protocol = m
533+
.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL)
534+
.cloned();
533535
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();
534536

535537
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();

src/kafka.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ pub enum KafkaError {
125125
fn setup_consumer() -> Result<(StreamConsumer, Vec<String>), KafkaError> {
126126
if let Some(topics) = &CONFIG.parseable.kafka_topics {
127127
// topics can be a comma separated list of topics to subscribe to
128-
let topics = topics.split(",").map(|v| v.to_owned()).collect_vec();
128+
let topics = topics.split(',').map(|v| v.to_owned()).collect_vec();
129129

130130
let host = if CONFIG.parseable.kafka_host.is_some() {
131131
CONFIG.parseable.kafka_host.as_ref()
@@ -162,8 +162,8 @@ fn setup_consumer() -> Result<(StreamConsumer, Vec<String>), KafkaError> {
162162
// partitions is a comma separated pairs of topic:partitions
163163
let mut topic_partition_pairs = Vec::new();
164164
let mut set = true;
165-
for vals in vals_raw.split(",") {
166-
let intermediate = vals.split(":").collect_vec();
165+
for vals in vals_raw.split(',') {
166+
let intermediate = vals.split(':').collect_vec();
167167
if intermediate.len() != 2 {
168168
warn!(
169169
"Value for P_KAFKA_PARTITIONS is incorrect! Skipping setting partitions!"

src/utils/json/flatten.rs

Lines changed: 96 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
use std::collections::BTreeMap;
2020
use std::num::NonZeroU32;
2121

22+
use anyhow::anyhow;
2223
use chrono::{DateTime, Duration, Utc};
2324
use serde_json::map::Map;
2425
use serde_json::value::Value;
@@ -273,52 +274,83 @@ pub fn flatten_array_objects(
273274
/// Recursively flattens a JSON value.
274275
/// - If the value is an array, it flattens all elements of the array.
275276
/// - If the value is an object, it flattens all nested objects and arrays.
277+
/// - If the JSON value is heavily nested (more than 4 levels of hierarchy), it returns an error.
276278
/// - Otherwise, it returns the value itself in a vector.
277279
///
278280
/// Examples:
279281
/// 1. `{"a": 1}` ~> `[{"a": 1}]`
280282
/// 2. `[{"a": 1}, {"b": 2}]` ~> `[{"a": 1}, {"b": 2}]`
281283
/// 3. `[{"a": [{"b": 1}, {"c": 2}]}]` ~> `[{"a": {"b": 1}}, {"a": {"c": 2}}]`
282-
/// 3. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> `[{"a": {"b":1}, "d": {"e":4}}, {"a": {"c":2}, "d": {"e":4}}]`
283-
pub fn flatten_json(value: &Value) -> Vec<Value> {
284+
/// 4. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> `[{"a": {"b":1}, "d": {"e":4}}, {"a": {"c":2}, "d": {"e":4}}]`
285+
/// 5. `{"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}` ~> returns error - heavily nested, cannot flatten this JSON
286+
pub fn flatten_json(value: &Value) -> Result<Vec<Value>, anyhow::Error> {
287+
if has_more_than_four_levels(value, 1) {
288+
return Err(anyhow!("heavily nested, cannot flatten this JSON"));
289+
}
290+
284291
match value {
285-
Value::Array(arr) => arr.iter().flat_map(flatten_json).collect(),
286-
Value::Object(map) => map
292+
Value::Array(arr) => Ok(arr
287293
.iter()
288-
.fold(vec![Map::new()], |results, (key, val)| match val {
289-
Value::Array(arr) => arr
290-
.iter()
291-
.flat_map(flatten_json)
292-
.flat_map(|flattened_item| {
293-
results.iter().map(move |result| {
294-
let mut new_obj = result.clone();
295-
new_obj.insert(key.clone(), flattened_item.clone());
296-
new_obj
294+
.flat_map(|flatten_item| flatten_json(flatten_item).unwrap_or_default())
295+
.collect()),
296+
Value::Object(map) => {
297+
let results = map
298+
.iter()
299+
.fold(vec![Map::new()], |results, (key, val)| match val {
300+
Value::Array(arr) => arr
301+
.iter()
302+
.flat_map(|flatten_item| flatten_json(flatten_item).unwrap_or_default())
303+
.flat_map(|flattened_item| {
304+
results.iter().map(move |result| {
305+
let mut new_obj = result.clone();
306+
new_obj.insert(key.clone(), flattened_item.clone());
307+
new_obj
308+
})
309+
})
310+
.collect(),
311+
Value::Object(_) => flatten_json(val)
312+
.unwrap_or_default()
313+
.iter()
314+
.flat_map(|nested_result| {
315+
results.iter().map(move |result| {
316+
let mut new_obj = result.clone();
317+
new_obj.insert(key.clone(), nested_result.clone());
318+
new_obj
319+
})
297320
})
298-
})
299-
.collect(),
300-
Value::Object(_) => flatten_json(val)
301-
.iter()
302-
.flat_map(|nested_result| {
303-
results.iter().map(move |result| {
304-
let mut new_obj = result.clone();
305-
new_obj.insert(key.clone(), nested_result.clone());
306-
new_obj
321+
.collect(),
322+
_ => results
323+
.into_iter()
324+
.map(|mut result| {
325+
result.insert(key.clone(), val.clone());
326+
result
307327
})
308-
})
309-
.collect(),
310-
_ => results
311-
.into_iter()
312-
.map(|mut result| {
313-
result.insert(key.clone(), val.clone());
314-
result
315-
})
316-
.collect(),
317-
})
318-
.into_iter()
319-
.map(Value::Object)
320-
.collect(),
321-
_ => vec![value.clone()],
328+
.collect(),
329+
});
330+
331+
Ok(results.into_iter().map(Value::Object).collect())
332+
}
333+
_ => Ok(vec![value.clone()]),
334+
}
335+
}
336+
337+
/// Recursively checks the level of nesting of a `serde_json` `Value`.
338+
/// Returns `true` if the value has more than 4 levels of hierarchy.
339+
/// Examples:
340+
/// 1. `{"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}` ~> returns true
341+
/// 2. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> returns false
342+
fn has_more_than_four_levels(value: &Value, current_level: usize) -> bool {
343+
if current_level > 4 {
344+
return true;
345+
}
346+
match value {
347+
Value::Array(arr) => arr
348+
.iter()
349+
.any(|item| has_more_than_four_levels(item, current_level)),
350+
Value::Object(map) => map
351+
.values()
352+
.any(|val| has_more_than_four_levels(val, current_level + 1)),
353+
_ => false,
322354
}
323355
}
324356

@@ -340,7 +372,9 @@ pub fn convert_to_array(flattened: Vec<Value>) -> Result<Value, JsonFlattenError
340372

341373
#[cfg(test)]
342374
mod tests {
343-
use crate::utils::json::flatten::flatten_array_objects;
375+
use crate::utils::json::flatten::{
376+
flatten_array_objects, flatten_json, has_more_than_four_levels,
377+
};
344378

345379
use super::{flatten, JsonFlattenError};
346380
use serde_json::{json, Map, Value};
@@ -598,4 +632,29 @@ mod tests {
598632
JsonFlattenError::FieldContainsPeriod(_)
599633
);
600634
}
635+
636+
#[test]
637+
fn unacceptable_levels_of_nested_json() {
638+
let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
639+
assert_eq!(has_more_than_four_levels(&value, 1), true);
640+
}
641+
642+
#[test]
643+
fn acceptable_levels_of_nested_json() {
644+
let value = json!({"a":{"b":{"e":["a","b"]}}});
645+
assert_eq!(has_more_than_four_levels(&value, 1), false);
646+
}
647+
648+
#[test]
649+
fn flatten_json_success() {
650+
let value = json!({"a":{"b":{"e":["a","b"]}}});
651+
let expected = vec![json!({"a":{"b":{"e":"a"}}}), json!({"a":{"b":{"e":"b"}}})];
652+
assert_eq!(flatten_json(&value).unwrap(), expected);
653+
}
654+
655+
#[test]
656+
fn flatten_json_error() {
657+
let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
658+
assert!(flatten_json(&value).is_err());
659+
}
601660
}

src/utils/json/mod.rs

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,20 @@ use serde_json::Value;
2323

2424
pub mod flatten;
2525

26+
/// Calls the function `flatten_json`, which returns either a `Vec<Value>` or an error.
27+
/// When a `Vec<Value>` is returned, converts it to a single `Value` using `convert_to_array`.
28+
/// This ensures that recursive flattening does not happen for heavily nested JSON.
2629
pub fn flatten_json_body(
2730
body: &Value,
2831
time_partition: Option<&String>,
2932
time_partition_limit: Option<NonZeroU32>,
3033
custom_partition: Option<&String>,
3134
validation_required: bool,
3235
) -> Result<Value, anyhow::Error> {
33-
let mut nested_value = flatten::convert_to_array(flatten::flatten_json(body))?;
34-
36+
let mut nested_value = body.clone();
37+
if let Ok(flattened_json) = flatten::flatten_json(body) {
38+
nested_value = flatten::convert_to_array(flattened_json)?;
39+
}
3540
flatten::flatten(
3641
&mut nested_value,
3742
"_",
@@ -40,7 +45,6 @@ pub fn flatten_json_body(
4045
custom_partition,
4146
validation_required,
4247
)?;
43-
4448
Ok(nested_value)
4549
}
4650

@@ -84,3 +88,29 @@ pub fn convert_to_string(value: &Value) -> Value {
8488
}
8589
}
8690
}
91+
92+
#[cfg(test)]
93+
mod tests {
94+
use super::flatten_json_body;
95+
use serde_json::json;
96+
97+
#[test]
98+
fn hierarchical_json_flattening_success() {
99+
let value = json!({"a":{"b":{"e":["a","b"]}}});
100+
let expected = json!([{"a_b_e": "a"}, {"a_b_e": "b"}]);
101+
assert_eq!(
102+
flatten_json_body(&value, None, None, None, false).unwrap(),
103+
expected
104+
);
105+
}
106+
107+
#[test]
108+
fn hierarchical_json_flattening_failure() {
109+
let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
110+
let expected = json!({"a_b_c_d_e": ["a","b"]});
111+
assert_eq!(
112+
flatten_json_body(&value, None, None, None, false).unwrap(),
113+
expected
114+
);
115+
}
116+
}

0 commit comments

Comments
 (0)