@@ -29,25 +29,19 @@ use crate::event::error::EventError;
29
29
use crate :: event:: format:: known_schema:: { self , KNOWN_SCHEMA_LIST } ;
30
30
use crate :: event:: format:: { self , EventFormat , LogSource , LogSourceEntry } ;
31
31
use crate :: event:: { self , FORMAT_KEY , USER_AGENT_KEY } ;
32
- use crate :: handlers:: http:: MAX_EVENT_PAYLOAD_SIZE ;
33
- use crate :: handlers:: http:: modal:: utils:: ingest_utils:: push_logs;
34
32
use crate :: handlers:: {
35
33
CONTENT_TYPE_JSON , CONTENT_TYPE_PROTOBUF , EXTRACT_LOG_KEY , LOG_SOURCE_KEY ,
36
34
STREAM_NAME_HEADER_KEY , TELEMETRY_TYPE_KEY , TelemetryType ,
37
35
} ;
38
36
use crate :: metadata:: SchemaVersion ;
39
37
use crate :: option:: Mode ;
40
- use crate :: otel:: logs:: { OTEL_LOG_KNOWN_FIELD_LIST , flatten_otel_protobuf } ;
41
- use crate :: otel:: metrics:: { OTEL_METRICS_KNOWN_FIELD_LIST , flatten_otel_metrics_protobuf } ;
42
- use crate :: otel:: traces:: { OTEL_TRACES_KNOWN_FIELD_LIST , flatten_otel_traces_protobuf } ;
38
+ use crate :: otel:: logs:: OTEL_LOG_KNOWN_FIELD_LIST ;
39
+ use crate :: otel:: metrics:: OTEL_METRICS_KNOWN_FIELD_LIST ;
40
+ use crate :: otel:: traces:: OTEL_TRACES_KNOWN_FIELD_LIST ;
43
41
use crate :: parseable:: { PARSEABLE , StreamNotFound } ;
44
42
use crate :: storage:: { ObjectStorageError , StreamType } ;
45
43
use crate :: utils:: header_parsing:: ParseHeaderError ;
46
44
use crate :: utils:: json:: { flatten:: JsonFlattenError , strict:: StrictValue } ;
47
- use opentelemetry_proto:: tonic:: collector:: logs:: v1:: ExportLogsServiceRequest ;
48
- use opentelemetry_proto:: tonic:: collector:: metrics:: v1:: ExportMetricsServiceRequest ;
49
- use opentelemetry_proto:: tonic:: collector:: trace:: v1:: ExportTraceServiceRequest ;
50
- use prost:: Message ;
51
45
52
46
use super :: logstream:: error:: { CreateStreamError , StreamError } ;
53
47
use super :: modal:: utils:: ingest_utils:: { flatten_and_push_logs, get_custom_fields_from_header} ;
@@ -169,7 +163,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
169
163
}
170
164
171
165
// Common validation and setup for OTEL ingestion
172
- async fn setup_otel_stream (
166
+ pub async fn setup_otel_stream (
173
167
req : & HttpRequest ,
174
168
expected_log_source : LogSource ,
175
169
known_fields : & [ & str ] ,
@@ -241,18 +235,12 @@ async fn setup_otel_stream(
241
235
}
242
236
243
237
// Common content processing for OTEL ingestion
244
- async fn process_otel_content < T , F > (
238
+ async fn process_otel_content (
245
239
req : & HttpRequest ,
246
240
body : web:: Bytes ,
247
241
stream_name : & str ,
248
242
log_source : & LogSource ,
249
- decode_protobuf : F ,
250
- flatten_protobuf : fn ( & T ) -> Vec < serde_json:: Value > ,
251
- ) -> Result < ( ) , PostError >
252
- where
253
- T : prost:: Message + Default ,
254
- F : FnOnce ( web:: Bytes ) -> Result < T , prost:: DecodeError > ,
255
- {
243
+ ) -> Result < ( ) , PostError > {
256
244
let p_custom_fields = get_custom_fields_from_header ( req) ;
257
245
258
246
match req
@@ -270,27 +258,9 @@ where
270
258
)
271
259
. await ?;
272
260
} else if content_type == CONTENT_TYPE_PROTOBUF {
273
- // 10MB limit
274
- if body. len ( ) > MAX_EVENT_PAYLOAD_SIZE {
275
- return Err ( PostError :: Invalid ( anyhow:: anyhow!(
276
- "Protobuf message size {} exceeds maximum allowed size of {} bytes" ,
277
- body. len( ) ,
278
- MAX_EVENT_PAYLOAD_SIZE
279
- ) ) ) ;
280
- }
281
- match decode_protobuf ( body) {
282
- Ok ( decoded) => {
283
- for record in flatten_protobuf ( & decoded) {
284
- push_logs ( stream_name, record, log_source, & p_custom_fields) . await ?;
285
- }
286
- }
287
- Err ( e) => {
288
- return Err ( PostError :: Invalid ( anyhow:: anyhow!(
289
- "Failed to decode protobuf message: {}" ,
290
- e
291
- ) ) ) ;
292
- }
293
- }
261
+ return Err ( PostError :: Invalid ( anyhow:: anyhow!(
262
+ "Protobuf ingestion is not supported in Parseable OSS"
263
+ ) ) ) ;
294
264
} else {
295
265
return Err ( PostError :: Invalid ( anyhow:: anyhow!(
296
266
"Unsupported Content-Type: {}. Expected application/json or application/x-protobuf" ,
@@ -323,15 +293,7 @@ pub async fn handle_otel_logs_ingestion(
323
293
)
324
294
. await ?;
325
295
326
- process_otel_content (
327
- & req,
328
- body,
329
- & stream_name,
330
- & log_source,
331
- ExportLogsServiceRequest :: decode,
332
- flatten_otel_protobuf,
333
- )
334
- . await ?;
296
+ process_otel_content ( & req, body, & stream_name, & log_source) . await ?;
335
297
336
298
Ok ( HttpResponse :: Ok ( ) . finish ( ) )
337
299
}
@@ -351,15 +313,7 @@ pub async fn handle_otel_metrics_ingestion(
351
313
)
352
314
. await ?;
353
315
354
- process_otel_content (
355
- & req,
356
- body,
357
- & stream_name,
358
- & log_source,
359
- ExportMetricsServiceRequest :: decode,
360
- flatten_otel_metrics_protobuf,
361
- )
362
- . await ?;
316
+ process_otel_content ( & req, body, & stream_name, & log_source) . await ?;
363
317
364
318
Ok ( HttpResponse :: Ok ( ) . finish ( ) )
365
319
}
@@ -379,15 +333,7 @@ pub async fn handle_otel_traces_ingestion(
379
333
)
380
334
. await ?;
381
335
382
- process_otel_content (
383
- & req,
384
- body,
385
- & stream_name,
386
- & log_source,
387
- ExportTraceServiceRequest :: decode,
388
- flatten_otel_traces_protobuf,
389
- )
390
- . await ?;
336
+ process_otel_content ( & req, body, & stream_name, & log_source) . await ?;
391
337
392
338
Ok ( HttpResponse :: Ok ( ) . finish ( ) )
393
339
}
0 commit comments