diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 89ba947b9..7ba35b4da 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -29,25 +29,19 @@ use crate::event::error::EventError; use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry}; use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY}; -use crate::handlers::http::MAX_EVENT_PAYLOAD_SIZE; -use crate::handlers::http::modal::utils::ingest_utils::push_logs; use crate::handlers::{ CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType, }; use crate::metadata::SchemaVersion; use crate::option::Mode; -use crate::otel::logs::{OTEL_LOG_KNOWN_FIELD_LIST, flatten_otel_protobuf}; -use crate::otel::metrics::{OTEL_METRICS_KNOWN_FIELD_LIST, flatten_otel_metrics_protobuf}; -use crate::otel::traces::{OTEL_TRACES_KNOWN_FIELD_LIST, flatten_otel_traces_protobuf}; +use crate::otel::logs::OTEL_LOG_KNOWN_FIELD_LIST; +use crate::otel::metrics::OTEL_METRICS_KNOWN_FIELD_LIST; +use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST; use crate::parseable::{PARSEABLE, StreamNotFound}; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; use crate::utils::json::{flatten::JsonFlattenError, strict::StrictValue}; -use opentelemetry_proto::tonic::collector::logs::v1::ExportLogsServiceRequest; -use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; -use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest; -use prost::Message; use super::logstream::error::{CreateStreamError, StreamError}; use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header}; @@ -169,7 +163,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< } // Common validation and setup for OTEL ingestion -async fn setup_otel_stream( +pub async fn setup_otel_stream( req: &HttpRequest, expected_log_source: LogSource, known_fields: &[&str], @@ -241,18 +235,12 @@ async fn setup_otel_stream( } // Common content processing for OTEL ingestion -async fn process_otel_content( +async fn process_otel_content( req: &HttpRequest, body: web::Bytes, stream_name: &str, log_source: &LogSource, - decode_protobuf: F, - flatten_protobuf: fn(&T) -> Vec, -) -> Result<(), PostError> -where - T: prost::Message + Default, - F: FnOnce(web::Bytes) -> Result, -{ +) -> Result<(), PostError> { let p_custom_fields = get_custom_fields_from_header(req); match req @@ -270,27 +258,9 @@ where ) .await?; } else if content_type == CONTENT_TYPE_PROTOBUF { - // 10MB limit - if body.len() > MAX_EVENT_PAYLOAD_SIZE { - return Err(PostError::Invalid(anyhow::anyhow!( - "Protobuf message size {} exceeds maximum allowed size of {} bytes", - body.len(), - MAX_EVENT_PAYLOAD_SIZE - ))); - } - match decode_protobuf(body) { - Ok(decoded) => { - for record in flatten_protobuf(&decoded) { - push_logs(stream_name, record, log_source, &p_custom_fields).await?; - } - } - Err(e) => { - return Err(PostError::Invalid(anyhow::anyhow!( - "Failed to decode protobuf message: {}", - e - ))); - } - } + return Err(PostError::Invalid(anyhow::anyhow!( + "Protobuf ingestion is not supported in Parseable OSS" + ))); } else { return Err(PostError::Invalid(anyhow::anyhow!( "Unsupported Content-Type: {}. Expected application/json or application/x-protobuf", @@ -323,15 +293,7 @@ pub async fn handle_otel_logs_ingestion( ) .await?; - process_otel_content( - &req, - body, - &stream_name, - &log_source, - ExportLogsServiceRequest::decode, - flatten_otel_protobuf, - ) - .await?; + process_otel_content(&req, body, &stream_name, &log_source).await?; Ok(HttpResponse::Ok().finish()) } @@ -351,15 +313,7 @@ pub async fn handle_otel_metrics_ingestion( ) .await?; - process_otel_content( - &req, - body, - &stream_name, - &log_source, - ExportMetricsServiceRequest::decode, - flatten_otel_metrics_protobuf, - ) - .await?; + process_otel_content(&req, body, &stream_name, &log_source).await?; Ok(HttpResponse::Ok().finish()) } @@ -379,15 +333,7 @@ pub async fn handle_otel_traces_ingestion( ) .await?; - process_otel_content( - &req, - body, - &stream_name, - &log_source, - ExportTraceServiceRequest::decode, - flatten_otel_traces_protobuf, - ) - .await?; + process_otel_content(&req, body, &stream_name, &log_source).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index b1263c775..e3dd453f0 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -25,7 +25,7 @@ pub mod http; pub mod livetail; pub const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; -const LOG_SOURCE_KEY: &str = "x-p-log-source"; +pub const LOG_SOURCE_KEY: &str = "x-p-log-source"; const EXTRACT_LOG_KEY: &str = "x-p-extract-log"; const TIME_PARTITION_KEY: &str = "x-p-time-partition"; const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit"; diff --git a/src/lib.rs b/src/lib.rs index f34a62448..d38b134d5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,7 +48,7 @@ pub mod storage; pub mod sync; pub mod users; pub mod utils; -mod validator; +pub mod validator; use std::time::Duration;