Commit adf87dd

fix: ingestion flow and generic flattening
1 parent e645168 commit adf87dd

3 files changed: +195 -63 lines changed

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 157 additions & 45 deletions

@@ -23,13 +23,12 @@ use anyhow::anyhow;
 use arrow_schema::Field;
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, Utc};
-use itertools::Itertools;
 use serde_json::Value;
 
 use crate::{
     event::{
+        self,
         format::{self, EventFormat},
-        Event,
     },
     handlers::{
         http::{ingest::PostError, kinesis},
@@ -73,61 +72,174 @@ pub async fn push_logs(
     let custom_partition = STREAM_INFO.get_custom_partition(stream_name)?;
     let schema_version = STREAM_INFO.get_schema_version(stream_name)?;
     let body_val: Value = serde_json::from_slice(body)?;
-    let data = convert_array_to_object(
-        body_val,
-        time_partition.as_ref(),
-        time_partition_limit,
-        custom_partition.as_ref(),
-        schema_version,
-    )?;
 
-    for value in data {
-        let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
-        let parsed_timestamp = match time_partition.as_ref() {
-            Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
-            _ => Utc::now().naive_utc(),
-        };
-        let custom_partition_values = match custom_partition.as_ref() {
-            Some(custom_partition) => {
-                let custom_partitions = custom_partition.split(',').collect_vec();
-                get_custom_partition_values(&value, &custom_partitions)
+    let size: usize = body.len();
+    let mut parsed_timestamp = Utc::now().naive_utc();
+    if time_partition.is_none() {
+        if custom_partition.is_none() {
+            let size = size as u64;
+            create_process_record_batch(
+                stream_name,
+                req,
+                body_val,
+                static_schema_flag.as_ref(),
+                None,
+                parsed_timestamp,
+                &HashMap::new(),
+                size,
+                schema_version,
+            )
+            .await?;
+        } else {
+            let data = convert_array_to_object(
+                body_val,
+                None,
+                None,
+                custom_partition.as_ref(),
+                schema_version,
+            )?;
+            let custom_partition = custom_partition.unwrap();
+            let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
+
+            for value in data {
+                let custom_partition_values =
+                    get_custom_partition_values(&value, &custom_partition_list);
+
+                let size = value.to_string().into_bytes().len() as u64;
+                create_process_record_batch(
+                    stream_name,
+                    req,
+                    value,
+                    static_schema_flag.as_ref(),
+                    None,
+                    parsed_timestamp,
+                    &custom_partition_values,
+                    size,
+                    schema_version,
+                )
+                .await?;
             }
-            None => HashMap::new(),
-        };
-        let schema = STREAM_INFO
-            .read()
-            .unwrap()
-            .get(stream_name)
-            .ok_or(PostError::StreamNotFound(stream_name.to_owned()))?
-            .schema
-            .clone();
-        let (rb, is_first_event) = into_event_batch(
-            req,
-            &value,
-            schema,
-            static_schema_flag.as_ref(),
+        }
+    } else if custom_partition.is_none() {
+        let data = convert_array_to_object(
+            body_val,
+            time_partition.as_ref(),
+            time_partition_limit,
+            None,
+            schema_version,
+        )?;
+        for value in data {
+            parsed_timestamp = get_parsed_timestamp(&value, time_partition.as_ref().unwrap())?;
+            let size = value.to_string().into_bytes().len() as u64;
+            create_process_record_batch(
+                stream_name,
+                req,
+                value,
+                static_schema_flag.as_ref(),
+                time_partition.as_ref(),
+                parsed_timestamp,
+                &HashMap::new(),
+                size,
+                schema_version,
+            )
+            .await?;
+        }
+    } else {
+        let data = convert_array_to_object(
+            body_val,
             time_partition.as_ref(),
+            time_partition_limit,
+            custom_partition.as_ref(),
             schema_version,
         )?;
+        let custom_partition = custom_partition.unwrap();
+        let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
 
-        Event {
-            rb,
-            stream_name: stream_name.to_owned(),
-            origin_format: "json",
-            origin_size,
-            is_first_event,
-            parsed_timestamp,
-            time_partition: time_partition.clone(),
-            custom_partition_values,
-            stream_type: StreamType::UserDefined,
+        for value in data {
+            let custom_partition_values =
+                get_custom_partition_values(&value, &custom_partition_list);
+
+            parsed_timestamp = get_parsed_timestamp(&value, time_partition.as_ref().unwrap())?;
+            let size = value.to_string().into_bytes().len() as u64;
+            create_process_record_batch(
+                stream_name,
+                req,
+                value,
+                static_schema_flag.as_ref(),
+                time_partition.as_ref(),
+                parsed_timestamp,
+                &custom_partition_values,
+                size,
+                schema_version,
+            )
+            .await?;
         }
-        .process()
-        .await?;
     }
 
     Ok(())
 }
 
+#[allow(clippy::too_many_arguments)]
+pub async fn create_process_record_batch(
+    stream_name: &str,
+    req: &HttpRequest,
+    value: Value,
+    static_schema_flag: Option<&String>,
+    time_partition: Option<&String>,
+    parsed_timestamp: NaiveDateTime,
+    custom_partition_values: &HashMap<String, String>,
+    origin_size: u64,
+    schema_version: SchemaVersion,
+) -> Result<(), PostError> {
+    let (rb, is_first_event) = get_stream_schema(
+        stream_name,
+        req,
+        &value,
+        static_schema_flag,
+        time_partition,
+        schema_version,
+    )?;
+    event::Event {
+        rb,
+        stream_name: stream_name.to_owned(),
+        origin_format: "json",
+        origin_size,
+        is_first_event,
+        parsed_timestamp,
+        time_partition: time_partition.cloned(),
+        custom_partition_values: custom_partition_values.clone(),
+        stream_type: StreamType::UserDefined,
+    }
+    .process()
+    .await?;
+
+    Ok(())
+}
+
+pub fn get_stream_schema(
+    stream_name: &str,
+    req: &HttpRequest,
+    body: &Value,
+    static_schema_flag: Option<&String>,
+    time_partition: Option<&String>,
+    schema_version: SchemaVersion,
+) -> Result<(arrow_array::RecordBatch, bool), PostError> {
+    let hash_map = STREAM_INFO.read().unwrap();
+    let schema = hash_map
+        .get(stream_name)
+        .ok_or(PostError::StreamNotFound(stream_name.to_owned()))?
+        .schema
+        .clone();
+    into_event_batch(
+        req,
+        body,
+        schema,
+        static_schema_flag,
+        time_partition,
+        schema_version,
+    )
+}
+
 pub fn into_event_batch(
     req: &HttpRequest,
     body: &Value,
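
What this hunk amounts to: `push_logs` now branches on which partitions the stream has configured instead of always flattening the body first, and the per-event plumbing moves into the new `create_process_record_batch` and `get_stream_schema` helpers. A minimal sketch of the four-way dispatch, using plain string options as hypothetical stand-ins for the stream metadata (the names below are illustrative, not the crate's API):

// Sketch of the new dispatch in push_logs: the pair
// (time_partition, custom_partition) selects one of four ingestion paths.
fn ingestion_path(time_partition: Option<&str>, custom_partition: Option<&str>) -> &'static str {
    match (time_partition, custom_partition) {
        // Fast path: the raw body goes to create_process_record_batch in one
        // batch, sized by body.len(), with no partition-driven flattening.
        (None, None) => "whole body, single batch",
        // Custom partitions only: flatten, then read partition values per row.
        (None, Some(_)) => "per-row batches + custom partition values",
        // Time partition only: flatten, then parse the timestamp per row.
        (Some(_), None) => "per-row batches + parsed timestamp",
        // Both: flatten, then derive both per row.
        (Some(_), Some(_)) => "per-row batches + timestamp + partition values",
    }
}

fn main() {
    assert_eq!(ingestion_path(None, None), "whole body, single batch");
    assert_eq!(
        ingestion_path(Some("event_time"), Some("device,region")),
        "per-row batches + timestamp + partition values"
    );
}

Note also that per-event `origin_size` is now `value.to_string().into_bytes().len()` rather than `serde_json::to_vec(&value).unwrap().len()`, and the unpartitioned fast path charges the whole request `body.len()` to a single batch.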

src/utils/json/flatten.rs

Lines changed: 17 additions & 15 deletions

@@ -19,7 +19,6 @@
 use std::collections::BTreeMap;
 use std::num::NonZeroU32;
 
-use anyhow::anyhow;
 use chrono::{DateTime, Duration, Utc};
 use serde_json::map::Map;
 use serde_json::value::Value;
@@ -50,6 +49,8 @@ pub enum JsonFlattenError {
     ExpectedObjectInArray,
     #[error("Found non-object element while flattening array of objects")]
     NonObjectInArray,
+    #[error("heavily nested, cannot flatten this JSON")]
+    HeavilyNestedJson,
 }
 
 // Recursively flattens JSON objects and arrays, e.g. with the separator `.`, starting from the TOP
@@ -283,23 +284,25 @@ pub fn flatten_array_objects(
 /// 3. `[{"a": [{"b": 1}, {"c": 2}]}]` ~> `[{"a": {"b": 1)}}, {"a": {"c": 2)}}]`
 /// 4. `{"a": [{"b": 1}, {"c": 2}], "d": {"e": 4}}` ~> `[{"a": {"b":1}, "d": {"e":4}}, {"a": {"c":2}, "d": {"e":4}}]`
 /// 5. `{"a":{"b":{"c":{"d":{"e":["a","b"]}}}}}` ~> returns error - heavily nested, cannot flatten this JSON
-fn flattening_helper(value: &Value) -> Result<Vec<Value>, anyhow::Error> {
+pub fn generic_flattening(value: &Value) -> Result<Vec<Value>, JsonFlattenError> {
     if has_more_than_four_levels(value, 1) {
-        return Err(anyhow!("heavily nested, cannot flatten this JSON"));
+        return Err(JsonFlattenError::HeavilyNestedJson);
     }
 
     match value {
         Value::Array(arr) => Ok(arr
             .iter()
-            .flat_map(|flatten_item| flattening_helper(flatten_item).unwrap_or_default())
+            .flat_map(|flatten_item| generic_flattening(flatten_item).unwrap_or_default())
             .collect()),
         Value::Object(map) => {
             let results = map
                 .iter()
                 .fold(vec![Map::new()], |results, (key, val)| match val {
                     Value::Array(arr) => arr
                         .iter()
-                        .flat_map(|flatten_item| flattening_helper(flatten_item).unwrap_or_default())
+                        .flat_map(|flatten_item| {
+                            generic_flattening(flatten_item).unwrap_or_default()
+                        })
                         .flat_map(|flattened_item| {
                             results.iter().map(move |result| {
                                 let mut new_obj = result.clone();
@@ -308,7 +311,7 @@ fn flattening_helper(value: &Value) -> Result<Vec<Value>, anyhow::Error> {
                             })
                         })
                         .collect(),
-                    Value::Object(_) => flattening_helper(val)
+                    Value::Object(_) => generic_flattening(val)
                         .unwrap_or_default()
                         .iter()
                         .flat_map(|nested_result| {
@@ -355,20 +358,19 @@ fn has_more_than_four_levels(value: &Value, current_level: usize) -> bool {
 }
 
 // Converts a Vector of values into a `Value::Array`, as long as all of them are objects
-pub fn generic_flattening(json: Value) -> Result<Value, JsonFlattenError> {
-    let mut flattened = Vec::new();
-    for item in flattening_helper(&json).unwrap_or_default() {
+pub fn convert_to_array(flattened: Vec<Value>) -> Result<Value, JsonFlattenError> {
+    let mut result = Vec::new();
+    for item in flattened {
         let mut map = Map::new();
         let Some(item) = item.as_object() else {
             return Err(JsonFlattenError::ExpectedObjectInArray);
         };
         for (key, value) in item {
            map.insert(key.clone(), value.clone());
         }
-        flattened.push(Value::Object(map));
+        result.push(Value::Object(map));
     }
-
-    Ok(Value::Array(flattened))
+    Ok(Value::Array(result))
 }
 
 #[cfg(test)]
@@ -649,13 +651,13 @@ mod tests {
     #[test]
     fn flatten_json_success() {
         let value = json!({"a":{"b":{"e":["a","b"]}}});
-        let expected = json!([{"a":{"b":{"e":"a"}}},{"a":{"b":{"e":"b"}}}]);
-        assert_eq!(generic_flattening(value).unwrap(), expected);
+        let expected = vec![json!({"a":{"b":{"e":"a"}}}), json!({"a":{"b":{"e":"b"}}})];
+        assert_eq!(generic_flattening(&value).unwrap(), expected);
     }
 
     #[test]
     fn flatten_json_error() {
         let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
-        assert!(generic_flattening(value).is_err());
+        assert!(generic_flattening(&value).is_err());
     }
 }
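
The renamed `generic_flattening` (formerly `flattening_helper`) now returns the expanded `Vec<Value>` directly and reports failure through the typed `JsonFlattenError::HeavilyNestedJson` instead of `anyhow!`, while the re-wrapping into a `Value::Array` lives in the separate `convert_to_array`. A hedged usage sketch of the two steps, with the intermediate values hardcoded from `flatten_json_success` above so it runs standalone:

use serde_json::{json, Value};

fn main() {
    // Input from flatten_json_success above.
    let input = json!({"a": {"b": {"e": ["a", "b"]}}});
    println!("input: {input}");

    // Step 1: generic_flattening(&input) expands the two-element array into
    // one object per element; per the test, it yields this Vec<Value>.
    let flattened: Vec<Value> = vec![
        json!({"a": {"b": {"e": "a"}}}),
        json!({"a": {"b": {"e": "b"}}}),
    ];

    // Step 2: convert_to_array(flattened) re-wraps the objects as a single
    // Value::Array; Value::Array(..) models that step here.
    assert_eq!(
        Value::Array(flattened),
        json!([{"a": {"b": {"e": "a"}}}, {"a": {"b": {"e": "b"}}}])
    );
}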

src/utils/json/mod.rs

Lines changed: 21 additions & 3 deletions

@@ -37,7 +37,9 @@ pub fn flatten_json_body(
     validation_required: bool,
 ) -> Result<Value, anyhow::Error> {
     let mut nested_value = if schema_version == SchemaVersion::V1 {
-        flatten::generic_flattening(body)?
+        flatten::generic_flattening(&body)
+            .map(flatten::convert_to_array)
+            .unwrap_or(Ok(body))?
     } else {
         body
     };
@@ -105,7 +107,15 @@ mod tests {
         let value = json!({"a":{"b":{"e":["a","b"]}}});
         let expected = json!([{"a_b_e": "a"}, {"a_b_e": "b"}]);
         assert_eq!(
-            flatten_json_body(value, None, None, None, crate::metadata::SchemaVersion::V1, false).unwrap(),
+            flatten_json_body(
+                value,
+                None,
+                None,
+                None,
+                crate::metadata::SchemaVersion::V1,
+                false
+            )
+            .unwrap(),
             expected
         );
     }
@@ -115,7 +125,15 @@ mod tests {
         let value = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
         let expected = json!({"a_b_c_d_e": ["a","b"]});
         assert_eq!(
-            flatten_json_body(value, None, None, None,crate::metadata::SchemaVersion::V1, false).unwrap(),
+            flatten_json_body(
+                value,
+                None,
+                None,
+                None,
+                crate::metadata::SchemaVersion::V1,
+                false
+            )
+            .unwrap(),
            expected
         );
     }
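
The behavioural fix in `flatten_json_body`: under `SchemaVersion::V1`, a failure in `generic_flattening` (the heavily nested case) no longer aborts the request; `unwrap_or(Ok(body))` falls back to the original body, which the second test above now expects to flatten to `{"a_b_c_d_e": ["a","b"]}` via the regular path. A minimal sketch of that fallback shape, with a hypothetical depth check standing in for `has_more_than_four_levels` and plain `Value`s standing in for `convert_to_array`:

use serde_json::{json, Value};

// Hypothetical stand-in for generic_flattening: refuse values nested more
// than four levels deep, otherwise pass the value through untouched.
fn try_flatten(body: &Value) -> Result<Vec<Value>, &'static str> {
    fn depth(v: &Value) -> usize {
        match v {
            Value::Object(m) => 1 + m.values().map(depth).max().unwrap_or(0),
            Value::Array(a) => a.iter().map(depth).max().unwrap_or(0),
            _ => 0,
        }
    }
    if depth(body) > 4 {
        return Err("heavily nested, cannot flatten this JSON");
    }
    Ok(vec![body.clone()])
}

// Mirrors the .map(..).unwrap_or(Ok(body))? shape from the diff, simplified
// to plain Values: on error the original body is kept instead of failing.
fn flatten_or_fallback(body: Value) -> Value {
    try_flatten(&body)
        .map(Value::Array) // stand-in for flatten::convert_to_array
        .unwrap_or(body)
}

fn main() {
    let deep = json!({"a":{"b":{"c":{"d":{"e":["a","b"]}}}}});
    // Five levels deep: the sketch keeps the body untouched rather than erroring.
    assert_eq!(flatten_or_fallback(deep.clone()), deep);
}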
