Skip to content

Commit a22a12c

Browse files
add protobuf support for otel metrics and traces
1 parent feb5e05 commit a22a12c

File tree

3 files changed

+199
-27
lines changed

3 files changed

+199
-27
lines changed

src/handlers/http/ingest.rs

Lines changed: 78 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
use std::collections::{HashMap, HashSet};
2020

21-
use actix_web::web::{Json, Path};
21+
use actix_web::web::{self, Json, Path};
2222
use actix_web::{HttpRequest, HttpResponse, http::header::ContentType};
2323
use arrow_array::RecordBatch;
2424
use bytes::Bytes;
@@ -33,20 +33,16 @@ use crate::handlers::http::modal::utils::ingest_utils::push_logs;
3333
use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
3434
use crate::metadata::SchemaVersion;
3535
use crate::option::Mode;
36-
use crate::otel::logs::{flatten_otel_protobuf, OTEL_LOG_KNOWN_FIELD_LIST};
37-
use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
38-
use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
36+
use crate::otel::logs::{OTEL_LOG_KNOWN_FIELD_LIST, flatten_otel_protobuf};
37+
use crate::otel::metrics::{OTEL_METRICS_KNOWN_FIELD_LIST, flatten_otel_metrics_protobuf};
38+
use crate::otel::traces::{OTEL_TRACES_KNOWN_FIELD_LIST, flatten_otel_traces_protobuf};
3939
use crate::parseable::{PARSEABLE, StreamNotFound};
4040
use crate::storage::{ObjectStorageError, StreamType};
4141
use crate::utils::header_parsing::ParseHeaderError;
4242
use crate::utils::json::{flatten::JsonFlattenError, strict::StrictValue};
43-
use actix_web::web::{self, Json, Path};
44-
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
45-
use arrow_array::RecordBatch;
46-
use bytes::Bytes;
47-
use chrono::Utc;
48-
use http::StatusCode;
4943
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
44+
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
45+
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
5046
use prost::Message;
5147

5248
use super::logstream::error::{CreateStreamError, StreamError};
@@ -229,14 +225,25 @@ pub async fn handle_otel_logs_ingestion(
229225
}
230226

