Skip to content

Commit e2a1fc3

Browse files
author
Devdutt Shenoi
committed
refactor: accept array in push_logs
1 parent e732821 commit e2a1fc3

File tree

1 file changed

+44
-51
lines changed

1 file changed

+44
-51
lines changed

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 44 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -39,45 +39,36 @@ pub async fn flatten_and_push_logs(
3939
stream_name: &str,
4040
log_source: &LogSource,
4141
) -> Result<(), PostError> {
42-
match log_source {
42+
let json = match log_source {
4343
LogSource::Kinesis => {
4444
//custom flattening required for Amazon Kinesis
4545
let message: Message = serde_json::from_value(json)?;
46-
for record in flatten_kinesis_logs(message) {
47-
push_logs(stream_name, record, &LogSource::default()).await?;
48-
}
46+
flatten_kinesis_logs(message)
4947
}
5048
LogSource::OtelLogs => {
5149
//custom flattening required for otel logs
5250
let logs: LogsData = serde_json::from_value(json)?;
53-
for record in flatten_otel_logs(&logs) {
54-
push_logs(stream_name, record, log_source).await?;
55-
}
51+
flatten_otel_logs(&logs)
5652
}
5753
LogSource::OtelTraces => {
5854
//custom flattening required for otel traces
5955
let traces: TracesData = serde_json::from_value(json)?;
60-
for record in flatten_otel_traces(&traces) {
61-
push_logs(stream_name, record, log_source).await?;
62-
}
56+
flatten_otel_traces(&traces)
6357
}
6458
LogSource::OtelMetrics => {
6559
//custom flattening required for otel metrics
6660
let metrics: MetricsData = serde_json::from_value(json)?;
67-
for record in flatten_otel_metrics(metrics) {
68-
push_logs(stream_name, record, log_source).await?;
69-
}
61+
flatten_otel_metrics(metrics)
7062
}
71-
_ => {
72-
push_logs(stream_name, json, log_source).await?;
73-
}
74-
}
63+
_ => vec![json],
64+
};
65+
push_logs(stream_name, json, log_source).await?;
7566
Ok(())
7667
}
7768

7869
async fn push_logs(
7970
stream_name: &str,
80-
json: Value,
71+
jsons: Vec<Value>,
8172
log_source: &LogSource,
8273
) -> Result<(), PostError> {
8374
let stream = PARSEABLE.get_stream(stream_name)?;
@@ -89,42 +80,44 @@ async fn push_logs(
8980
let custom_partition = stream.get_custom_partition();
9081
let schema_version = stream.get_schema_version();
9182
let p_timestamp = Utc::now();
92-
93-
let data = if time_partition.is_some() || custom_partition.is_some() {
94-
convert_array_to_object(
95-
json,
96-
time_partition.as_ref(),
97-
time_partition_limit,
98-
custom_partition.as_ref(),
99-
schema_version,
100-
log_source,
101-
)?
102-
} else {
103-
vec![convert_to_array(convert_array_to_object(
104-
json,
105-
None,
106-
None,
107-
None,
108-
schema_version,
109-
log_source,
110-
)?)?]
111-
};
112-
113-
for json in data {
114-
let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
115-
let schema = PARSEABLE.get_stream(stream_name)?.get_schema_raw();
116-
json::Event { json, p_timestamp }
117-
.into_event(
118-
stream_name.to_owned(),
119-
origin_size,
120-
&schema,
121-
static_schema_flag,
122-
custom_partition.as_ref(),
83+
84+
for json in jsons {
85+
let data = if time_partition.is_some() || custom_partition.is_some() {
86+
convert_array_to_object(
87+
json,
12388
time_partition.as_ref(),
89+
time_partition_limit,
90+
custom_partition.as_ref(),
12491
schema_version,
125-
StreamType::UserDefined,
92+
log_source,
12693
)?
127-
.process()?;
94+
} else {
95+
vec![convert_to_array(convert_array_to_object(
96+
json,
97+
None,
98+
None,
99+
None,
100+
schema_version,
101+
log_source,
102+
)?)?]
103+
};
104+
105+
for json in data {
106+
let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length
107+
let schema = PARSEABLE.get_stream(stream_name)?.get_schema_raw();
108+
json::Event { json, p_timestamp }
109+
.into_event(
110+
stream_name.to_owned(),
111+
origin_size,
112+
&schema,
113+
static_schema_flag,
114+
custom_partition.as_ref(),
115+
time_partition.as_ref(),
116+
schema_version,
117+
StreamType::UserDefined,
118+
)?
119+
.process()?;
120+
}
128121
}
129122
Ok(())
130123
}

0 commit comments

Comments
 (0)