Skip to content

Commit 41b7b7c

Browse files
feat: enable OTEL traces
add custom flattening for otel traces use endpoints `/v1/logs` for OTEL logs `/v1/metrics` for OTEL metrics `/v1/traces` for OTEL traces add custom header X-P-Log-Source when using endpint `api/v1/ingest` `otel-logs` for OTEL logs `otel-metrics` for OTEL metrics `otel-traces` for OTEL traces
1 parent 33b327d commit 41b7b7c

File tree

8 files changed

+874
-235
lines changed

8 files changed

+874
-235
lines changed

src/handlers/http/ingest.rs

Lines changed: 21 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ use super::logstream::error::{CreateStreamError, StreamError};
2020
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
2121
use super::users::dashboards::DashboardError;
2222
use super::users::filters::FiltersError;
23-
use super::{otel_logs, otel_metrics};
23+
use super::{otel_logs, otel_metrics, otel_traces};
2424
use crate::event::{
2525
self,
2626
error::EventError,
2727
format::{self, EventFormat},
2828
};
2929
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
30-
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
30+
use crate::handlers::STREAM_NAME_HEADER_KEY;
3131
use crate::metadata::error::stream_info::MetadataError;
3232
use crate::metadata::{SchemaVersion, STREAM_INFO};
3333
use crate::option::{Mode, CONFIG};
@@ -119,23 +119,11 @@ pub async fn handle_otel_logs_ingestion(
119119
let stream_name = stream_name.to_str().unwrap().to_owned();
120120
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
121121

122-
//flatten logs
123-
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
124-
{
125-
let log_source: String = log_source.to_str().unwrap().to_owned();
126-
if log_source == LOG_SOURCE_OTEL {
127-
let mut json = otel_logs::flatten_otel_logs(&body);
128-
for record in json.iter_mut() {
129-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
130-
push_logs(&stream_name, &req, &body).await?;
131-
}
132-
} else {
133-
return Err(PostError::CustomError("Unknown log source".to_string()));
134-
}
135-
} else {
136-
return Err(PostError::CustomError(
137-
"log source key header is missing".to_string(),
138-
));
122+
//custom flattening required for otel logs
123+
let mut json = otel_logs::flatten_otel_logs(&body);
124+
for record in json.iter_mut() {
125+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
126+
push_logs(&stream_name, &req, &body).await?;
139127
}
140128
} else {
141129
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -158,23 +146,11 @@ pub async fn handle_otel_metrics_ingestion(
158146
let stream_name = stream_name.to_str().unwrap().to_owned();
159147
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
160148

161-
//flatten logs
162-
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
163-
{
164-
let log_source: String = log_source.to_str().unwrap().to_owned();
165-
if log_source == LOG_SOURCE_OTEL {
166-
let mut json = otel_metrics::flatten_otel_metrics(&body);
167-
for record in json.iter_mut() {
168-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
169-
push_logs(&stream_name, &req, &body).await?;
170-
}
171-
} else {
172-
return Err(PostError::CustomError("Unknown log source".to_string()));
173-
}
174-
} else {
175-
return Err(PostError::CustomError(
176-
"log source key header is missing".to_string(),
177-
));
149+
//custom flattening required for otel metrics
150+
let mut json = otel_metrics::flatten_otel_metrics(&body);
151+
for record in json.iter_mut() {
152+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
153+
push_logs(&stream_name, &req, &body).await?;
178154
}
179155
} else {
180156
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -194,9 +170,15 @@ pub async fn handle_otel_traces_ingestion(
194170
.iter()
195171
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
196172
{
197-
let stream_name = stream_name.to_str().unwrap();
198-
create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?;
199-
push_logs(stream_name, &req, &body).await?;
173+
let stream_name = stream_name.to_str().unwrap().to_owned();
174+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
175+
176+
//custom flattening required for otel traces
177+
let mut json = otel_traces::flatten_otel_traces(&body);
178+
for record in json.iter_mut() {
179+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
180+
push_logs(&stream_name, &req, &body).await?;
181+
}
200182
} else {
201183
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
202184
}

src/handlers/http/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub mod oidc;
4141
pub mod otel;
4242
pub mod otel_logs;
4343
pub mod otel_metrics;
44+
pub mod otel_traces;
4445
pub mod query;
4546
pub mod rbac;
4647
pub mod role;

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,9 @@ use crate::{
3535
Event,
3636
},
3737
handlers::{
38-
http::{ingest::PostError, kinesis, otel_logs},
39-
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
38+
http::{ingest::PostError, kinesis, otel_logs, otel_metrics, otel_traces},
39+
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL_LOGS, LOG_SOURCE_OTEL_METRICS,
40+
LOG_SOURCE_OTEL_TRACES, PREFIX_META, PREFIX_TAGS, SEPARATOR,
4041
},
4142
metadata::{SchemaVersion, STREAM_INFO},
4243
storage::StreamType,
@@ -54,9 +55,21 @@ pub async fn flatten_and_push_logs(
5455
let log_source: String = log_source.to_str().unwrap().to_owned();
5556
match log_source.as_str() {
5657
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
57-
LOG_SOURCE_OTEL => {
58+
59+
//custom flattening required for otel logs
60+
LOG_SOURCE_OTEL_LOGS => {
5861
json = otel_logs::flatten_otel_logs(&body);
5962
}
63+
64+
//custom flattening required for otel metrics
65+
LOG_SOURCE_OTEL_METRICS => {
66+
json = otel_metrics::flatten_otel_metrics(&body);
67+
}
68+
69+
//custom flattening required for otel traces
70+
LOG_SOURCE_OTEL_TRACES => {
71+
json = otel_traces::flatten_otel_traces(&body);
72+
}
6073
_ => {
6174
tracing::warn!("Unknown log source: {}", log_source);
6275
push_logs(stream_name, &req, &body).await?;

src/handlers/http/otel/opentelemetry.proto.trace.v1.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,21 +86,21 @@ pub struct Span {
8686
/// is zero-length and thus is also invalid).
8787
///
8888
/// This field is required.
89-
pub trace_id: Option<Vec<u8>>,
89+
pub trace_id: Option<String>,
9090
/// A unique identifier for a span within a trace, assigned when the span
9191
/// is created. The ID is an 8-byte array. An ID with all zeroes OR of length
9292
/// other than 8 bytes is considered invalid (empty string in OTLP/JSON
9393
/// is zero-length and thus is also invalid).
9494
///
9595
/// This field is required.
96-
pub span_id: Option<Vec<u8>>,
96+
pub span_id: Option<String>,
9797
/// trace_state conveys information about request position in multiple distributed tracing graphs.
9898
/// It is a trace_state in w3c-trace-context format: <https://www.w3.org/TR/trace-context/#tracestate-header>
9999
/// See also <https://github.com/w3c/distributed-tracing> for more details about this field.
100100
pub trace_state: Option<String>,
101101
/// The `span_id` of this span's parent span. If this is a root span, then this
102102
/// field must be empty. The ID is an 8-byte array.
103-
pub parent_span_id: Option<Vec<u8>>,
103+
pub parent_span_id: Option<String>,
104104
/// Flags, a bit field.
105105
///
106106
/// Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace
@@ -145,14 +145,14 @@ pub struct Span {
145145
/// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
146146
///
147147
/// This field is semantically required and it is expected that end_time >= start_time.
148-
pub start_time_unix_nano: Option<u64>,
148+
pub start_time_unix_nano: Option<String>,
149149
/// end_time_unix_nano is the end time of the span. On the client side, this is the time
150150
/// kept by the local machine where the span execution ends. On the server side, this
151151
/// is the time when the server application handler stops running.
152152
/// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
153153
///
154154
/// This field is semantically required and it is expected that end_time >= start_time.
155-
pub end_time_unix_nano: Option<u64>,
155+
pub end_time_unix_nano: Option<String>,
156156
/// attributes is a collection of key/value pairs. Note, global attributes
157157
/// like server name can be set using the resource API. Examples of attributes:
158158
///
@@ -213,9 +213,9 @@ pub mod span {
213213
pub struct Link {
214214
/// A unique identifier of a trace that this linked span is part of. The ID is a
215215
/// 16-byte array.
216-
pub trace_id: Option<Vec<u8>>,
216+
pub trace_id: Option<String>,
217217
/// A unique identifier for the linked span. The ID is an 8-byte array.
218-
pub span_id: Option<Vec<u8>>,
218+
pub span_id: Option<String>,
219219
/// The trace_state associated with the link.
220220
pub trace_state: Option<String>,
221221
/// attributes is a collection of attribute key/value pairs on the link.

src/handlers/http/otel_logs.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use serde_json::Value;
2828
use super::otel::collect_json_from_values;
2929
use super::otel::flatten_attributes;
3030

31+
//flatten log record and all its attributes to create one record for each log record
3132
pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap<String, Value> {
3233
let mut log_record_json: BTreeMap<String, Value> = BTreeMap::new();
3334
if log_record.time_unix_nano.is_some() {
@@ -121,6 +122,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap<String, Value> {
121122
log_record_json
122123
}
123124

125+
//flatten otel logs
124126
pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
125127
let mut vec_otel_json: Vec<BTreeMap<String, Value>> = Vec::new();
126128
let body_str = std::str::from_utf8(body).unwrap();
@@ -132,6 +134,7 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
132134
let mut resource_log_json: BTreeMap<String, Value> = BTreeMap::new();
133135

134136
if let Some(resource) = record.resource.as_ref() {
137+
//flatten resource attributes to create multiple key value pairs (headers) for each log record
135138
if let Some(attributes) = resource.attributes.as_ref() {
136139
let attributes_json = flatten_attributes(attributes);
137140
for key in attributes_json.keys() {
@@ -151,27 +154,28 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
151154

152155
if let Some(scope_logs) = record.scope_logs.as_ref() {
153156
let mut vec_scope_log_json: Vec<BTreeMap<String, Value>> = Vec::new();
157+
//create flattened record for each scope log
154158
for scope_log in scope_logs.iter() {
155159
let mut scope_log_json: BTreeMap<String, Value> = BTreeMap::new();
156160
if scope_log.scope.is_some() {
157161
let instrumentation_scope = scope_log.scope.as_ref().unwrap();
158162
if instrumentation_scope.name.is_some() {
159163
scope_log_json.insert(
160-
"instrumentation_scope_name".to_string(),
164+
"scope_name".to_string(),
161165
Value::String(
162166
instrumentation_scope.name.as_ref().unwrap().to_string(),
163167
),
164168
);
165169
}
166170
if instrumentation_scope.version.is_some() {
167171
scope_log_json.insert(
168-
"instrumentation_scope_version".to_string(),
172+
"scope_version".to_string(),
169173
Value::String(
170174
instrumentation_scope.version.as_ref().unwrap().to_string(),
171175
),
172176
);
173177
}
174-
178+
//flatten instrumentation scope attributes to create multiple key value pairs (headers) for each log record
175179
if let Some(attributes) = instrumentation_scope.attributes.as_ref() {
176180
let attributes_json = flatten_attributes(attributes);
177181
for key in attributes_json.keys() {
@@ -182,7 +186,7 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
182186

183187
if instrumentation_scope.dropped_attributes_count.is_some() {
184188
scope_log_json.insert(
185-
"instrumentation_scope_dropped_attributes_count".to_string(),
189+
"scope_dropped_attributes_count".to_string(),
186190
Value::Number(serde_json::Number::from(
187191
instrumentation_scope.dropped_attributes_count.unwrap(),
188192
)),
@@ -196,6 +200,7 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
196200
);
197201
}
198202

203+
//create flattened record for each log record
199204
for log_record in scope_log.log_records.iter() {
200205
let log_record_json = flatten_log_record(log_record);
201206

0 commit comments

Comments
 (0)