231227
if content_type == "application/x-protobuf" {
228+
const MAX_PROTOBUF_SIZE: usize = 10 * 1024 * 1024; // 10MB limit
229+
if body.len() > MAX_PROTOBUF_SIZE {
230+
return Err(PostError::Invalid(anyhow::anyhow!(
231+
"Protobuf message size {} exceeds maximum allowed size of {} bytes",
232+
body.len(),
233+
MAX_PROTOBUF_SIZE
234+
)));
235+
}
232236
match ExportLogsServiceRequest::decode(body) {
233237
Ok(json) => {
234238
for record in flatten_otel_protobuf(&json) {
235239
push_logs(&stream_name, record, &log_source, &p_custom_fields).await?;
236240
}
237241
}
238242
Err(e) => {
239-
return Err(PostError::Invalid(e.into()));
243+
return Err(PostError::Invalid(anyhow::anyhow!(
244+
"Failed to decode protobuf message: {}",
245+
e
246+
)));
240247
}
241248
}
242249
}
@@ -254,7 +261,7 @@ pub async fn handle_otel_logs_ingestion(
254261
// creates if stream does not exist
255262
pub async fn handle_otel_metrics_ingestion(
256263
req: HttpRequest,
257-
Json(json): Json<StrictValue>,
264+
body: web::Bytes,
258265
) -> Result<HttpResponse, PostError> {
259266
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
260267
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -303,13 +310,35 @@ pub async fn handle_otel_metrics_ingestion(
303310

304311
let p_custom_fields = get_custom_fields_from_header(&req);
305312

306-
flatten_and_push_logs(
307-
json.into_inner(),
308-
&stream_name,
309-
&log_source,
310-
&p_custom_fields,
311-
)
312-
.await?;
313+
match req.headers().get("Content-Type") {
314+
Some(content_type) => {
315+
if content_type == "application/json" {
316+
flatten_and_push_logs(
317+
serde_json::from_slice(&body)?,
318+
&stream_name,
319+
&log_source,
320+
&p_custom_fields,
321+
)
322+
.await?;
323+
}
324+
325+
if content_type == "application/x-protobuf" {
326+
match ExportMetricsServiceRequest::decode(body) {
327+
Ok(json) => {
328+
for record in flatten_otel_metrics_protobuf(&json) {
329+
push_logs(&stream_name, record, &log_source, &p_custom_fields).await?;
330+
}
331+
}
332+
Err(e) => {
333+
return Err(PostError::Invalid(e.into()));
334+
}
335+
}
336+
}
337+
}
338+
None => {
339+
return Err(PostError::Header(ParseHeaderError::InvalidValue));
340+
}
341+
}
313342

314343
Ok(HttpResponse::Ok().finish())
315344
}
@@ -319,7 +348,7 @@ pub async fn handle_otel_metrics_ingestion(
319348
// creates if stream does not exist
320349
pub async fn handle_otel_traces_ingestion(
321350
req: HttpRequest,
322-
Json(json): Json<StrictValue>,
351+
body: web::Bytes,
323352
) -> Result<HttpResponse, PostError> {
324353
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
325354
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -369,13 +398,35 @@ pub async fn handle_otel_traces_ingestion(
369398

370399
let p_custom_fields = get_custom_fields_from_header(&req);
371400

372-
flatten_and_push_logs(
373-
json.into_inner(),
374-
&stream_name,
375-
&log_source,
376-
&p_custom_fields,
377-
)
378-
.await?;
401+
match req.headers().get("Content-Type") {
402+
Some(content_type) => {
403+
if content_type == "application/json" {
404+
flatten_and_push_logs(
405+
serde_json::from_slice(&body)?,
406+
&stream_name,
407+
&log_source,
408+
&p_custom_fields,
409+
)
410+
.await?;
411+
}
412+
413+
if content_type == "application/x-protobuf" {
414+
match ExportTraceServiceRequest::decode(body) {
415+
Ok(json) => {
416+
for record in flatten_otel_traces_protobuf(&json) {
417+
push_logs(&stream_name, record, &log_source, &p_custom_fields).await?;
418+
}
419+
}
420+
Err(e) => {
421+
return Err(PostError::Invalid(e.into()));
422+
}
423+
}
424+
}
425+
}
426+
None => {
427+
return Err(PostError::Header(ParseHeaderError::InvalidValue));
428+
}
429+
}
379430

380431
Ok(HttpResponse::Ok().finish())
381432
}

src/otel/metrics.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
*
1717
*/
18+
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
1819
use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue;
1920
use opentelemetry_proto::tonic::metrics::v1::{
2021
Exemplar, ExponentialHistogram, Gauge, Histogram, Metric, MetricsData, NumberDataPoint, Sum,
@@ -601,3 +602,60 @@ fn flatten_data_point_flags(flags: u32) -> Map<String, Value> {
601602
);
602603
data_point_flags_json
603604
}
605+
606+
/// Flattens OpenTelemetry metrics from protobuf format
607+
pub fn flatten_otel_metrics_protobuf(message: &ExportMetricsServiceRequest) -> Vec<Value> {
608+
let mut vec_otel_json = Vec::new();
609+
for resource_metrics in &message.resource_metrics {
610+
let mut resource_metrics_json = Map::new();
611+
if let Some(resource) = &resource_metrics.resource {
612+
insert_attributes(&mut resource_metrics_json, &resource.attributes);
613+
resource_metrics_json.insert(
614+
"resource_dropped_attributes_count".to_string(),
615+
Value::Number(resource.dropped_attributes_count.into()),
616+
);
617+
}
618+
619+
let mut vec_resource_metrics_json = Vec::new();
620+
for scope_metrics in &resource_metrics.scope_metrics {
621+
// scope-level metadata
622+
let mut scope_metrics_json = Map::new();
623+
if let Some(scope) = &scope_metrics.scope {
624+
scope_metrics_json
625+
.insert("scope_name".to_string(), Value::String(scope.name.clone()));
626+
scope_metrics_json.insert(
627+
"scope_version".to_string(),
628+
Value::String(scope.version.clone()),
629+
);
630+
insert_attributes(&mut scope_metrics_json, &scope.attributes);
631+
scope_metrics_json.insert(
632+
"scope_dropped_attributes_count".to_string(),
633+
Value::Number(scope.dropped_attributes_count.into()),
634+
);
635+
}
636+
scope_metrics_json.insert(
637+
"scope_schema_url".to_string(),
638+
Value::String(scope_metrics.schema_url.clone()),
639+
);
640+
641+
for metric in &scope_metrics.metrics {
642+
vec_resource_metrics_json.extend(flatten_metrics_record(metric));
643+
}
644+
645+
for resource_metrics_json_item in &mut vec_resource_metrics_json {
646+
resource_metrics_json_item.extend(scope_metrics_json.clone());
647+
}
648+
}
649+
650+
resource_metrics_json.insert(
651+
"resource_schema_url".to_string(),
652+
Value::String(resource_metrics.schema_url.clone()),
653+
);
654+
655+
for resource_metrics_json_item in &mut vec_resource_metrics_json {
656+
resource_metrics_json_item.extend(resource_metrics_json.clone());
657+
vec_otel_json.push(Value::Object(resource_metrics_json_item.clone()));
658+
}
659+
}
660+
vec_otel_json
661+
}

src/otel/traces.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
*
1717
*/
18+
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
1819
use opentelemetry_proto::tonic::trace::v1::ScopeSpans;
1920
use opentelemetry_proto::tonic::trace::v1::Span;
2021
use opentelemetry_proto::tonic::trace::v1::Status;
@@ -932,3 +933,65 @@ mod tests {
932933
}
933934
}
934935
}
936+
937+
/// Flattens OpenTelemetry traces from protobuf format
938+
pub fn flatten_otel_traces_protobuf(message: &ExportTraceServiceRequest) -> Vec<Value> {
939+
let mut vec_otel_json = Vec::new();
940+
for resource_spans in &message.resource_spans {
941+
let mut resource_spans_json = Map::new();
942+
if let Some(resource) = &resource_spans.resource {
943+
insert_attributes(&mut resource_spans_json, &resource.attributes);
944+
resource_spans_json.insert(
945+
"resource_dropped_attributes_count".to_string(),
946+
Value::Number(resource.dropped_attributes_count.into()),
947+
);
948+
}
949+
950+
let mut vec_resource_spans_json: Vec<Map<String, Value>> = Vec::new();
951+
for scope_spans in &resource_spans.scope_spans {
952+
// 1) collect all span JSON under this scope
953+
let mut scope_spans_json = Vec::new();
954+
for span in &scope_spans.spans {
955+
scope_spans_json.extend(flatten_span_record(span));
956+
}
957+
958+
// 2) build a Map of scope‐level fields
959+
let mut scope_span_json = Map::new();
960+
if let Some(scope) = &scope_spans.scope {
961+
scope_span_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
962+
scope_span_json.insert(
963+
"scope_version".to_string(),
964+
Value::String(scope.version.clone()),
965+
);
966+
insert_attributes(&mut scope_span_json, &scope.attributes);
967+
scope_span_json.insert(
968+
"scope_dropped_attributes_count".to_string(),
969+
Value::Number(scope.dropped_attributes_count.into()),
970+
);
971+
}
972+
scope_span_json.insert(
973+
"scope_schema_url".to_string(),
974+
Value::String(scope_spans.schema_url.clone()),
975+
);
976+
977+
// 3) merge scope fields into each span record
978+
for span_json in &mut scope_spans_json {
979+
span_json.extend(scope_span_json.clone());
980+
}
981+
982+
// 4) append to the resource‐level accumulator
983+
vec_resource_spans_json.extend(scope_spans_json);
984+
}
985+
986+
resource_spans_json.insert(
987+
"resource_schema_url".to_string(),
988+
Value::String(resource_spans.schema_url.clone()),
989+
);
990+
991+
for resource_spans_json_item in &mut vec_resource_spans_json {
992+
resource_spans_json_item.extend(resource_spans_json.clone());
993+
vec_otel_json.push(Value::Object(resource_spans_json_item.clone()));
994+
}
995+
}
996+
vec_otel_json
997+
}

0 commit comments

Comments
 (0)