Skip to content

Commit cc4c406

Browse files
feat: enable OTEL traces
add custom flattening for otel traces use endpoints `/v1/logs` for OTEL logs `/v1/metrics` for OTEL metrics `/v1/traces` for OTEL traces add custom header X-P-Log-Source when using endpint `api/v1/ingest` `otel-logs` for OTEL logs `otel-metrics` for OTEL metrics `otel-traces` for OTEL traces
1 parent 0cbbaa3 commit cc4c406

File tree

8 files changed

+874
-235
lines changed

8 files changed

+874
-235
lines changed

src/handlers/http/ingest.rs

Lines changed: 21 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ use super::logstream::error::{CreateStreamError, StreamError};
2020
use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs};
2121
use super::users::dashboards::DashboardError;
2222
use super::users::filters::FiltersError;
23-
use super::{otel_logs, otel_metrics};
23+
use super::{otel_logs, otel_metrics, otel_traces};
2424
use crate::event::{
2525
self,
2626
error::EventError,
2727
format::{self, EventFormat},
2828
};
2929
use crate::handlers::http::modal::utils::logstream_utils::create_stream_and_schema_from_storage;
30-
use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY};
30+
use crate::handlers::STREAM_NAME_HEADER_KEY;
3131
use crate::metadata::error::stream_info::MetadataError;
3232
use crate::metadata::STREAM_INFO;
3333
use crate::option::{Mode, CONFIG};
@@ -118,23 +118,11 @@ pub async fn handle_otel_logs_ingestion(
118118
let stream_name = stream_name.to_str().unwrap().to_owned();
119119
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
120120

121-
//flatten logs
122-
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
123-
{
124-
let log_source: String = log_source.to_str().unwrap().to_owned();
125-
if log_source == LOG_SOURCE_OTEL {
126-
let mut json = otel_logs::flatten_otel_logs(&body);
127-
for record in json.iter_mut() {
128-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
129-
push_logs(&stream_name, &req, &body).await?;
130-
}
131-
} else {
132-
return Err(PostError::CustomError("Unknown log source".to_string()));
133-
}
134-
} else {
135-
return Err(PostError::CustomError(
136-
"log source key header is missing".to_string(),
137-
));
121+
//custom flattening required for otel logs
122+
let mut json = otel_logs::flatten_otel_logs(&body);
123+
for record in json.iter_mut() {
124+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
125+
push_logs(&stream_name, &req, &body).await?;
138126
}
139127
} else {
140128
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -157,23 +145,11 @@ pub async fn handle_otel_metrics_ingestion(
157145
let stream_name = stream_name.to_str().unwrap().to_owned();
158146
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
159147

160-
//flatten logs
161-
if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY)
162-
{
163-
let log_source: String = log_source.to_str().unwrap().to_owned();
164-
if log_source == LOG_SOURCE_OTEL {
165-
let mut json = otel_metrics::flatten_otel_metrics(&body);
166-
for record in json.iter_mut() {
167-
let body: Bytes = serde_json::to_vec(record).unwrap().into();
168-
push_logs(&stream_name, &req, &body).await?;
169-
}
170-
} else {
171-
return Err(PostError::CustomError("Unknown log source".to_string()));
172-
}
173-
} else {
174-
return Err(PostError::CustomError(
175-
"log source key header is missing".to_string(),
176-
));
148+
//custom flattening required for otel metrics
149+
let mut json = otel_metrics::flatten_otel_metrics(&body);
150+
for record in json.iter_mut() {
151+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
152+
push_logs(&stream_name, &req, &body).await?;
177153
}
178154
} else {
179155
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -193,9 +169,15 @@ pub async fn handle_otel_traces_ingestion(
193169
.iter()
194170
.find(|&(key, _)| key == STREAM_NAME_HEADER_KEY)
195171
{
196-
let stream_name = stream_name.to_str().unwrap();
197-
create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?;
198-
push_logs(stream_name, &req, &body).await?;
172+
let stream_name = stream_name.to_str().unwrap().to_owned();
173+
create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?;
174+
175+
//custom flattening required for otel traces
176+
let mut json = otel_traces::flatten_otel_traces(&body);
177+
for record in json.iter_mut() {
178+
let body: Bytes = serde_json::to_vec(record).unwrap().into();
179+
push_logs(&stream_name, &req, &body).await?;
180+
}
199181
} else {
200182
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
201183
}

src/handlers/http/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ pub mod oidc;
4040
pub mod otel;
4141
pub mod otel_logs;
4242
pub mod otel_metrics;
43+
pub mod otel_traces;
4344
pub mod query;
4445
pub mod rbac;
4546
pub mod role;

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,9 @@ use crate::{
3333
format::{self, EventFormat},
3434
},
3535
handlers::{
36-
http::{ingest::PostError, kinesis, otel_logs},
37-
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR,
36+
http::{ingest::PostError, kinesis, otel_logs, otel_metrics, otel_traces},
37+
LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL_LOGS, LOG_SOURCE_OTEL_METRICS,
38+
LOG_SOURCE_OTEL_TRACES, PREFIX_META, PREFIX_TAGS, SEPARATOR,
3839
},
3940
metadata::STREAM_INFO,
4041
storage::StreamType,
@@ -52,9 +53,21 @@ pub async fn flatten_and_push_logs(
5253
let log_source: String = log_source.to_str().unwrap().to_owned();
5354
match log_source.as_str() {
5455
LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body),
55-
LOG_SOURCE_OTEL => {
56+
57+
//custom flattening required for otel logs
58+
LOG_SOURCE_OTEL_LOGS => {
5659
json = otel_logs::flatten_otel_logs(&body);
5760
}
61+
62+
//custom flattening required for otel metrics
63+
LOG_SOURCE_OTEL_METRICS => {
64+
json = otel_metrics::flatten_otel_metrics(&body);
65+
}
66+
67+
//custom flattening required for otel traces
68+
LOG_SOURCE_OTEL_TRACES => {
69+
json = otel_traces::flatten_otel_traces(&body);
70+
}
5871
_ => {
5972
tracing::warn!("Unknown log source: {}", log_source);
6073
push_logs(stream_name, &req, &body).await?;

src/handlers/http/otel/opentelemetry.proto.trace.v1.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -86,21 +86,21 @@ pub struct Span {
8686
/// is zero-length and thus is also invalid).
8787
///
8888
/// This field is required.
89-
pub trace_id: Option<Vec<u8>>,
89+
pub trace_id: Option<String>,
9090
/// A unique identifier for a span within a trace, assigned when the span
9191
/// is created. The ID is an 8-byte array. An ID with all zeroes OR of length
9292
/// other than 8 bytes is considered invalid (empty string in OTLP/JSON
9393
/// is zero-length and thus is also invalid).
9494
///
9595
/// This field is required.
96-
pub span_id: Option<Vec<u8>>,
96+
pub span_id: Option<String>,
9797
/// trace_state conveys information about request position in multiple distributed tracing graphs.
9898
/// It is a trace_state in w3c-trace-context format: <https://www.w3.org/TR/trace-context/#tracestate-header>
9999
/// See also <https://github.com/w3c/distributed-tracing> for more details about this field.
100100
pub trace_state: Option<String>,
101101
/// The `span_id` of this span's parent span. If this is a root span, then this
102102
/// field must be empty. The ID is an 8-byte array.
103-
pub parent_span_id: Option<Vec<u8>>,
103+
pub parent_span_id: Option<String>,
104104
/// Flags, a bit field.
105105
///
106106
/// Bits 0-7 (8 least significant bits) are the trace flags as defined in W3C Trace
@@ -145,14 +145,14 @@ pub struct Span {
145145
/// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
146146
///
147147
/// This field is semantically required and it is expected that end_time >= start_time.
148-
pub start_time_unix_nano: Option<u64>,
148+
pub start_time_unix_nano: Option<String>,
149149
/// end_time_unix_nano is the end time of the span. On the client side, this is the time
150150
/// kept by the local machine where the span execution ends. On the server side, this
151151
/// is the time when the server application handler stops running.
152152
/// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970.
153153
///
154154
/// This field is semantically required and it is expected that end_time >= start_time.
155-
pub end_time_unix_nano: Option<u64>,
155+
pub end_time_unix_nano: Option<String>,
156156
/// attributes is a collection of key/value pairs. Note, global attributes
157157
/// like server name can be set using the resource API. Examples of attributes:
158158
///
@@ -213,9 +213,9 @@ pub mod span {
213213
pub struct Link {
214214
/// A unique identifier of a trace that this linked span is part of. The ID is a
215215
/// 16-byte array.
216-
pub trace_id: Option<Vec<u8>>,
216+
pub trace_id: Option<String>,
217217
/// A unique identifier for the linked span. The ID is an 8-byte array.
218-
pub span_id: Option<Vec<u8>>,
218+
pub span_id: Option<String>,
219219
/// The trace_state associated with the link.
220220
pub trace_state: Option<String>,
221221
/// attributes is a collection of attribute key/value pairs on the link.

src/handlers/http/otel_logs.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use serde_json::Value;
2828
use super::otel::collect_json_from_values;
2929
use super::otel::flatten_attributes;
3030

31+
//flatten log record and all its attributes to create one record for each log record
3132
pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap<String, Value> {
3233
let mut log_record_json: BTreeMap<String, Value> = BTreeMap::new();
3334
if log_record.time_unix_nano.is_some() {
@@ -121,6 +122,7 @@ pub fn flatten_log_record(log_record: &LogRecord) -> BTreeMap<String, Value> {
121122
log_record_json
122123
}
123124

125+
//flatten otel logs
124126
pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
125127
let mut vec_otel_json: Vec<BTreeMap<String, Value>> = Vec::new();
126128
let body_str = std::str::from_utf8(body).unwrap();
@@ -132,6 +134,7 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
132134
let mut resource_log_json: BTreeMap<String, Value> = BTreeMap::new();
133135

134136
if let Some(resource) = record.resource.as_ref() {
137+
//flatten resource attributes to create multiple key value pairs (headers) for each log record
135138
if let Some(attributes) = resource.attributes.as_ref() {
136139
let attributes_json = flatten_attributes(attributes);
137140
for key in attributes_json.keys() {
@@ -151,27 +154,28 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
151154

152155
if let Some(scope_logs) = record.scope_logs.as_ref() {
153156
let mut vec_scope_log_json: Vec<BTreeMap<String, Value>> = Vec::new();
157+
//create flattened record for each scope log
154158
for scope_log in scope_logs.iter() {
155159
let mut scope_log_json: BTreeMap<String, Value> = BTreeMap::new();
156160
if scope_log.scope.is_some() {
157161
let instrumentation_scope = scope_log.scope.as_ref().unwrap();
158162
if instrumentation_scope.name.is_some() {
159163
scope_log_json.insert(
160-
"instrumentation_scope_name".to_string(),
164+
"scope_name".to_string(),
161165
Value::String(
162166
instrumentation_scope.name.as_ref().unwrap().to_string(),
163167
),
164168
);
165169
}
166170
if instrumentation_scope.version.is_some() {
167171
scope_log_json.insert(
168-
"instrumentation_scope_version".to_string(),
172+
"scope_version".to_string(),
169173
Value::String(
170174
instrumentation_scope.version.as_ref().unwrap().to_string(),
171175
),
172176
);
173177
}
174-
178+
//flatten instrumentation scope attributes to create multiple key value pairs (headers) for each log record
175179
if let Some(attributes) = instrumentation_scope.attributes.as_ref() {
176180
let attributes_json = flatten_attributes(attributes);
177181
for key in attributes_json.keys() {
@@ -182,7 +186,7 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
182186

183187
if instrumentation_scope.dropped_attributes_count.is_some() {
184188
scope_log_json.insert(
185-
"instrumentation_scope_dropped_attributes_count".to_string(),
189+
"scope_dropped_attributes_count".to_string(),
186190
Value::Number(serde_json::Number::from(
187191
instrumentation_scope.dropped_attributes_count.unwrap(),
188192
)),
@@ -196,6 +200,7 @@ pub fn flatten_otel_logs(body: &Bytes) -> Vec<BTreeMap<String, Value>> {
196200
);
197201
}
198202

203+
//create flattened record for each log record
199204
for log_record in scope_log.log_records.iter() {
200205
let log_record_json = flatten_log_record(log_record);
201206

0 commit comments

Comments
 (0)