Skip to content

Commit bf2b2cf

Browse files
refactor protobuf ingestion code
1 parent a22a12c commit bf2b2cf

File tree

5 files changed

+258
-369
lines changed

5 files changed

+258
-369
lines changed

src/handlers/http/ingest.rs

Lines changed: 123 additions & 193 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,12 @@ use crate::event::error::EventError;
2929
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
3030
use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry};
3131
use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY};
32+
use crate::handlers::http::MAX_EVENT_PAYLOAD_SIZE;
3233
use crate::handlers::http::modal::utils::ingest_utils::push_logs;
33-
use crate::handlers::{EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
34+
use crate::handlers::{
35+
CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY,
36+
STREAM_NAME_HEADER_KEY,
37+
};
3438
use crate::metadata::SchemaVersion;
3539
use crate::option::Mode;
3640
use crate::otel::logs::{OTEL_LOG_KNOWN_FIELD_LIST, flatten_otel_protobuf};
@@ -157,34 +161,32 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
157161
Ok(())
158162
}
159163

160-
// Handler for POST /v1/logs to ingest OTEL logs
161-
// ingests events by extracting stream name from header
162-
// creates if stream does not exist
163-
pub async fn handle_otel_logs_ingestion(
164-
req: HttpRequest,
165-
body: web::Bytes,
166-
) -> Result<HttpResponse, PostError> {
164+
// Common validation and setup for OTEL ingestion
165+
async fn setup_otel_stream(
166+
req: &HttpRequest,
167+
expected_log_source: LogSource,
168+
known_fields: &[&str],
169+
) -> Result<(String, LogSource, LogSourceEntry), PostError> {
167170
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
168171
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
169172
};
170173

171174
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
172175
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
173176
};
177+
174178
let log_source = LogSource::from(log_source.to_str().unwrap());
175-
if log_source != LogSource::OtelLogs {
176-
return Err(PostError::IncorrectLogSource(LogSource::OtelLogs));
179+
if log_source != expected_log_source {
180+
return Err(PostError::IncorrectLogSource(expected_log_source));
177181
}
178182

179183
let stream_name = stream_name.to_str().unwrap().to_owned();
180184

181185
let log_source_entry = LogSourceEntry::new(
182186
log_source.clone(),
183-
OTEL_LOG_KNOWN_FIELD_LIST
184-
.iter()
185-
.map(|&s| s.to_string())
186-
.collect(),
187+
known_fields.iter().map(|&s| s.to_string()).collect(),
187188
);
189+
188190
PARSEABLE
189191
.create_stream_if_not_exists(
190192
&stream_name,
@@ -194,49 +196,83 @@ pub async fn handle_otel_logs_ingestion(
194196
)
195197
.await?;
196198

197-
//if stream exists, fetch the stream log source
198-
//return error if the stream log source is otel traces or otel metrics
199+
// Validate stream compatibility
199200
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
200-
stream
201-
.get_log_source()
202-
.iter()
203-
.find(|&stream_log_source_entry| {
204-
stream_log_source_entry.log_source_format != LogSource::OtelTraces
205-
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
206-
})
207-
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
201+
match log_source {
202+
LogSource::OtelLogs => {
203+
// For logs, reject if stream is metrics or traces
204+
stream
205+
.get_log_source()
206+
.iter()
207+
.find(|&stream_log_source_entry| {
208+
stream_log_source_entry.log_source_format != LogSource::OtelTraces
209+
&& stream_log_source_entry.log_source_format != LogSource::OtelMetrics
210+
})
211+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
212+
}
213+
LogSource::OtelMetrics | LogSource::OtelTraces => {
214+
// For metrics/traces, only allow same type
215+
stream
216+
.get_log_source()
217+
.iter()
218+
.find(|&stream_log_source_entry| {
219+
stream_log_source_entry.log_source_format == log_source
220+
})
221+
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
222+
}
223+
_ => {}
224+
}
208225
}
209226

210227
PARSEABLE
211-
.add_update_log_source(&stream_name, log_source_entry)
228+
.add_update_log_source(&stream_name, log_source_entry.clone())
212229
.await?;
213230

214-
let p_custom_fields = get_custom_fields_from_header(&req);
215-
match req.headers().get("Content-Type") {
231+
Ok((stream_name, log_source, log_source_entry))
232+
}
233+
234+
// Common content processing for OTEL ingestion
235+
async fn process_otel_content<T, F>(
236+
req: &HttpRequest,
237+
body: web::Bytes,
238+
stream_name: &str,
239+
log_source: &LogSource,
240+
decode_protobuf: F,
241+
flatten_protobuf: fn(&T) -> Vec<serde_json::Value>,
242+
) -> Result<(), PostError>
243+
where
244+
T: prost::Message + Default,
245+
F: FnOnce(web::Bytes) -> Result<T, prost::DecodeError>,
246+
{
247+
let p_custom_fields = get_custom_fields_from_header(req);
248+
249+
match req
250+
.headers()
251+
.get("Content-Type")
252+
.and_then(|h| h.to_str().ok())
253+
{
216254
Some(content_type) => {
217-
if content_type == "application/json" {
255+
if content_type == CONTENT_TYPE_JSON {
218256
flatten_and_push_logs(
219257
serde_json::from_slice(&body)?,
220-
&stream_name,
221-
&log_source,
258+
stream_name,
259+
log_source,
222260
&p_custom_fields,
223261
)
224262
.await?;
225-
}
226-
227-
if content_type == "application/x-protobuf" {
228-
const MAX_PROTOBUF_SIZE: usize = 10 * 1024 * 1024; // 10MB limit
229-
if body.len() > MAX_PROTOBUF_SIZE {
263+
} else if content_type == CONTENT_TYPE_PROTOBUF {
264+
// 10MB limit
265+
if body.len() > MAX_EVENT_PAYLOAD_SIZE {
230266
return Err(PostError::Invalid(anyhow::anyhow!(
231267
"Protobuf message size {} exceeds maximum allowed size of {} bytes",
232268
body.len(),
233-
MAX_PROTOBUF_SIZE
269+
MAX_EVENT_PAYLOAD_SIZE
234270
)));
235271
}
236-
match ExportLogsServiceRequest::decode(body) {
237-
Ok(json) => {
238-
for record in flatten_otel_protobuf(&json) {
239-
push_logs(&stream_name, record, &log_source, &p_custom_fields).await?;
272+
match decode_protobuf(body) {
273+
Ok(decoded) => {
274+
for record in flatten_protobuf(&decoded) {
275+
push_logs(stream_name, record, log_source, &p_custom_fields).await?;
240276
}
241277
}
242278
Err(e) => {
@@ -253,6 +289,29 @@ pub async fn handle_otel_logs_ingestion(
253289
}
254290
}
255291

292+
Ok(())
293+
}
294+
295+
// Handler for POST /v1/logs to ingest OTEL logs
296+
// ingests events by extracting stream name from header
297+
// creates if stream does not exist
298+
pub async fn handle_otel_logs_ingestion(
299+
req: HttpRequest,
300+
body: web::Bytes,
301+
) -> Result<HttpResponse, PostError> {
302+
let (stream_name, log_source, _) =
303+
setup_otel_stream(&req, LogSource::OtelLogs, &OTEL_LOG_KNOWN_FIELD_LIST).await?;
304+
305+
process_otel_content(
306+
&req,
307+
body,
308+
&stream_name,
309+
&log_source,
310+
ExportLogsServiceRequest::decode,
311+
flatten_otel_protobuf,
312+
)
313+
.await?;
314+
256315
Ok(HttpResponse::Ok().finish())
257316
}
258317

@@ -263,82 +322,18 @@ pub async fn handle_otel_metrics_ingestion(
263322
req: HttpRequest,
264323
body: web::Bytes,
265324
) -> Result<HttpResponse, PostError> {
266-
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
267-
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
268-
};
269-
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
270-
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
271-
};
272-
let log_source = LogSource::from(log_source.to_str().unwrap());
273-
if log_source != LogSource::OtelMetrics {
274-
return Err(PostError::IncorrectLogSource(LogSource::OtelMetrics));
275-
}
276-
277-
let stream_name = stream_name.to_str().unwrap().to_owned();
278-
279-
let log_source_entry = LogSourceEntry::new(
280-
log_source.clone(),
281-
OTEL_METRICS_KNOWN_FIELD_LIST
282-
.iter()
283-
.map(|&s| s.to_string())
284-
.collect(),
285-
);
286-
PARSEABLE
287-
.create_stream_if_not_exists(
288-
&stream_name,
289-
StreamType::UserDefined,
290-
None,
291-
vec![log_source_entry.clone()],
292-
)
293-
.await?;
294-
295-
//if stream exists, fetch the stream log source
296-
//return error if the stream log source is not otel metrics
297-
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
298-
stream
299-
.get_log_source()
300-
.iter()
301-
.find(|&stream_log_source_entry| {
302-
stream_log_source_entry.log_source_format == log_source.clone()
303-
})
304-
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
305-
}
306-
307-
PARSEABLE
308-
.add_update_log_source(&stream_name, log_source_entry)
309-
.await?;
310-
311-
let p_custom_fields = get_custom_fields_from_header(&req);
312-
313-
match req.headers().get("Content-Type") {
314-
Some(content_type) => {
315-
if content_type == "application/json" {
316-
flatten_and_push_logs(
317-
serde_json::from_slice(&body)?,
318-
&stream_name,
319-
&log_source,
320-
&p_custom_fields,
321-
)
322-
.await?;
323-
}
324-
325-
if content_type == "application/x-protobuf" {
326-
match ExportMetricsServiceRequest::decode(body) {
327-
Ok(json) => {
328-
for record in flatten_otel_metrics_protobuf(&json) {
329-
push_logs(&stream_name, record, &log_source, &p_custom_fields).await?;
330-
}
331-
}
332-
Err(e) => {
333-
return Err(PostError::Invalid(e.into()));
334-
}
335-
}
336-
}
337-
}
338-
None => {
339-
return Err(PostError::Header(ParseHeaderError::InvalidValue));
340-
}
341-
}
325+
let (stream_name, log_source, _) =
326+
setup_otel_stream(&req, LogSource::OtelMetrics, &OTEL_METRICS_KNOWN_FIELD_LIST).await?;
327+
328+
process_otel_content(
329+
&req,
330+
body,
331+
&stream_name,
332+
&log_source,
333+
ExportMetricsServiceRequest::decode,
334+
flatten_otel_metrics_protobuf,
335+
)
336+
.await?;
342337

343338
Ok(HttpResponse::Ok().finish())
344339
}
@@ -350,83 +345,18 @@ pub async fn handle_otel_traces_ingestion(
350345
req: HttpRequest,
351346
body: web::Bytes,
352347
) -> Result<HttpResponse, PostError> {
353-
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
354-
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
355-
};
356-
357-
let Some(log_source) = req.headers().get(LOG_SOURCE_KEY) else {
358-
return Err(PostError::Header(ParseHeaderError::MissingLogSource));
359-
};
360-
let log_source = LogSource::from(log_source.to_str().unwrap());
361-
if log_source != LogSource::OtelTraces {
362-
return Err(PostError::IncorrectLogSource(LogSource::OtelTraces));
363-
}
364-
let stream_name = stream_name.to_str().unwrap().to_owned();
365-
366-
let log_source_entry = LogSourceEntry::new(
367-
log_source.clone(),
368-
OTEL_TRACES_KNOWN_FIELD_LIST
369-
.iter()
370-
.map(|&s| s.to_string())
371-
.collect(),
372-
);
373-
374-
PARSEABLE
375-
.create_stream_if_not_exists(
376-
&stream_name,
377-
StreamType::UserDefined,
378-
None,
379-
vec![log_source_entry.clone()],
380-
)
381-
.await?;
382-
383-
//if stream exists, fetch the stream log source
384-
//return error if the stream log source is not otel traces
385-
if let Ok(stream) = PARSEABLE.get_stream(&stream_name) {
386-
stream
387-
.get_log_source()
388-
.iter()
389-
.find(|&stream_log_source_entry| {
390-
stream_log_source_entry.log_source_format == log_source.clone()
391-
})
392-
.ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?;
393-
}
394-
395-
PARSEABLE
396-
.add_update_log_source(&stream_name, log_source_entry)
397-
.await?;
398-
399-
let p_custom_fields = get_custom_fields_from_header(&req);
400-
401-
match req.headers().get("Content-Type") {
402-
Some(content_type) => {
403-
if content_type == "application/json" {
404-
flatten_and_push_logs(
405-
serde_json::from_slice(&body)?,
406-
&stream_name,
407-
&log_source,
408-
&p_custom_fields,
409-
)
410-
.await?;
411-
}
412-
413-
if content_type == "application/x-protobuf" {
414-
match ExportTraceServiceRequest::decode(body) {
415-
Ok(json) => {
416-
for record in flatten_otel_traces_protobuf(&json) {
417-
push_logs(&stream_name, record, &log_source, &p_custom_fields).await?;
418-
}
419-
}
420-
Err(e) => {
421-
return Err(PostError::Invalid(e.into()));
422-
}
423-
}
424-
}
425-
}
426-
None => {
427-
return Err(PostError::Header(ParseHeaderError::InvalidValue));
428-
}
429-
}
348+
let (stream_name, log_source, _) =
349+
setup_otel_stream(&req, LogSource::OtelTraces, &OTEL_TRACES_KNOWN_FIELD_LIST).await?;
350+
351+
process_otel_content(
352+
&req,
353+
body,
354+
&stream_name,
355+
&log_source,
356+
ExportTraceServiceRequest::decode,
357+
flatten_otel_traces_protobuf,
358+
)
359+
.await?;
430360

431361
Ok(HttpResponse::Ok().finish())
432362
}

src/handlers/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,3 +39,7 @@ const LOG_SOURCE_KINESIS: &str = "kinesis";
3939

4040
// AWS Kinesis constants
4141
const KINESIS_COMMON_ATTRIBUTES_KEY: &str = "x-amz-firehose-common-attributes";
42+
43+
// constants for content type values
44+
pub const CONTENT_TYPE_JSON: &str = "application/json";
45+
pub const CONTENT_TYPE_PROTOBUF: &str = "application/x-protobuf";

0 commit comments

Comments
 (0)