Skip to content

Commit 02ee793

Browse files
feat: custom flattening for OTEL data
add proto files for metrics and trace add compiled rust files for metrics and trace protobuf files add separate handlers for OTEL logs, metrics and traces custom flattening added for OTEL logs and metrics
1 parent 603b095 commit 02ee793

File tree

14 files changed

+3496
-17
lines changed

14 files changed

+3496
-17
lines changed

src/handlers/http/ingest.rs

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ use super::logstream::error::{CreateStreamError, StreamError};
2020
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
2121
use super::users::dashboards::DashboardError;
2222
use super::users::filters::FiltersError;
23+
use super::{otel_logs, otel_metrics};
2324
use crate::event::{
2425
self,
2526
error::EventError,
2627
format::{self, EventFormat},
2728
};
2829
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
29-
use crate::handlers::STREAM_NAME_HEADER_KEY;
30+
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
3031
use crate::metadata::error::stream_info::MetadataError;
3132
use crate::metadata::STREAM_INFO;
3233
use crate::option::{Mode, CONFIG};
@@ -105,7 +106,85 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
105106
// Handler for POST /v1/logs to ingest OTEL logs
106107
// ingests events by extracting stream name from header
107108
// creates if stream does not exist
108-
pub async fn handle_otel_ingestion(
109+
pub async fn handle_otel_logs_ingestion(
110+
req: HttpRequest,
111+
body: Bytes,
112+
) -> Result<HttpResponse, PostError> {
113+
if let Some((_, stream_name)) = req
114+
.headers()
115+
.iter()
116+
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
117+
{
118+
let stream_name = stream_name.to_str().unwrap().to_owned();
119+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
120+
121+
//flatten logs
122+
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
123+
{
124+
let log_source: String = log_source.to_str().unwrap().to_owned();
125+
if log_source == LOG_SOURCE_OTEL {
126+
let mut json = otel_logs::flatten_otel_logs(&body);
127+
for record in json.iter_mut() {
128+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
129+
push_logs(&stream_name, &req, &body).await?;
130+
}
131+
} else {
132+
return Err(PostError::CustomError("Unknown log source".to_string()));
133+
}
134+
} else {
135+
return Err(PostError::CustomError(
136+
"log source key header is missing".to_string(),
137+
));
138+
}
139+
} else {
140+
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
141+
}
142+
Ok(HttpResponse::Ok().finish())
143+
}
144+
145+
// Handler for POST /v1/metrics to ingest OTEL metrics
146+
// ingests events by extracting stream name from header
147+
// creates if stream does not exist
148+
pub async fn handle_otel_metrics_ingestion(
149+
req: HttpRequest,
150+
body: Bytes,
151+
) -> Result<HttpResponse, PostError> {
152+
if let Some((_, stream_name)) = req
153+
.headers()
154+
.iter()
155+
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
156+
{
157+
let stream_name = stream_name.to_str().unwrap().to_owned();
158+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
159+
160+
//flatten logs
161+
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
162+
{
163+
let log_source: String = log_source.to_str().unwrap().to_owned();
164+
if log_source == LOG_SOURCE_OTEL {
165+
let mut json = otel_metrics::flatten_otel_metrics(&body);
166+
for record in json.iter_mut() {
167+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
168+
push_logs(&stream_name, &req, &body).await?;
169+
}
170+
} else {
171+
return Err(PostError::CustomError("Unknown log source".to_string()));
172+
}
173+
} else {
174+
return Err(PostError::CustomError(
175+
"log source key header is missing".to_string(),
176+
));
177+
}
178+
} else {
179+
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
180+
}
181+
Ok(HttpResponse::Ok().finish())
182+
}
183+
184+
// Handler for POST /v1/traces to ingest OTEL traces
185+
// ingests events by extracting stream name from header
186+
// creates if stream does not exist
187+
pub async fn handle_otel_traces_ingestion(
109188
req: HttpRequest,
110189
body: Bytes,
111190
) -> Result<HttpResponse, PostError> {

src/handlers/http/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ pub mod logstream;
3737
pub mod middleware;
3838
pub mod modal;
3939
pub mod oidc;
40+
pub mod otel;
41+
pub mod otel_logs;
42+
pub mod otel_metrics;
4043
pub mod query;
4144
pub mod rbac;
4245
pub mod role;

src/handlers/http/modal/server.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ impl Server {
398398
web::resource("/logs")
399399
.route(
400400
web::post()
401-
.to(ingest::handle_otel_ingestion)
401+
.to(ingest::handle_otel_logs_ingestion)
402402
.authorize_for_stream(Action::Ingest),
403403
)
404404
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
@@ -407,7 +407,7 @@ impl Server {
407407
web::resource("/metrics")
408408
.route(
409409
web::post()
410-
.to(ingest::handle_otel_ingestion)
410+
.to(ingest::handle_otel_metrics_ingestion)
411411
.authorize_for_stream(Action::Ingest),
412412
)
413413
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
@@ -416,7 +416,7 @@ impl Server {
416416
web::resource("/traces")
417417
.route(
418418
web::post()
419-
.to(ingest::handle_otel_ingestion)
419+
.to(ingest::handle_otel_traces_ingestion)
420420
.authorize_for_stream(Action::Ingest),
421421
)
422422
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
*
1717
*/
1818

19-
use std::{collections::HashMap, sync::Arc};
19+
use std::{
20+
collections::{BTreeMap, HashMap},
21+
sync::Arc,
22+
};
2023

2124
use actix_web::HttpRequest;
2225
use arrow_schema::Field;
@@ -30,8 +33,8 @@ use crate::{
3033
format::{self, EventFormat},
3134
},
3235
handlers::{
33-
http::{ingest::PostError, kinesis},
34-
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR,
36+
http::{ingest::PostError, kinesis, otel_logs},
37+
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
3538
},
3639
metadata::STREAM_INFO,
3740
storage::StreamType,
@@ -43,14 +46,21 @@ pub async fn flatten_and_push_logs(
4346
body: Bytes,
4447
stream_name: &str,
4548
) -> Result<(), PostError> {
46-
let log_source = req
47-
.headers()
48-
.get(LOG_SOURCE_KEY)
49-
.map(|header| header.to_str().unwrap_or_default())
50-
.unwrap_or_default();
51-
if log_source == LOG_SOURCE_KINESIS {
52-
let json = kinesis::flatten_kinesis_logs(&body);
53-
for record in json.iter() {
49+
//flatten logs
50+
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
51+
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
52+
let log_source: String = log_source.to_str().unwrap().to_owned();
53+
match log_source.as_str() {
54+
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
55+
LOG_SOURCE_OTEL => {
56+
json = otel_logs::flatten_otel_logs(&body);
57+
}
58+
_ => {
59+
tracing::warn!("Unknown log source: {}", log_source);
60+
push_logs(stream_name, &req, &body).await?;
61+
}
62+
}
63+
for record in json.iter_mut() {
5464
let body: Bytes = serde_json::to_vec(record).unwrap().into();
5565
push_logs(stream_name, &req, &body).await?;
5666
}

src/handlers/http/otel.rs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
pub mod proto;
19+
use proto::common::v1::KeyValue;
20+
use serde_json::Value;
21+
use std::collections::BTreeMap;
22+
// Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte
23+
pub fn collect_json_from_any_value(
24+
key: &String,
25+
value: super::otel::proto::common::v1::Value,
26+
) -> BTreeMap<String, Value> {
27+
let mut value_json: BTreeMap<String, Value> = BTreeMap::new();
28+
if value.str_val.is_some() {
29+
value_json.insert(
30+
key.to_string(),
31+
Value::String(value.str_val.as_ref().unwrap().to_owned()),
32+
);
33+
}
34+
if value.bool_val.is_some() {
35+
value_json.insert(key.to_string(), Value::Bool(value.bool_val.unwrap()));
36+
}
37+
if value.int_val.is_some() {
38+
value_json.insert(
39+
key.to_string(),
40+
Value::String(value.int_val.as_ref().unwrap().to_owned()),
41+
);
42+
}
43+
if value.double_val.is_some() {
44+
value_json.insert(
45+
key.to_string(),
46+
Value::Number(serde_json::Number::from_f64(value.double_val.unwrap()).unwrap()),
47+
);
48+
}
49+
50+
//ArrayValue is a vector of AnyValue
51+
//traverse by recursively calling the same function
52+
if value.array_val.is_some() {
53+
let array_val = value.array_val.as_ref().unwrap();
54+
let values = &array_val.values;
55+
for value in values {
56+
let array_value_json = collect_json_from_any_value(key, value.clone());
57+
for key in array_value_json.keys() {
58+
value_json.insert(
59+
format!(
60+
"{}_{}",
61+
key.to_owned(),
62+
value_to_string(array_value_json[key].to_owned())
63+
),
64+
array_value_json[key].to_owned(),
65+
);
66+
}
67+
}
68+
}
69+
70+
//KeyValueList is a vector of KeyValue
71+
//traverse through each element in the vector
72+
if value.kv_list_val.is_some() {
73+
let kv_list_val = value.kv_list_val.unwrap();
74+
for key_value in kv_list_val.values {
75+
let value = key_value.value;
76+
if value.is_some() {
77+
let value = value.unwrap();
78+
let key_value_json = collect_json_from_any_value(key, value);
79+
80+
for key in key_value_json.keys() {
81+
value_json.insert(
82+
format!(
83+
"{}_{}_{}",
84+
key.to_owned(),
85+
key_value.key,
86+
value_to_string(key_value_json[key].to_owned())
87+
),
88+
key_value_json[key].to_owned(),
89+
);
90+
}
91+
}
92+
}
93+
}
94+
if value.bytes_val.is_some() {
95+
value_json.insert(
96+
key.to_string(),
97+
Value::String(value.bytes_val.as_ref().unwrap().to_owned()),
98+
);
99+
}
100+
101+
value_json
102+
}
103+
104+
//traverse through Value by calling function ollect_json_from_any_value
105+
pub fn collect_json_from_values(
106+
values: &Option<super::otel::proto::common::v1::Value>,
107+
key: &String,
108+
) -> BTreeMap<String, Value> {
109+
let mut value_json: BTreeMap<String, Value> = BTreeMap::new();
110+
111+
for value in values.iter() {
112+
value_json = collect_json_from_any_value(key, value.clone());
113+
}
114+
115+
value_json
116+
}
117+
118+
pub fn value_to_string(value: serde_json::Value) -> String {
119+
match value.clone() {
120+
e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
121+
Value::String(s) => s,
122+
_ => "".to_string(),
123+
}
124+
}
125+
126+
pub fn flatten_attributes(attributes: &Vec<KeyValue>) -> BTreeMap<String, Value> {
127+
let mut attributes_json: BTreeMap<String, Value> = BTreeMap::new();
128+
for attribute in attributes {
129+
let key = &attribute.key;
130+
let value = &attribute.value;
131+
let value_json = collect_json_from_values(value, &key.to_string());
132+
for key in value_json.keys() {
133+
attributes_json.insert(key.to_owned(), value_json[key].to_owned());
134+
}
135+
}
136+
attributes_json
137+
}

0 commit comments

Comments
 (0)