Skip to content

remove protobuf for oTel ingestion #1406

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 12 additions & 66 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,25 +29,19 @@ use crate::event::error::EventError;
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
use crate::handlers::http::MAX_EVENT_PAYLOAD_SIZE;
use crate::handlers::http::modal::utils::ingest_utils::push_logs;
use crate::handlers::{
CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType,
};
use crate::metadata::SchemaVersion;
use crate::option::Mode;
use crate::otel::logs::{OTEL_LOG_KNOWN_FIELD_LIST, flatten_otel_protobuf};
use crate::otel::metrics::{OTEL_METRICS_KNOWN_FIELD_LIST, flatten_otel_metrics_protobuf};
use crate::otel::traces::{OTEL_TRACES_KNOWN_FIELD_LIST, flatten_otel_traces_protobuf};
use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST;
use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST;
use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
use crate::parseable::{PARSEABLE, StreamNotFound};
use crate::storage::{ObjectStorageError, StreamType};
use crate::utils::header_parsing::ParseHeaderError;
use crate::utils::json::{flatten::JsonFlattenError, strict::StrictValue};
use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest;
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use prost::Message;

use super::logstream::error::{CreateStreamError, StreamError};
use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header};
Expand Down Expand Up @@ -169,7 +163,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
}

// Common validation and setup for OTEL ingestion
async fn setup_otel_stream(
pub async fn setup_otel_stream(
req: &HttpRequest,
expected_log_source: LogSource,
known_fields: &[&str],
Expand Down Expand Up @@ -241,18 +235,12 @@ async fn setup_otel_stream(
}

// Common content processing for OTEL ingestion
async fn process_otel_content<T, F>(
async fn process_otel_content(
req: &HttpRequest,
body: web::Bytes,
stream_name: &str,
log_source: &LogSource,
decode_protobuf: F,
flatten_protobuf: fn(&T) -> Vec<serde_json::Value>,
) -> Result<(), PostError>
where
T: prost::Message + Default,
F: FnOnce(web::Bytes) -> Result<T, prost::DecodeError>,
{
) -> Result<(), PostError> {
let p_custom_fields = get_custom_fields_from_header(req);

match req
Expand All @@ -270,27 +258,9 @@ where
)
.await?;
} else if content_type == CONTENT_TYPE_PROTOBUF {
// 10MB limit
if body.len() > MAX_EVENT_PAYLOAD_SIZE {
return Err(PostError::Invalid(anyhow::anyhow!(
"Protobuf message size {} exceeds maximum allowed size of {} bytes",
body.len(),
MAX_EVENT_PAYLOAD_SIZE
)));
}
match decode_protobuf(body) {
Ok(decoded) => {
for record in flatten_protobuf(&decoded) {
push_logs(stream_name, record, log_source, &p_custom_fields).await?;
}
}
Err(e) => {
return Err(PostError::Invalid(anyhow::anyhow!(
"Failed to decode protobuf message: {}",
e
)));
}
}
return Err(PostError::Invalid(anyhow::anyhow!(
"Protobuf ingestion is not supported in Parseable OSS"
)));
} else {
return Err(PostError::Invalid(anyhow::anyhow!(
"Unsupported Content-Type: {}. Expected application/json or application/x-protobuf",
Expand Down Expand Up @@ -323,15 +293,7 @@ pub async fn handle_otel_logs_ingestion(
)
.await?;

process_otel_content(
&req,
body,
&stream_name,
&log_source,
ExportLogsServiceRequest::decode,
flatten_otel_protobuf,
)
.await?;
process_otel_content(&req, body, &stream_name, &log_source).await?;

Ok(HttpResponse::Ok().finish())
}
Expand All @@ -351,15 +313,7 @@ pub async fn handle_otel_metrics_ingestion(
)
.await?;

process_otel_content(
&req,
body,
&stream_name,
&log_source,
ExportMetricsServiceRequest::decode,
flatten_otel_metrics_protobuf,
)
.await?;
process_otel_content(&req, body, &stream_name, &log_source).await?;

Ok(HttpResponse::Ok().finish())
}
Expand All @@ -379,15 +333,7 @@ pub async fn handle_otel_traces_ingestion(
)
.await?;

process_otel_content(
&req,
body,
&stream_name,
&log_source,
ExportTraceServiceRequest::decode,
flatten_otel_traces_protobuf,
)
.await?;
process_otel_content(&req, body, &stream_name, &log_source).await?;

Ok(HttpResponse::Ok().finish())
}
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod http;
pub mod livetail;

pub const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const LOG_SOURCE_KEY: &str = "x-p-log-source";
pub const LOG_SOURCE_KEY: &str = "x-p-log-source";
const EXTRACT_LOG_KEY: &str = "x-p-extract-log";
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit";
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub mod storage;
pub mod sync;
pub mod users;
pub mod utils;
mod validator;
pub mod validator;

use std::time::Duration;

Expand Down
Loading