Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 3 additions & 16 deletions src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,8 @@ use serde_json::Value;
use std::{collections::HashMap, sync::Arc};
use tracing::error;

use super::{EventFormat, LogSource};
use crate::{
metadata::SchemaVersion,
utils::{arrow::get_field, json::flatten_json_body},
};
use super::EventFormat;
use crate::{metadata::SchemaVersion, utils::arrow::get_field};

pub struct Event {
pub data: Value,
Expand All @@ -50,23 +47,13 @@ impl EventFormat for Event {
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
let data = flatten_json_body(
self.data,
None,
None,
None,
schema_version,
false,
log_source,
)?;
let stream_schema = schema;

// The incoming event may be a single JSON object or a JSON array,
// but `Data` (the type defined above) is a vector of JSON values,
// so the incoming event must be converted into a vector of JSON values.
let value_arr = match data {
let value_arr = match self.data {
Value::Array(arr) => arr,
value @ Value::Object(_) => vec![value],
_ => unreachable!("flatten would have failed beforehand"),
Expand Down
3 changes: 0 additions & 3 deletions src/event/format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ pub trait EventFormat: Sized {
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(Self::Data, EventSchema, bool), AnyError>;

fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
Expand All @@ -91,14 +90,12 @@ pub trait EventFormat: Sized {
static_schema_flag: Option<&String>,
time_partition: Option<&String>,
schema_version: SchemaVersion,
log_source: &LogSource,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first) = self.to_data(
storage_schema,
static_schema_flag,
time_partition,
schema_version,
log_source,
)?;

if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
Expand Down
149 changes: 50 additions & 99 deletions src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::otel::metrics::flatten_otel_metrics;
use crate::otel::traces::flatten_otel_traces;
use crate::storage::{ObjectStorageError, StreamType};
use crate::utils::header_parsing::ParseHeaderError;
use crate::utils::json::flatten::JsonFlattenError;
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
use arrow_array::RecordBatch;
use arrow_schema::Schema;
Expand Down Expand Up @@ -89,13 +90,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
.clone();
let event = format::json::Event { data: body_val };
// For internal streams, use the old schema version (V0)
event.into_recordbatch(
&schema,
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)?
event.into_recordbatch(&schema, None, None, SchemaVersion::V0)?
};
event::Event {
rb,
Expand Down Expand Up @@ -328,6 +323,8 @@ pub enum PostError {
DashboardError(#[from] DashboardError),
#[error("Error: {0}")]
StreamError(#[from] StreamError),
#[error("Error: {0}")]
JsonFlattenError(#[from] JsonFlattenError),
}

impl actix_web::ResponseError for PostError {
Expand All @@ -349,6 +346,7 @@ impl actix_web::ResponseError for PostError {
PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
PostError::JsonFlattenError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}

Expand All @@ -369,8 +367,9 @@ mod tests {
use std::{collections::HashMap, sync::Arc};

use crate::{
event::format::LogSource, handlers::http::modal::utils::ingest_utils::into_event_batch,
handlers::http::modal::utils::ingest_utils::into_event_batch,
metadata::SchemaVersion,
utils::json::{convert_array_to_object, flatten::convert_to_array},
};

trait TestExt {
Expand Down Expand Up @@ -405,15 +404,8 @@ mod tests {
"b": "hello",
});

let (rb, _) = into_event_batch(
&json,
HashMap::default(),
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) =
into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 4);
Expand All @@ -439,15 +431,8 @@ mod tests {
"c": null
});

let (rb, _) = into_event_batch(
&json,
HashMap::default(),
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) =
into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 3);
Expand Down Expand Up @@ -477,15 +462,7 @@ mod tests {
.into_iter(),
);

let (rb, _) = into_event_batch(
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 3);
Expand Down Expand Up @@ -515,15 +492,7 @@ mod tests {
.into_iter(),
);

assert!(into_event_batch(
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default()
)
.is_err());
assert!(into_event_batch(&json, schema, None, None, SchemaVersion::V0,).is_err());
}

#[test]
Expand All @@ -539,15 +508,7 @@ mod tests {
.into_iter(),
);

let (rb, _) = into_event_batch(
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 1);
Expand All @@ -556,14 +517,13 @@ mod tests {
#[test]
fn non_object_arr_is_err() {
let json = json!([1]);

assert!(into_event_batch(
&json,
HashMap::default(),
assert!(convert_array_to_object(
json,
None,
None,
None,
SchemaVersion::V0,
&LogSource::default()
&crate::event::format::LogSource::default()
)
.is_err())
}
Expand All @@ -586,15 +546,8 @@ mod tests {
},
]);

let (rb, _) = into_event_batch(
&json,
HashMap::default(),
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) =
into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 4);
Expand Down Expand Up @@ -640,15 +593,8 @@ mod tests {
},
]);

let (rb, _) = into_event_batch(
&json,
HashMap::default(),
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) =
into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 4);
Expand Down Expand Up @@ -695,15 +641,7 @@ mod tests {
.into_iter(),
);

let (rb, _) = into_event_batch(
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();
let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 4);
Expand Down Expand Up @@ -750,15 +688,7 @@ mod tests {
.into_iter(),
);

assert!(into_event_batch(
&json,
schema,
None,
None,
SchemaVersion::V0,
&LogSource::default()
)
.is_err());
assert!(into_event_batch(&json, schema, None, None, SchemaVersion::V0,).is_err());
}

#[test]
Expand All @@ -783,17 +713,27 @@ mod tests {
"c": [{"a": 1, "b": 2}]
},
]);
let flattened_json = convert_to_array(
convert_array_to_object(
json,
None,
None,
None,
SchemaVersion::V0,
&crate::event::format::LogSource::default(),
)
.unwrap(),
)
.unwrap();

let (rb, _) = into_event_batch(
&json,
&flattened_json,
HashMap::default(),
None,
None,
SchemaVersion::V0,
&LogSource::default(),
)
.unwrap();

assert_eq!(rb.num_rows(), 4);
assert_eq!(rb.num_columns(), 5);
assert_eq!(
Expand Down Expand Up @@ -861,14 +801,25 @@ mod tests {
"c": [{"a": 1, "b": 2}]
},
]);
let flattened_json = convert_to_array(
convert_array_to_object(
json,
None,
None,
None,
SchemaVersion::V1,
&crate::event::format::LogSource::default(),
)
.unwrap(),
)
.unwrap();

let (rb, _) = into_event_batch(
&json,
&flattened_json,
HashMap::default(),
None,
None,
SchemaVersion::V1,
&LogSource::default(),
)
.unwrap();

Expand Down
Loading
Loading