Skip to content

Commit ce66c19

Browse files
add protobuf support for otel metrics and traces
1 parent 03e67b4 commit ce66c19

File tree

3 files changed

+197
-19
lines changed

3 files changed

+197
-19
lines changed

src/handlers/http/ingest.rs

Lines changed: 76 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@ use crate::handlers::{
3636
use crate::metadata::SchemaVersion;
3737
use crate::option::Mode;
3838
use crate::otel::logs::{OTEL_LOG_KNOWN_FIELD_LIST, flatten_otel_protobuf};
39-
use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
40-
use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
39+
use crate::otel::metrics::{OTEL_METRICS_KNOWN_FIELD_LIST, flatten_otel_metrics_protobuf};
40+
use crate::otel::traces::{OTEL_TRACES_KNOWN_FIELD_LIST, flatten_otel_traces_protobuf};
4141
use crate::parseable::{PARSEABLE, StreamNotFound};
4242
use crate::storage::{ObjectStorageError, StreamType};
4343
use crate::utils::header_parsing::ParseHeaderError;
4444
use crate::utils::json::{flatten::JsonFlattenError, strict::StrictValue};
4545
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
46+
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
47+
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
4648
use prost::Message;
4749

4850
use super::logstream::error::{CreateStreamError, StreamError};
@@ -233,14 +235,25 @@ pub async fn handle_otel_logs_ingestion(
233235
}
234236

235237
if content_type == "application/x-protobuf" {
238+
const MAX_PROTOBUF_SIZE: usize = 10 * 1024 * 1024; // 10MB limit
239+
if body.len() > MAX_PROTOBUF_SIZE {
240+
return Err(PostError::Invalid(anyhow::anyhow!(
241+
"Protobuf message size {} exceeds maximum allowed size of {} bytes",
242+
body.len(),
243+
MAX_PROTOBUF_SIZE
244+
)));
245+
}
236246
match ExportLogsServiceRequest::decode(body) {
237247
Ok(json) => {
238248
for record in flatten_otel_protobuf(&json) {
239249
push_logs(&stream_name, record, &log_source, &p_custom_fields).await?;
240250
}
241251
}
242252
Err(e) => {
243-
return Err(PostError::Invalid(e.into()));
253+
return Err(PostError::Invalid(anyhow::anyhow!(
254+
"Failed to decode protobuf message: {}",
255+
e
256+
)));
244257
}
245258
}
246259
}
@@ -258,7 +271,7 @@ pub async fn handle_otel_logs_ingestion(
258271
// creates if stream does not exist
259272
pub async fn handle_otel_metrics_ingestion(
260273
req: HttpRequest,
261-
Json(json): Json<StrictValue>,
274+
body: web::Bytes,
262275
) -> Result<HttpResponse, PostError> {
263276
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
264277
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -308,13 +321,35 @@ pub async fn handle_otel_metrics_ingestion(
308321

309322
let p_custom_fields = get_custom_fields_from_header(&req);
310323

311-
flatten_and_push_logs(
312-
json.into_inner(),
313-
&stream_name,
314-
&log_source,
315-
&p_custom_fields,
316-
)
317-
.await?;
324+
match req.headers().get("Content-Type") {
325+
Some(content_type) => {
326+
if content_type == "application/json" {
327+
flatten_and_push_logs(
328+
serde_json::from_slice(&body)?,
329+
&stream_name,
330+
&log_source,
331+
&p_custom_fields,
332+
)
333+
.await?;
334+
}
335+
336+
if content_type == "application/x-protobuf" {
337+
match ExportMetricsServiceRequest::decode(body) {
338+
Ok(json) => {
339+
for record in flatten_otel_metrics_protobuf(&json) {
340+
push_logs(&stream_name, record, &log_source, &p_custom_fields).await?;
341+
}
342+
}
343+
Err(e) => {
344+
return Err(PostError::Invalid(e.into()));
345+
}
346+
}
347+
}
348+
}
349+
None => {
350+
return Err(PostError::Header(ParseHeaderError::InvalidValue));
351+
}
352+
}
318353

319354
Ok(HttpResponse::Ok().finish())
320355
}
@@ -324,7 +359,7 @@ pub async fn handle_otel_metrics_ingestion(
324359
// creates if stream does not exist
325360
pub async fn handle_otel_traces_ingestion(
326361
req: HttpRequest,
327-
Json(json): Json<StrictValue>,
362+
body: web::Bytes,
328363
) -> Result<HttpResponse, PostError> {
329364
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
330365
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
@@ -375,13 +410,35 @@ pub async fn handle_otel_traces_ingestion(
375410

376411
let p_custom_fields = get_custom_fields_from_header(&req);
377412

378-
flatten_and_push_logs(
379-
json.into_inner(),
380-
&stream_name,
381-
&log_source,
382-
&p_custom_fields,
383-
)
384-
.await?;
413+
match req.headers().get("Content-Type") {
414+
Some(content_type) => {
415+
if content_type == "application/json" {
416+
flatten_and_push_logs(
417+
serde_json::from_slice(&body)?,
418+
&stream_name,
419+
&log_source,
420+
&p_custom_fields,
421+
)
422+
.await?;
423+
}
424+
425+
if content_type == "application/x-protobuf" {
426+
match ExportTraceServiceRequest::decode(body) {
427+
Ok(json) => {
428+
for record in flatten_otel_traces_protobuf(&json) {
429+
push_logs(&stream_name, record, &log_source, &p_custom_fields).await?;
430+
}
431+
}
432+
Err(e) => {
433+
return Err(PostError::Invalid(e.into()));
434+
}
435+
}
436+
}
437+
}
438+
None => {
439+
return Err(PostError::Header(ParseHeaderError::InvalidValue));
440+
}
441+
}
385442

386443
Ok(HttpResponse::Ok().finish())
387444
}

src/otel/metrics.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
*
1717
*/
18+
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
1819
use opentelemetry_proto::tonic::metrics::v1::number_data_point::Value as NumberDataPointValue;
1920
use opentelemetry_proto::tonic::metrics::v1::{
2021
Exemplar, ExponentialHistogram, Gauge, Histogram, Metric, MetricsData, NumberDataPoint, Sum,
@@ -601,3 +602,60 @@ fn flatten_data_point_flags(flags: u32) -> Map<String, Value> {
601602
);
602603
data_point_flags_json
603604
}
605+
606+
/// Flattens OpenTelemetry metrics from protobuf format
607+
pub fn flatten_otel_metrics_protobuf(message: &ExportMetricsServiceRequest) -> Vec<Value> {
608+
let mut vec_otel_json = Vec::new();
609+
for resource_metrics in &message.resource_metrics {
610+
let mut resource_metrics_json = Map::new();
611+
if let Some(resource) = &resource_metrics.resource {
612+
insert_attributes(&mut resource_metrics_json, &resource.attributes);
613+
resource_metrics_json.insert(
614+
"resource_dropped_attributes_count".to_string(),
615+
Value::Number(resource.dropped_attributes_count.into()),
616+
);
617+
}
618+
619+
let mut vec_resource_metrics_json = Vec::new();
620+
for scope_metrics in &resource_metrics.scope_metrics {
621+
// scope-level metadata
622+
let mut scope_metrics_json = Map::new();
623+
if let Some(scope) = &scope_metrics.scope {
624+
scope_metrics_json
625+
.insert("scope_name".to_string(), Value::String(scope.name.clone()));
626+
scope_metrics_json.insert(
627+
"scope_version".to_string(),
628+
Value::String(scope.version.clone()),
629+
);
630+
insert_attributes(&mut scope_metrics_json, &scope.attributes);
631+
scope_metrics_json.insert(
632+
"scope_dropped_attributes_count".to_string(),
633+
Value::Number(scope.dropped_attributes_count.into()),
634+
);
635+
}
636+
scope_metrics_json.insert(
637+
"scope_schema_url".to_string(),
638+
Value::String(scope_metrics.schema_url.clone()),
639+
);
640+
641+
for metric in &scope_metrics.metrics {
642+
vec_resource_metrics_json.extend(flatten_metrics_record(metric));
643+
}
644+
645+
for resource_metrics_json_item in &mut vec_resource_metrics_json {
646+
resource_metrics_json_item.extend(scope_metrics_json.clone());
647+
}
648+
}
649+
650+
resource_metrics_json.insert(
651+
"resource_schema_url".to_string(),
652+
Value::String(resource_metrics.schema_url.clone()),
653+
);
654+
655+
for resource_metrics_json_item in &mut vec_resource_metrics_json {
656+
resource_metrics_json_item.extend(resource_metrics_json.clone());
657+
vec_otel_json.push(Value::Object(resource_metrics_json_item.clone()));
658+
}
659+
}
660+
vec_otel_json
661+
}

src/otel/traces.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* along with this program. If not, see <http://www.gnu.org/licenses/>.
1616
*
1717
*/
18+
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
1819
use opentelemetry_proto::tonic::trace::v1::ScopeSpans;
1920
use opentelemetry_proto::tonic::trace::v1::Span;
2021
use opentelemetry_proto::tonic::trace::v1::Status;
@@ -932,3 +933,65 @@ mod tests {
932933
}
933934
}
934935
}
936+
937+
/// Flattens OpenTelemetry traces from protobuf format
938+
pub fn flatten_otel_traces_protobuf(message: &ExportTraceServiceRequest) -> Vec<Value> {
939+
let mut vec_otel_json = Vec::new();
940+
for resource_spans in &message.resource_spans {
941+
let mut resource_spans_json = Map::new();
942+
if let Some(resource) = &resource_spans.resource {
943+
insert_attributes(&mut resource_spans_json, &resource.attributes);
944+
resource_spans_json.insert(
945+
"resource_dropped_attributes_count".to_string(),
946+
Value::Number(resource.dropped_attributes_count.into()),
947+
);
948+
}
949+
950+
let mut vec_resource_spans_json: Vec<Map<String, Value>> = Vec::new();
951+
for scope_spans in &resource_spans.scope_spans {
952+
// 1) collect all span JSON under this scope
953+
let mut scope_spans_json = Vec::new();
954+
for span in &scope_spans.spans {
955+
scope_spans_json.extend(flatten_span_record(span));
956+
}
957+
958+
// 2) build a Map of scope‐level fields
959+
let mut scope_span_json = Map::new();
960+
if let Some(scope) = &scope_spans.scope {
961+
scope_span_json.insert("scope_name".to_string(), Value::String(scope.name.clone()));
962+
scope_span_json.insert(
963+
"scope_version".to_string(),
964+
Value::String(scope.version.clone()),
965+
);
966+
insert_attributes(&mut scope_span_json, &scope.attributes);
967+
scope_span_json.insert(
968+
"scope_dropped_attributes_count".to_string(),
969+
Value::Number(scope.dropped_attributes_count.into()),
970+
);
971+
}
972+
scope_span_json.insert(
973+
"scope_schema_url".to_string(),
974+
Value::String(scope_spans.schema_url.clone()),
975+
);
976+
977+
// 3) merge scope fields into each span record
978+
for span_json in &mut scope_spans_json {
979+
span_json.extend(scope_span_json.clone());
980+
}
981+
982+
// 4) append to the resource‐level accumulator
983+
vec_resource_spans_json.extend(scope_spans_json);
984+
}
985+
986+
resource_spans_json.insert(
987+
"resource_schema_url".to_string(),
988+
Value::String(resource_spans.schema_url.clone()),
989+
);
990+
991+
for resource_spans_json_item in &mut vec_resource_spans_json {
992+
resource_spans_json_item.extend(resource_spans_json.clone());
993+
vec_otel_json.push(Value::Object(resource_spans_json_item.clone()));
994+
}
995+
}
996+
vec_otel_json
997+
}

0 commit comments

Comments
 (0)