Skip to content

Commit 303ba35

Browse files
Author: Devdutt Shenoi (committed)
refactor: perform flattening in to_data alone
1 parent d604b65 commit 303ba35

File tree

5 files changed

+87
-74
lines changed

5 files changed

+87
-74
lines changed

src/event/format/json.rs

Lines changed: 74 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,9 @@ use arrow_schema::{DataType, Field, Fields, Schema};
2626
use chrono::{DateTime, NaiveDateTime, Utc};
2727
use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
2828
use itertools::Itertools;
29+
use opentelemetry_proto::tonic::{
30+
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
31+
};
2932
use serde_json::Value;
3033
use std::{
3134
collections::{HashMap, HashSet},
@@ -36,7 +39,9 @@ use tracing::error;
3639

3740
use super::{EventFormat, LogSource};
3841
use crate::{
42+
kinesis::{flatten_kinesis_logs, Message},
3943
metadata::SchemaVersion,
44+
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
4045
storage::StreamType,
4146
utils::{
4247
arrow::get_field,
@@ -58,6 +63,64 @@ impl Event {
5863
}
5964
}
6065

66+
pub fn flatten_logs(
67+
json: Value,
68+
time_partition: Option<&String>,
69+
time_partition_limit: Option<NonZeroU32>,
70+
custom_partitions: Option<&String>,
71+
schema_version: SchemaVersion,
72+
log_source: &LogSource,
73+
) -> Result<Vec<Value>, anyhow::Error> {
74+
let data = match log_source {
75+
LogSource::Kinesis => {
76+
//custom flattening required for Amazon Kinesis
77+
let message: Message = serde_json::from_value(json)?;
78+
flatten_kinesis_logs(message)
79+
}
80+
LogSource::OtelLogs => {
81+
//custom flattening required for otel logs
82+
let logs: LogsData = serde_json::from_value(json)?;
83+
flatten_otel_logs(&logs)
84+
}
85+
LogSource::OtelTraces => {
86+
//custom flattening required for otel traces
87+
let traces: TracesData = serde_json::from_value(json)?;
88+
flatten_otel_traces(&traces)
89+
}
90+
LogSource::OtelMetrics => {
91+
//custom flattening required for otel metrics
92+
let metrics: MetricsData = serde_json::from_value(json)?;
93+
flatten_otel_metrics(metrics)
94+
}
95+
_ => vec![json],
96+
};
97+
98+
let mut logs = vec![];
99+
for json in data {
100+
if time_partition.is_some() || custom_partitions.is_some() {
101+
logs.append(&mut convert_array_to_object(
102+
json,
103+
time_partition,
104+
time_partition_limit,
105+
custom_partitions,
106+
schema_version,
107+
log_source,
108+
)?)
109+
} else {
110+
logs.push(convert_to_array(convert_array_to_object(
111+
json,
112+
None,
113+
None,
114+
None,
115+
schema_version,
116+
log_source,
117+
)?)?)
118+
}
119+
}
120+
121+
Ok(logs)
122+
}
123+
61124
impl EventFormat for Event {
62125
type Data = Vec<Value>;
63126

@@ -73,29 +136,18 @@ impl EventFormat for Event {
73136
stored_schema: &HashMap<String, Arc<Field>>,
74137
time_partition: Option<&String>,
75138
time_partition_limit: Option<NonZeroU32>,
76-
custom_partition: Option<&String>,
139+
custom_partitions: Option<&String>,
77140
schema_version: SchemaVersion,
78141
log_source: &LogSource,
79142
) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
80-
let flattened = if time_partition.is_some() || custom_partition.is_some() {
81-
convert_array_to_object(
82-
self.json,
83-
time_partition,
84-
time_partition_limit,
85-
custom_partition,
86-
schema_version,
87-
log_source,
88-
)?
89-
} else {
90-
vec![convert_to_array(convert_array_to_object(
91-
self.json,
92-
None,
93-
None,
94-
None,
95-
schema_version,
96-
log_source,
97-
)?)?]
98-
};
143+
let flattened = flatten_logs(
144+
self.json,
145+
time_partition,
146+
time_partition_limit,
147+
custom_partitions,
148+
schema_version,
149+
log_source,
150+
)?;
99151

100152
// collect all the keys from all the json objects in the request body
101153
let fields =
@@ -175,8 +227,8 @@ impl EventFormat for Event {
175227
stream_type: StreamType,
176228
) -> Result<super::Event, anyhow::Error> {
177229
let custom_partition_values = match custom_partitions.as_ref() {
178-
Some(custom_partition) => {
179-
let custom_partitions = custom_partition.split(',').collect_vec();
230+
Some(custom_partitions) => {
231+
let custom_partitions = custom_partitions.split(',').collect_vec();
180232
extract_custom_partition_values(&self.json, &custom_partitions)
181233
}
182234
None => HashMap::new(),

src/event/format/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ pub trait EventFormat: Sized {
113113
/// Returns the UTC time at ingestion
114114
fn get_p_timestamp(&self) -> DateTime<Utc>;
115115

116+
#[allow(clippy::too_many_arguments)]
116117
fn into_recordbatch(
117118
self,
118119
storage_schema: &HashMap<String, Arc<Field>>,

src/handlers/http/ingest.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::utils::header_parsing::ParseHeaderError;
3838
use crate::utils::json::flatten::JsonFlattenError;
3939

4040
use super::logstream::error::{CreateStreamError, StreamError};
41-
use super::modal::utils::ingest_utils::flatten_and_push_logs;
41+
use super::modal::utils::ingest_utils::push_logs;
4242
use super::users::dashboards::DashboardError;
4343
use super::users::filters::FiltersError;
4444

@@ -72,7 +72,7 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
7272
return Err(PostError::OtelNotSupported);
7373
}
7474

75-
flatten_and_push_logs(json, &stream_name, &log_source).await?;
75+
push_logs(&stream_name, json, &log_source).await?;
7676

7777
Ok(HttpResponse::Ok().finish())
7878
}
@@ -125,7 +125,7 @@ pub async fn handle_otel_logs_ingestion(
125125
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
126126
.await?;
127127

128-
flatten_and_push_logs(json, &stream_name, &log_source).await?;
128+
push_logs(&stream_name, json, &log_source).await?;
129129

130130
Ok(HttpResponse::Ok().finish())
131131
}
@@ -156,7 +156,7 @@ pub async fn handle_otel_metrics_ingestion(
156156
)
157157
.await?;
158158

159-
flatten_and_push_logs(json, &stream_name, &log_source).await?;
159+
push_logs(&stream_name, json, &log_source).await?;
160160

161161
Ok(HttpResponse::Ok().finish())
162162
}
@@ -184,7 +184,7 @@ pub async fn handle_otel_traces_ingestion(
184184
.create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
185185
.await?;
186186

187-
flatten_and_push_logs(json, &stream_name, &log_source).await?;
187+
push_logs(&stream_name, json, &log_source).await?;
188188

189189
Ok(HttpResponse::Ok().finish())
190190
}
@@ -233,7 +233,7 @@ pub async fn post_event(
233233
return Err(PostError::OtelNotSupported);
234234
}
235235

236-
flatten_and_push_logs(json, &stream_name, &log_source).await?;
236+
push_logs(&stream_name, json, &log_source).await?;
237237

238238
Ok(HttpResponse::Ok().finish())
239239
}

src/handlers/http/modal/ingest/ingestor_ingest.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
use actix_web::{HttpRequest, HttpResponse};
2020
use bytes::Bytes;
2121

22-
use crate::{handlers::http::{ingest::PostError, modal::utils::ingest_utils::flatten_and_push_logs}, metadata::PARSEABLE.streams};
22+
use crate::{handlers::http::{ingest::PostError, modal::utils::ingest_utils::push_logs}, metadata::PARSEABLE.streams};
2323

2424

2525
// Handler for POST /api/v1/logstream/{logstream}
@@ -38,6 +38,6 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, P
3838
return Err(PostError::StreamNotFound(stream_name));
3939
}
4040

41-
flatten_and_push_logs(req, body, stream_name).await?;
41+
push_logs(req, body, stream_name).await?;
4242
Ok(HttpResponse::Ok().finish())
4343
}

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 4 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -17,57 +17,18 @@
1717
*/
1818

1919
use chrono::Utc;
20-
use opentelemetry_proto::tonic::{
21-
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
22-
};
2320
use serde_json::Value;
2421

2522
use crate::{
2623
event::format::{json, EventFormat, LogSource},
27-
handlers::http::{
28-
ingest::PostError,
29-
kinesis::{flatten_kinesis_logs, Message},
30-
},
31-
otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
24+
handlers::http::ingest::PostError,
3225
parseable::PARSEABLE,
3326
storage::StreamType,
3427
};
3528

36-
pub async fn flatten_and_push_logs(
37-
json: Value,
29+
pub async fn push_logs(
3830
stream_name: &str,
39-
log_source: &LogSource,
40-
) -> Result<(), PostError> {
41-
let json = match log_source {
42-
LogSource::Kinesis => {
43-
//custom flattening required for Amazon Kinesis
44-
let message: Message = serde_json::from_value(json)?;
45-
flatten_kinesis_logs(message)
46-
}
47-
LogSource::OtelLogs => {
48-
//custom flattening required for otel logs
49-
let logs: LogsData = serde_json::from_value(json)?;
50-
flatten_otel_logs(&logs)
51-
}
52-
LogSource::OtelTraces => {
53-
//custom flattening required for otel traces
54-
let traces: TracesData = serde_json::from_value(json)?;
55-
flatten_otel_traces(&traces)
56-
}
57-
LogSource::OtelMetrics => {
58-
//custom flattening required for otel metrics
59-
let metrics: MetricsData = serde_json::from_value(json)?;
60-
flatten_otel_metrics(metrics)
61-
}
62-
_ => vec![json],
63-
};
64-
push_logs(stream_name, json, log_source).await?;
65-
Ok(())
66-
}
67-
68-
async fn push_logs(
69-
stream_name: &str,
70-
jsons: Vec<Value>,
31+
json: Value,
7132
log_source: &LogSource,
7233
) -> Result<(), PostError> {
7334
let stream = PARSEABLE.get_stream(stream_name)?;
@@ -80,7 +41,6 @@ async fn push_logs(
8041
let schema_version = stream.get_schema_version();
8142
let p_timestamp = Utc::now();
8243

83-
for json in jsons {
8444
let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
8545
let schema = PARSEABLE.get_stream(stream_name)?.get_schema_raw();
8646
json::Event { json, p_timestamp }
@@ -97,7 +57,7 @@ async fn push_logs(
9757
StreamType::UserDefined,
9858
)?
9959
.process()?;
100-
}
60+
10161

10262
Ok(())
10363
}

0 commit comments

Comments (0)