Skip to content

Commit 239b0ce

Browse files
Devdutt Shenoinikhilsinhaparseable
authored andcommitted
suggestions
1 parent 34758f8 commit 239b0ce

File tree

6 files changed

+77
-89
lines changed

6 files changed

+77
-89
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,10 @@ humantime-serde = "1.1"
6464
itertools = "0.13.0"
6565
num_cpus = "1.15"
6666
once_cell = "1.17.1"
67+
opentelemetry-proto = "0.27.0"
6768
prometheus = { version = "0.13", features = ["process"] }
6869
rand = "0.8.5"
69-
rdkafka = {version = "0.36.2", default-features = false, features = ["tokio"]}
70+
rdkafka = { version = "0.36.2", default-features = false, features = ["tokio"] }
7071
regex = "1.7.3"
7172
relative-path = { version = "1.7", features = ["serde"] }
7273
reqwest = { version = "0.11.27", default-features = false, features = [
@@ -106,7 +107,6 @@ prost = "0.13.3"
106107
prometheus-parse = "0.2.5"
107108
sha2 = "0.10.8"
108109
tracing = "0.1.41"
109-
opentelemetry-proto = "0.27.0"
110110

111111
[build-dependencies]
112112
cargo_toml = "0.20.1"

src/handlers/http/ingest.rs

Lines changed: 40 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ use arrow_schema::Schema;
4141
use bytes::Bytes;
4242
use chrono::Utc;
4343
use http::StatusCode;
44+
use nom::AsBytes;
45+
use opentelemetry_proto::tonic::logs::v1::LogsData;
46+
use opentelemetry_proto::tonic::metrics::v1::MetricsData;
47+
use opentelemetry_proto::tonic::trace::v1::TracesData;
4448
use serde_json::Value;
4549
use std::collections::HashMap;
4650
use std::sync::Arc;
@@ -113,23 +117,20 @@ pub async fn handle_otel_logs_ingestion(
113117
req: HttpRequest,
114118
body: Bytes,
115119
) -> Result<HttpResponse, PostError> {
116-
if let Some((_, stream_name)) = req
117-
.headers()
118-
.iter()
119-
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
120-
{
121-
let stream_name = stream_name.to_str().unwrap().to_owned();
122-
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
123-
124-
//custom flattening required for otel logs
125-
let mut json = flatten_otel_logs(&body);
126-
for record in json.iter_mut() {
127-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
128-
push_logs(&stream_name, &req, &body).await?;
129-
}
130-
} else {
120+
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
131121
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
122+
};
123+
let stream_name = stream_name.to_str().unwrap().to_owned();
124+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
125+
126+
//custom flattening required for otel logs
127+
let logs: LogsData = serde_json::from_slice(body.as_bytes())?;
128+
let mut json = flatten_otel_logs(&logs);
129+
for record in json.iter_mut() {
130+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
131+
push_logs(&stream_name, &req, &body).await?;
132132
}
133+
133134
Ok(HttpResponse::Ok().finish())
134135
}
135136

@@ -140,23 +141,20 @@ pub async fn handle_otel_metrics_ingestion(
140141
req: HttpRequest,
141142
body: Bytes,
142143
) -> Result<HttpResponse, PostError> {
143-
if let Some((_, stream_name)) = req
144-
.headers()
145-
.iter()
146-
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
147-
{
148-
let stream_name = stream_name.to_str().unwrap().to_owned();
149-
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
150-
151-
//custom flattening required for otel metrics
152-
let mut json = flatten_otel_metrics(&body);
153-
for record in json.iter_mut() {
154-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
155-
push_logs(&stream_name, &req, &body).await?;
156-
}
157-
} else {
144+
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
158145
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
146+
};
147+
let stream_name = stream_name.to_str().unwrap().to_owned();
148+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
149+
150+
//custom flattening required for otel metrics
151+
let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?;
152+
let mut json = flatten_otel_metrics(metrics);
153+
for record in json.iter_mut() {
154+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
155+
push_logs(&stream_name, &req, &body).await?;
159156
}
157+
160158
Ok(HttpResponse::Ok().finish())
161159
}
162160

@@ -167,23 +165,20 @@ pub async fn handle_otel_traces_ingestion(
167165
req: HttpRequest,
168166
body: Bytes,
169167
) -> Result<HttpResponse, PostError> {
170-
if let Some((_, stream_name)) = req
171-
.headers()
172-
.iter()
173-
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
174-
{
175-
let stream_name = stream_name.to_str().unwrap().to_owned();
176-
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
177-
178-
//custom flattening required for otel traces
179-
let mut json = flatten_otel_traces(&body);
180-
for record in json.iter_mut() {
181-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
182-
push_logs(&stream_name, &req, &body).await?;
183-
}
184-
} else {
168+
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
185169
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
170+
};
171+
let stream_name = stream_name.to_str().unwrap().to_owned();
172+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
173+
174+
//custom flattening required for otel traces
175+
let traces: TracesData = serde_json::from_slice(body.as_bytes())?;
176+
let mut json = flatten_otel_traces(&traces);
177+
for record in json.iter_mut() {
178+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
179+
push_logs(&stream_name, &req, &body).await?;
186180
}
181+
187182
Ok(HttpResponse::Ok().finish())
188183
}
189184

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ use anyhow::anyhow;
2121
use arrow_schema::Field;
2222
use bytes::Bytes;
2323
use chrono::{DateTime, NaiveDateTime, Utc};
24+
use nom::AsBytes;
25+
use opentelemetry_proto::tonic::{
26+
logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
27+
};
2428
use itertools::Itertools;
2529
use serde_json::Value;
2630
use std::{
@@ -49,39 +53,39 @@ pub async fn flatten_and_push_logs(
4953
body: Bytes,
5054
stream_name: &str,
5155
) -> Result<(), PostError> {
52-
//flatten logs
53-
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
54-
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
55-
let log_source: String = log_source.to_str().unwrap().to_owned();
56-
match log_source.as_str() {
57-
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
58-
59-
//custom flattening required for otel logs
60-
LOG_SOURCE_OTEL_LOGS => {
61-
json = flatten_otel_logs(&body);
62-
}
63-
64-
//custom flattening required for otel metrics
65-
LOG_SOURCE_OTEL_METRICS => {
66-
json = flatten_otel_metrics(&body);
67-
}
68-
69-
//custom flattening required for otel traces
70-
LOG_SOURCE_OTEL_TRACES => {
71-
json = flatten_otel_traces(&body);
72-
}
73-
_ => {
74-
tracing::warn!("Unknown log source: {}", log_source);
75-
push_logs(stream_name, &req, &body).await?;
76-
}
56+
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
57+
push_logs(stream_name, &req, &body).await?;
58+
return Ok(());
59+
};
60+
let mut json: Vec<BTreeMap<String, Value>> = Vec::new();
61+
match log_source.to_str().unwrap() {
62+
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
63+
//custom flattening required for otel logs
64+
LOG_SOURCE_OTEL_LOGS => {
65+
let logs: LogsData = serde_json::from_slice(body.as_bytes())?;
66+
json = flatten_otel_logs(&logs);
67+
}
68+
//custom flattening required for otel metrics
69+
LOG_SOURCE_OTEL_METRICS => {
70+
let metrics: MetricsData = serde_json::from_slice(body.as_bytes())?;
71+
json = flatten_otel_metrics(metrics);
72+
}
73+
//custom flattening required for otel traces
74+
LOG_SOURCE_OTEL_TRACES => {
75+
let traces: TracesData = serde_json::from_slice(body.as_bytes())?;
76+
json = flatten_otel_traces(&traces);
7777
}
78-
for record in json.iter_mut() {
79-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
78+
log_source => {
79+
tracing::warn!("Unknown log source: {}", log_source);
8080
push_logs(stream_name, &req, &body).await?;
8181
}
82-
} else {
82+
}
83+
84+
for record in json.iter_mut() {
85+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
8386
push_logs(stream_name, &req, &body).await?;
8487
}
88+
8589
Ok(())
8690
}
8791

src/otel/logs.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*
1717
*/
1818

19-
use bytes::Bytes;
2019
use opentelemetry_proto::tonic::logs::v1::LogRecord;
2120
use opentelemetry_proto::tonic::logs::v1::LogsData;
2221
use opentelemetry_proto::tonic::logs::v1::ScopeLogs;
@@ -125,11 +124,8 @@ fn flatten_scope_log(scope_log: &ScopeLogs) -> Vec<BTreeMap<String, Value>> {
125124

126125
/// this function performs the custom flattening of the otel logs
127126
/// and returns a `Vec` of `BTreeMap` of the flattened json
128-
pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
129-
let body_str = std::str::from_utf8(body).unwrap();
130-
let message: LogsData = serde_json::from_str(body_str).unwrap();
127+
pub fn flatten_otel_logs(message: &LogsData) -> Vec<BTreeMap<String, Value>> {
131128
let mut vec_otel_json = Vec::new();
132-
133129
for record in &message.resource_logs {
134130
let mut resource_log_json = BTreeMap::new();
135131

src/otel/metrics.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
use std::collections::BTreeMap;
2020

21-
use bytes::Bytes;
2221
use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue;
2322
use opentelemetry_proto::tonic::metrics::v1::{
2423
exemplar::Value as ExemplarValue, exponential_histogram_data_point::Buckets, metric, Exemplar,
@@ -386,9 +385,7 @@ pub fn flatten_metrics_record(metrics_record: &Metric) -> Vec<BTreeMap<String, V
386385

387386
/// this function performs the custom flattening of the otel metrics
388387
/// and returns a `Vec` of `BTreeMap` of the flattened json
389-
pub fn flatten_otel_metrics(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
390-
let body_str = std::str::from_utf8(body).unwrap();
391-
let message: MetricsData = serde_json::from_str(body_str).unwrap();
388+
pub fn flatten_otel_metrics(message: MetricsData) -> Vec<BTreeMap<String, Value>> {
392389
let mut vec_otel_json = Vec::new();
393390
for record in &message.resource_metrics {
394391
let mut resource_metrics_json = BTreeMap::new();

src/otel/traces.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
*
1717
*/
1818

19-
use bytes::Bytes;
20-
2119
use opentelemetry_proto::tonic::trace::v1::span::Event;
2220
use opentelemetry_proto::tonic::trace::v1::span::Link;
2321
use opentelemetry_proto::tonic::trace::v1::ScopeSpans;
@@ -71,9 +69,7 @@ fn flatten_scope_span(scope_span: &ScopeSpans) -> Vec<BTreeMap<String, Value>> {
7169

7270
/// this function performs the custom flattening of the otel traces event
7371
/// and returns a `Vec` of `BTreeMap` of the flattened json
74-
pub fn flatten_otel_traces(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
75-
let body_str = std::str::from_utf8(body).unwrap();
76-
let message: TracesData = serde_json::from_str(body_str).unwrap();
72+
pub fn flatten_otel_traces(message: &TracesData) -> Vec<BTreeMap<String, Value>> {
7773
let mut vec_otel_json = Vec::new();
7874

7975
for record in &message.resource_spans {

0 commit comments

Comments
 (0)