Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 32 additions & 12 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use arrow_array::RecordBatch;
use bytes::Bytes;
use chrono::Utc;
use http::StatusCode;
use serde_json::Value;

use crate::event::error::EventError;
use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST};
Expand All @@ -39,7 +38,7 @@ use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::storage::{ObjectStorageError, StreamType};
use crate::utils::header_parsing::ParseHeaderError;
use crate::utils::json::flatten::JsonFlattenError;
use crate::utils::json::{flatten::JsonFlattenError, strict::StrictValue};

use super::logstream::error::{CreateStreamError, StreamError};
use super::modal::utils::ingest_utils::{flatten_and_push_logs, get_custom_fields_from_header};
Expand All @@ -51,7 +50,7 @@ use super::users::filters::FiltersError;
// creates if stream does not exist
pub async fn ingest(
req: HttpRequest,
Json(mut json): Json<Value>,
Json(json): Json<StrictValue>,
) -> Result<HttpResponse, PostError> {
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
Expand Down Expand Up @@ -83,6 +82,8 @@ pub async fn ingest(

let mut p_custom_fields = get_custom_fields_from_header(&req);

let mut json = json.into_inner();

let fields = match &log_source {
LogSource::Custom(src) => KNOWN_SCHEMA_LIST.extract_from_inline_log(
&mut json,
Expand Down Expand Up @@ -127,13 +128,13 @@ pub async fn ingest(

pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
let size: usize = body.len();
let json: Value = serde_json::from_slice(&body)?;
let json: StrictValue = serde_json::from_slice(&body)?;
let schema = PARSEABLE.get_stream(&stream_name)?.get_schema_raw();
let mut p_custom_fields = HashMap::new();
p_custom_fields.insert(USER_AGENT_KEY.to_string(), "parseable".to_string());
p_custom_fields.insert(FORMAT_KEY.to_string(), LogSource::Pmeta.to_string());
// For internal streams, use old schema
format::json::Event::new(json)
format::json::Event::new(json.into_inner())
.into_event(
stream_name,
size as u64,
Expand All @@ -155,7 +156,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
// creates if stream does not exist
pub async fn handle_otel_logs_ingestion(
req: HttpRequest,
Json(json): Json<Value>,
Json(json): Json<StrictValue>,
) -> Result<HttpResponse, PostError> {
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
Expand Down Expand Up @@ -205,7 +206,13 @@ pub async fn handle_otel_logs_ingestion(

let p_custom_fields = get_custom_fields_from_header(&req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
flatten_and_push_logs(
json.into_inner(),
&stream_name,
&log_source,
&p_custom_fields,
)
.await?;

Ok(HttpResponse::Ok().finish())
}
Expand All @@ -215,7 +222,7 @@ pub async fn handle_otel_logs_ingestion(
// creates if stream does not exist
pub async fn handle_otel_metrics_ingestion(
req: HttpRequest,
Json(json): Json<Value>,
Json(json): Json<StrictValue>,
) -> Result<HttpResponse, PostError> {
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
Expand Down Expand Up @@ -263,7 +270,13 @@ pub async fn handle_otel_metrics_ingestion(

let p_custom_fields = get_custom_fields_from_header(&req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
flatten_and_push_logs(
json.into_inner(),
&stream_name,
&log_source,
&p_custom_fields,
)
.await?;

Ok(HttpResponse::Ok().finish())
}
Expand All @@ -273,7 +286,7 @@ pub async fn handle_otel_metrics_ingestion(
// creates if stream does not exist
pub async fn handle_otel_traces_ingestion(
req: HttpRequest,
Json(json): Json<Value>,
Json(json): Json<StrictValue>,
) -> Result<HttpResponse, PostError> {
let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else {
return Err(PostError::Header(ParseHeaderError::MissingStreamName));
Expand Down Expand Up @@ -322,7 +335,13 @@ pub async fn handle_otel_traces_ingestion(

let p_custom_fields = get_custom_fields_from_header(&req);

flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?;
flatten_and_push_logs(
json.into_inner(),
&stream_name,
&log_source,
&p_custom_fields,
)
.await?;

Ok(HttpResponse::Ok().finish())
}
Expand All @@ -333,7 +352,7 @@ pub async fn handle_otel_traces_ingestion(
pub async fn post_event(
req: HttpRequest,
stream_name: Path<String>,
Json(mut json): Json<Value>,
Json(json): Json<StrictValue>,
) -> Result<HttpResponse, PostError> {
let stream_name = stream_name.into_inner();

Expand Down Expand Up @@ -369,6 +388,7 @@ pub async fn post_event(
.get(EXTRACT_LOG_KEY)
.and_then(|h| h.to_str().ok());
let mut p_custom_fields = get_custom_fields_from_header(&req);
let mut json = json.into_inner();
match &log_source {
LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => {
return Err(PostError::OtelNotSupported)
Expand Down
1 change: 1 addition & 0 deletions src/utils/json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::event::format::LogSource;
use crate::metadata::SchemaVersion;

pub mod flatten;
pub mod strict;

/// calls the function `flatten_json` which results Vec<Value> or Error
/// in case when Vec<Value> is returned, converts the Vec<Value> to Value of Array
Expand Down
Loading
Loading