Skip to content

Commit 8c57c61

Browse files
Merge branch 'main' into fix-oss-after-enterprise
2 parents 5171459 + ad12b8d commit 8c57c61

File tree

5 files changed

+57
-30
lines changed

5 files changed

+57
-30
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "parseable"
3-
version = "2.3.1"
3+
version = "2.3.2"
44
authors = ["Parseable Team <[email protected]>"]
55
edition = "2021"
66
rust-version = "1.83.0"
@@ -139,8 +139,8 @@ arrow = "54.0.0"
139139
temp-dir = "0.1.14"
140140

141141
[package.metadata.parseable_ui]
142-
assets-url = "https://parseable-prism-build.s3.us-east-2.amazonaws.com/v2.3.1/build.zip"
143-
assets-sha1 = "2d420243ae85b730b201ab1c15b44fae57af992b"
142+
assets-url = "https://parseable-prism-build.s3.us-east-2.amazonaws.com/v2.3.2/build.zip"
143+
assets-sha1 = "35cfa3ab692ab0debf6666e5e2e1876fa7de4a02"
144144

145145
[features]
146146
debug = []

src/handlers/http/kinesis.rs

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use serde::{Deserialize, Serialize};
2121
use serde_json::{Map, Value};
2222
use std::str;
2323

24+
use crate::utils::json::flatten::{generic_flattening, has_more_than_max_allowed_levels};
25+
2426
#[derive(Serialize, Deserialize, Debug)]
2527
#[serde(rename_all = "camelCase")]
2628
pub struct Message {
@@ -57,29 +59,56 @@ struct Data {
5759
// "requestId": "b858288a-f5d8-4181-a746-3f3dd716be8a",
5860
// "timestamp": "1704964113659"
5961
// }
60-
pub fn flatten_kinesis_logs(message: Message) -> Vec<Value> {
62+
pub async fn flatten_kinesis_logs(message: Message) -> Result<Vec<Value>, anyhow::Error> {
6163
let mut vec_kinesis_json = Vec::new();
6264

6365
for record in message.records.iter() {
64-
let bytes = STANDARD.decode(record.data.clone()).unwrap();
65-
let json_string: String = String::from_utf8(bytes).unwrap();
66-
let json: serde_json::Value = serde_json::from_str(&json_string).unwrap();
67-
let mut kinesis_json: Map<String, Value> = match serde_json::from_value(json) {
68-
Ok(value) => value,
69-
Err(error) => panic!("Failed to deserialize JSON: {}", error),
70-
};
71-
72-
kinesis_json.insert(
73-
"requestId".to_owned(),
74-
Value::String(message.request_id.clone()),
75-
);
76-
kinesis_json.insert(
77-
"timestamp".to_owned(),
78-
Value::String(message.timestamp.to_string()),
79-
);
66+
let bytes = STANDARD.decode(record.data.clone())?;
67+
if let Ok(json_string) = String::from_utf8(bytes) {
68+
let json: serde_json::Value = serde_json::from_str(&json_string)?;
69+
// Check if the JSON has more than the allowed levels of nesting
70+
// If it has less than or equal to the allowed levels, we flatten it.
71+
// If it has more than the allowed levels, we just push it as is
72+
// without flattening or modifying it.
73+
if !has_more_than_max_allowed_levels(&json, 1) {
74+
let flattened_json_arr = generic_flattening(&json)?;
75+
for flattened_json in flattened_json_arr {
76+
let mut kinesis_json: Map<String, Value> =
77+
serde_json::from_value(flattened_json)?;
78+
kinesis_json.insert(
79+
"requestId".to_owned(),
80+
Value::String(message.request_id.clone()),
81+
);
82+
kinesis_json.insert(
83+
"timestamp".to_owned(),
84+
Value::String(message.timestamp.to_string()),
85+
);
8086

81-
vec_kinesis_json.push(Value::Object(kinesis_json));
87+
vec_kinesis_json.push(Value::Object(kinesis_json));
88+
}
89+
} else {
90+
// If the JSON has more than the allowed levels, we just push it as is
91+
// without flattening or modifying it.
92+
// This is a fallback to ensure we don't lose data.
93+
tracing::warn!(
94+
"Kinesis log with requestId {} and timestamp {} has more than the allowed levels of nesting, skipping flattening for this record.",
95+
message.request_id, message.timestamp
96+
);
97+
vec_kinesis_json.push(json);
98+
}
99+
} else {
100+
tracing::error!(
101+
"Failed to decode base64 data for kinesis log with requestId {} and timestamp {}",
102+
message.request_id,
103+
message.timestamp
104+
);
105+
return Err(anyhow::anyhow!(
106+
"Failed to decode base64 data for record with requestId {} and timestamp {}",
107+
message.request_id,
108+
message.timestamp
109+
));
110+
}
82111
}
83112

84-
vec_kinesis_json
113+
Ok(vec_kinesis_json)
85114
}

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ pub async fn flatten_and_push_logs(
6161
LogSource::Kinesis => {
6262
//custom flattening required for Amazon Kinesis
6363
let message: Message = serde_json::from_value(json)?;
64-
for record in flatten_kinesis_logs(message) {
65-
push_logs(stream_name, record, log_source, p_custom_fields).await?;
66-
}
64+
let flattened_kinesis_data = flatten_kinesis_logs(message).await?;
65+
let record = convert_to_array(flattened_kinesis_data)?;
66+
push_logs(stream_name, record, log_source, p_custom_fields).await?;
6767
}
6868
LogSource::OtelLogs => {
6969
//custom flattening required for otel logs

src/utils/json/mod.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,8 @@ pub fn flatten_json_body(
4444
// Flatten the json body only if new schema and has less than 4 levels of nesting
4545
let mut nested_value = if schema_version == SchemaVersion::V1
4646
&& !has_more_than_max_allowed_levels(&body, 1)
47-
&& matches!(
48-
log_source,
49-
LogSource::Json | LogSource::Custom(_) | LogSource::Kinesis
50-
) {
47+
&& matches!(log_source, LogSource::Json | LogSource::Custom(_))
48+
{
5149
let flattened_json = generic_flattening(&body)?;
5250
convert_to_array(flattened_json)?
5351
} else {

0 commit comments

Comments
 (0)