Skip to content

Commit 33b327d

Browse files
feat: custom flattening for OTEL data
Add proto files for metrics and traces; add compiled Rust files for the metrics and trace protobuf definitions; add separate handlers for OTEL logs, metrics, and traces; add custom flattening for OTEL logs and metrics.
1 parent d0f76ed commit 33b327d

File tree

14 files changed

+3496
-17
lines changed

14 files changed

+3496
-17
lines changed

src/handlers/http/ingest.rs

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,14 @@ use super::logstream::error::{CreateStreamError, StreamError};
2020
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
2121
use super::users::dashboards::DashboardError;
2222
use super::users::filters::FiltersError;
23+
use super::{otel_logs, otel_metrics};
2324
use crate::event::{
2425
self,
2526
error::EventError,
2627
format::{self, EventFormat},
2728
};
2829
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
29-
use crate::handlers::STREAM_NAME_HEADER_KEY;
30+
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
3031
use crate::metadata::error::stream_info::MetadataError;
3132
use crate::metadata::{SchemaVersion, STREAM_INFO};
3233
use crate::option::{Mode, CONFIG};
@@ -106,7 +107,85 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
106107
// Handler for POST /v1/logs to ingest OTEL logs
107108
// ingests events by extracting stream name from header
108109
// creates if stream does not exist
109-
pub async fn handle_otel_ingestion(
110+
pub async fn handle_otel_logs_ingestion(
111+
req: HttpRequest,
112+
body: Bytes,
113+
) -> Result<HttpResponse, PostError> {
114+
if let Some((_, stream_name)) = req
115+
.headers()
116+
.iter()
117+
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
118+
{
119+
let stream_name = stream_name.to_str().unwrap().to_owned();
120+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
121+
122+
//flatten logs
123+
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
124+
{
125+
let log_source: String = log_source.to_str().unwrap().to_owned();
126+
if log_source == LOG_SOURCE_OTEL {
127+
let mut json = otel_logs::flatten_otel_logs(&body);
128+
for record in json.iter_mut() {
129+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
130+
push_logs(&stream_name, &req, &body).await?;
131+
}
132+
} else {
133+
return Err(PostError::CustomError("Unknown log source".to_string()));
134+
}
135+
} else {
136+
return Err(PostError::CustomError(
137+
"log source key header is missing".to_string(),
138+
));
139+
}
140+
} else {
141+
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
142+
}
143+
Ok(HttpResponse::Ok().finish())
144+
}
145+
146+
// Handler for POST /v1/metrics to ingest OTEL metrics
147+
// ingests events by extracting stream name from header
148+
// creates if stream does not exist
149+
pub async fn handle_otel_metrics_ingestion(
150+
req: HttpRequest,
151+
body: Bytes,
152+
) -> Result<HttpResponse, PostError> {
153+
if let Some((_, stream_name)) = req
154+
.headers()
155+
.iter()
156+
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
157+
{
158+
let stream_name = stream_name.to_str().unwrap().to_owned();
159+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
160+
161+
//flatten logs
162+
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
163+
{
164+
let log_source: String = log_source.to_str().unwrap().to_owned();
165+
if log_source == LOG_SOURCE_OTEL {
166+
let mut json = otel_metrics::flatten_otel_metrics(&body);
167+
for record in json.iter_mut() {
168+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
169+
push_logs(&stream_name, &req, &body).await?;
170+
}
171+
} else {
172+
return Err(PostError::CustomError("Unknown log source".to_string()));
173+
}
174+
} else {
175+
return Err(PostError::CustomError(
176+
"log source key header is missing".to_string(),
177+
));
178+
}
179+
} else {
180+
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
181+
}
182+
Ok(HttpResponse::Ok().finish())
183+
}
184+
185+
// Handler for POST /v1/traces to ingest OTEL traces
186+
// ingests events by extracting stream name from header
187+
// creates if stream does not exist
188+
pub async fn handle_otel_traces_ingestion(
110189
req: HttpRequest,
111190
body: Bytes,
112191
) -> Result<HttpResponse, PostError> {

src/handlers/http/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ pub mod logstream;
3838
pub mod middleware;
3939
pub mod modal;
4040
pub mod oidc;
41+
pub mod otel;
42+
pub mod otel_logs;
43+
pub mod otel_metrics;
4144
pub mod query;
4245
pub mod rbac;
4346
pub mod role;

src/handlers/http/modal/server.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ impl Server {
438438
web::resource("/logs")
439439
.route(
440440
web::post()
441-
.to(ingest::handle_otel_ingestion)
441+
.to(ingest::handle_otel_logs_ingestion)
442442
.authorize_for_stream(Action::Ingest),
443443
)
444444
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
@@ -447,7 +447,7 @@ impl Server {
447447
web::resource("/metrics")
448448
.route(
449449
web::post()
450-
.to(ingest::handle_otel_ingestion)
450+
.to(ingest::handle_otel_metrics_ingestion)
451451
.authorize_for_stream(Action::Ingest),
452452
)
453453
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),
@@ -456,7 +456,7 @@ impl Server {
456456
web::resource("/traces")
457457
.route(
458458
web::post()
459-
.to(ingest::handle_otel_ingestion)
459+
.to(ingest::handle_otel_traces_ingestion)
460460
.authorize_for_stream(Action::Ingest),
461461
)
462462
.app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)),

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
*
1717
*/
1818

19-
use std::{collections::HashMap, sync::Arc};
19+
use std::{
20+
collections::{BTreeMap, HashMap},
21+
sync::Arc,
22+
};
2023

2124
use actix_web::HttpRequest;
2225
use anyhow::anyhow;
@@ -32,8 +35,8 @@ use crate::{
3235
Event,
3336
},
3437
handlers::{
35-
http::{ingest::PostError, kinesis},
36-
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, PREFIX_META, PREFIX_TAGS, SEPARATOR,
38+
http::{ingest::PostError, kinesis, otel_logs},
39+
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
3740
},
3841
metadata::{SchemaVersion, STREAM_INFO},
3942
storage::StreamType,
@@ -45,14 +48,21 @@ pub async fn flatten_and_push_logs(
4548
body: Bytes,
4649
stream_name: &str,
4750
) -> Result<(), PostError> {
48-
let log_source = req
49-
.headers()
50-
.get(LOG_SOURCE_KEY)
51-
.map(|header| header.to_str().unwrap_or_default())
52-
.unwrap_or_default();
53-
if log_source == LOG_SOURCE_KINESIS {
54-
let json = kinesis::flatten_kinesis_logs(&body);
55-
for record in json.iter() {
51+
//flatten logs
52+
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
53+
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
54+
let log_source: String = log_source.to_str().unwrap().to_owned();
55+
match log_source.as_str() {
56+
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
57+
LOG_SOURCE_OTEL => {
58+
json = otel_logs::flatten_otel_logs(&body);
59+
}
60+
_ => {
61+
tracing::warn!("Unknown log source: {}", log_source);
62+
push_logs(stream_name, &req, &body).await?;
63+
}
64+
}
65+
for record in json.iter_mut() {
5666
let body: Bytes = serde_json::to_vec(record).unwrap().into();
5767
push_logs(stream_name, &req, &body).await?;
5868
}

src/handlers/http/otel.rs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
3+
*
4+
* This program is free software: you can redistribute it and/or modify
5+
* it under the terms of the GNU Affero General Public License as
6+
* published by the Free Software Foundation, either version 3 of the
7+
* License, or (at your option) any later version.
8+
*
9+
* This program is distributed in the hope that it will be useful,
10+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
* GNU Affero General Public License for more details.
13+
*
14+
* You should have received a copy of the GNU Affero General Public License
15+
* along with this program. If not, see <http://www.gnu.org/licenses/>.
16+
*
17+
*/
18+
pub mod proto;
19+
use proto::common::v1::KeyValue;
20+
use serde_json::Value;
21+
use std::collections::BTreeMap;
22+
// Value can be one of types - String, Bool, Int, Double, ArrayValue, AnyValue, KeyValueList, Byte
23+
pub fn collect_json_from_any_value(
24+
key: &String,
25+
value: super::otel::proto::common::v1::Value,
26+
) -> BTreeMap<String, Value> {
27+
let mut value_json: BTreeMap<String, Value> = BTreeMap::new();
28+
if value.str_val.is_some() {
29+
value_json.insert(
30+
key.to_string(),
31+
Value::String(value.str_val.as_ref().unwrap().to_owned()),
32+
);
33+
}
34+
if value.bool_val.is_some() {
35+
value_json.insert(key.to_string(), Value::Bool(value.bool_val.unwrap()));
36+
}
37+
if value.int_val.is_some() {
38+
value_json.insert(
39+
key.to_string(),
40+
Value::String(value.int_val.as_ref().unwrap().to_owned()),
41+
);
42+
}
43+
if value.double_val.is_some() {
44+
value_json.insert(
45+
key.to_string(),
46+
Value::Number(serde_json::Number::from_f64(value.double_val.unwrap()).unwrap()),
47+
);
48+
}
49+
50+
//ArrayValue is a vector of AnyValue
51+
//traverse by recursively calling the same function
52+
if value.array_val.is_some() {
53+
let array_val = value.array_val.as_ref().unwrap();
54+
let values = &array_val.values;
55+
for value in values {
56+
let array_value_json = collect_json_from_any_value(key, value.clone());
57+
for key in array_value_json.keys() {
58+
value_json.insert(
59+
format!(
60+
"{}_{}",
61+
key.to_owned(),
62+
value_to_string(array_value_json[key].to_owned())
63+
),
64+
array_value_json[key].to_owned(),
65+
);
66+
}
67+
}
68+
}
69+
70+
//KeyValueList is a vector of KeyValue
71+
//traverse through each element in the vector
72+
if value.kv_list_val.is_some() {
73+
let kv_list_val = value.kv_list_val.unwrap();
74+
for key_value in kv_list_val.values {
75+
let value = key_value.value;
76+
if value.is_some() {
77+
let value = value.unwrap();
78+
let key_value_json = collect_json_from_any_value(key, value);
79+
80+
for key in key_value_json.keys() {
81+
value_json.insert(
82+
format!(
83+
"{}_{}_{}",
84+
key.to_owned(),
85+
key_value.key,
86+
value_to_string(key_value_json[key].to_owned())
87+
),
88+
key_value_json[key].to_owned(),
89+
);
90+
}
91+
}
92+
}
93+
}
94+
if value.bytes_val.is_some() {
95+
value_json.insert(
96+
key.to_string(),
97+
Value::String(value.bytes_val.as_ref().unwrap().to_owned()),
98+
);
99+
}
100+
101+
value_json
102+
}
103+
104+
//traverse through Value by calling function ollect_json_from_any_value
105+
pub fn collect_json_from_values(
106+
values: &Option<super::otel::proto::common::v1::Value>,
107+
key: &String,
108+
) -> BTreeMap<String, Value> {
109+
let mut value_json: BTreeMap<String, Value> = BTreeMap::new();
110+
111+
for value in values.iter() {
112+
value_json = collect_json_from_any_value(key, value.clone());
113+
}
114+
115+
value_json
116+
}
117+
118+
pub fn value_to_string(value: serde_json::Value) -> String {
119+
match value.clone() {
120+
e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
121+
Value::String(s) => s,
122+
_ => "".to_string(),
123+
}
124+
}
125+
126+
pub fn flatten_attributes(attributes: &Vec<KeyValue>) -> BTreeMap<String, Value> {
127+
let mut attributes_json: BTreeMap<String, Value> = BTreeMap::new();
128+
for attribute in attributes {
129+
let key = &attribute.key;
130+
let value = &attribute.value;
131+
let value_json = collect_json_from_values(value, &key.to_string());
132+
for key in value_json.keys() {
133+
attributes_json.insert(key.to_owned(), value_json[key].to_owned());
134+
}
135+
}
136+
attributes_json
137+
}

0 commit comments

Comments
 (0)