Skip to content

Commit e4e0ae5

Browse files
fix: refactor ingestion (#1073)
Update the ingestion flow to flatten only once and improve readability. Fixes #1064.
1 parent 1abc8a9 commit e4e0ae5

File tree

5 files changed

+107
-284
lines changed

5 files changed

+107
-284
lines changed

src/event/format/json.rs

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,8 @@ use serde_json::Value;
2929
use std::{collections::HashMap, sync::Arc};
3030
use tracing::error;
3131

32-
use super::{EventFormat, LogSource};
33-
use crate::{
34-
metadata::SchemaVersion,
35-
utils::{arrow::get_field, json::flatten_json_body},
36-
};
32+
use super::EventFormat;
33+
use crate::{metadata::SchemaVersion, utils::arrow::get_field};
3734

3835
pub struct Event {
3936
pub data: Value,
@@ -50,23 +47,13 @@ impl EventFormat for Event {
5047
static_schema_flag: Option<&String>,
5148
time_partition: Option<&String>,
5249
schema_version: SchemaVersion,
53-
log_source: &LogSource,
5450
) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
55-
let data = flatten_json_body(
56-
self.data,
57-
None,
58-
None,
59-
None,
60-
schema_version,
61-
false,
62-
log_source,
63-
)?;
6451
let stream_schema = schema;
6552

6653
// incoming event may be a single json or a json array
6754
// but Data (type defined above) is a vector of json values
6855
// hence we need to convert the incoming event to a vector of json values
69-
let value_arr = match data {
56+
let value_arr = match self.data {
7057
Value::Array(arr) => arr,
7158
value @ Value::Object(_) => vec![value],
7259
_ => unreachable!("flatten would have failed beforehand"),

src/event/format/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@ pub trait EventFormat: Sized {
8080
static_schema_flag: Option<&String>,
8181
time_partition: Option<&String>,
8282
schema_version: SchemaVersion,
83-
log_source: &LogSource,
8483
) -> Result<(Self::Data, EventSchema, bool), AnyError>;
8584

8685
fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
@@ -91,14 +90,12 @@ pub trait EventFormat: Sized {
9190
static_schema_flag: Option<&String>,
9291
time_partition: Option<&String>,
9392
schema_version: SchemaVersion,
94-
log_source: &LogSource,
9593
) -> Result<(RecordBatch, bool), AnyError> {
9694
let (data, mut schema, is_first) = self.to_data(
9795
storage_schema,
9896
static_schema_flag,
9997
time_partition,
10098
schema_version,
101-
log_source,
10299
)?;
103100

104101
if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {

src/handlers/http/ingest.rs

Lines changed: 50 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::otel::metrics::flatten_otel_metrics;
3636
use crate::otel::traces::flatten_otel_traces;
3737
use crate::storage::{ObjectStorageError, StreamType};
3838
use crate::utils::header_parsing::ParseHeaderError;
39+
use crate::utils::json::flatten::JsonFlattenError;
3940
use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
4041
use arrow_array::RecordBatch;
4142
use arrow_schema::Schema;
@@ -89,13 +90,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
8990
.clone();
9091
let event = format::json::Event { data: body_val };
9192
// For internal streams, use old schema
92-
event.into_recordbatch(
93-
&schema,
94-
None,
95-
None,
96-
SchemaVersion::V0,
97-
&LogSource::default(),
98-
)?
93+
event.into_recordbatch(&schema, None, None, SchemaVersion::V0)?
9994
};
10095
event::Event {
10196
rb,
@@ -328,6 +323,8 @@ pub enum PostError {
328323
DashboardError(#[from] DashboardError),
329324
#[error("Error: {0}")]
330325
StreamError(#[from] StreamError),
326+
#[error("Error: {0}")]
327+
JsonFlattenError(#[from] JsonFlattenError),
331328
}
332329

333330
impl actix_web::ResponseError for PostError {
@@ -349,6 +346,7 @@ impl actix_web::ResponseError for PostError {
349346
PostError::DashboardError(_) => StatusCode::INTERNAL_SERVER_ERROR,
350347
PostError::FiltersError(_) => StatusCode::INTERNAL_SERVER_ERROR,
351348
PostError::StreamError(_) => StatusCode::INTERNAL_SERVER_ERROR,
349+
PostError::JsonFlattenError(_) => StatusCode::INTERNAL_SERVER_ERROR,
352350
}
353351
}
354352

@@ -369,8 +367,9 @@ mod tests {
369367
use std::{collections::HashMap, sync::Arc};
370368

371369
use crate::{
372-
event::format::LogSource, handlers::http::modal::utils::ingest_utils::into_event_batch,
370+
handlers::http::modal::utils::ingest_utils::into_event_batch,
373371
metadata::SchemaVersion,
372+
utils::json::{convert_array_to_object, flatten::convert_to_array},
374373
};
375374

376375
trait TestExt {
@@ -405,15 +404,8 @@ mod tests {
405404
"b": "hello",
406405
});
407406

408-
let (rb, _) = into_event_batch(
409-
&json,
410-
HashMap::default(),
411-
None,
412-
None,
413-
SchemaVersion::V0,
414-
&LogSource::default(),
415-
)
416-
.unwrap();
407+
let (rb, _) =
408+
into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();
417409

418410
assert_eq!(rb.num_rows(), 1);
419411
assert_eq!(rb.num_columns(), 4);
@@ -439,15 +431,8 @@ mod tests {
439431
"c": null
440432
});
441433

442-
let (rb, _) = into_event_batch(
443-
&json,
444-
HashMap::default(),
445-
None,
446-
None,
447-
SchemaVersion::V0,
448-
&LogSource::default(),
449-
)
450-
.unwrap();
434+
let (rb, _) =
435+
into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();
451436

452437
assert_eq!(rb.num_rows(), 1);
453438
assert_eq!(rb.num_columns(), 3);
@@ -477,15 +462,7 @@ mod tests {
477462
.into_iter(),
478463
);
479464

480-
let (rb, _) = into_event_batch(
481-
&json,
482-
schema,
483-
None,
484-
None,
485-
SchemaVersion::V0,
486-
&LogSource::default(),
487-
)
488-
.unwrap();
465+
let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();
489466

490467
assert_eq!(rb.num_rows(), 1);
491468
assert_eq!(rb.num_columns(), 3);
@@ -515,15 +492,7 @@ mod tests {
515492
.into_iter(),
516493
);
517494

518-
assert!(into_event_batch(
519-
&json,
520-
schema,
521-
None,
522-
None,
523-
SchemaVersion::V0,
524-
&LogSource::default()
525-
)
526-
.is_err());
495+
assert!(into_event_batch(&json, schema, None, None, SchemaVersion::V0,).is_err());
527496
}
528497

529498
#[test]
@@ -539,15 +508,7 @@ mod tests {
539508
.into_iter(),
540509
);
541510

542-
let (rb, _) = into_event_batch(
543-
&json,
544-
schema,
545-
None,
546-
None,
547-
SchemaVersion::V0,
548-
&LogSource::default(),
549-
)
550-
.unwrap();
511+
let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();
551512

552513
assert_eq!(rb.num_rows(), 1);
553514
assert_eq!(rb.num_columns(), 1);
@@ -556,14 +517,13 @@ mod tests {
556517
#[test]
557518
fn non_object_arr_is_err() {
558519
let json = json!([1]);
559-
560-
assert!(into_event_batch(
561-
&json,
562-
HashMap::default(),
520+
assert!(convert_array_to_object(
521+
json,
522+
None,
563523
None,
564524
None,
565525
SchemaVersion::V0,
566-
&LogSource::default()
526+
&crate::event::format::LogSource::default()
567527
)
568528
.is_err())
569529
}
@@ -586,15 +546,8 @@ mod tests {
586546
},
587547
]);
588548

589-
let (rb, _) = into_event_batch(
590-
&json,
591-
HashMap::default(),
592-
None,
593-
None,
594-
SchemaVersion::V0,
595-
&LogSource::default(),
596-
)
597-
.unwrap();
549+
let (rb, _) =
550+
into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();
598551

599552
assert_eq!(rb.num_rows(), 3);
600553
assert_eq!(rb.num_columns(), 4);
@@ -640,15 +593,8 @@ mod tests {
640593
},
641594
]);
642595

643-
let (rb, _) = into_event_batch(
644-
&json,
645-
HashMap::default(),
646-
None,
647-
None,
648-
SchemaVersion::V0,
649-
&LogSource::default(),
650-
)
651-
.unwrap();
596+
let (rb, _) =
597+
into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();
652598

653599
assert_eq!(rb.num_rows(), 3);
654600
assert_eq!(rb.num_columns(), 4);
@@ -695,15 +641,7 @@ mod tests {
695641
.into_iter(),
696642
);
697643

698-
let (rb, _) = into_event_batch(
699-
&json,
700-
schema,
701-
None,
702-
None,
703-
SchemaVersion::V0,
704-
&LogSource::default(),
705-
)
706-
.unwrap();
644+
let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();
707645

708646
assert_eq!(rb.num_rows(), 3);
709647
assert_eq!(rb.num_columns(), 4);
@@ -750,15 +688,7 @@ mod tests {
750688
.into_iter(),
751689
);
752690

753-
assert!(into_event_batch(
754-
&json,
755-
schema,
756-
None,
757-
None,
758-
SchemaVersion::V0,
759-
&LogSource::default()
760-
)
761-
.is_err());
691+
assert!(into_event_batch(&json, schema, None, None, SchemaVersion::V0,).is_err());
762692
}
763693

764694
#[test]
@@ -783,17 +713,27 @@ mod tests {
783713
"c": [{"a": 1, "b": 2}]
784714
},
785715
]);
716+
let flattened_json = convert_to_array(
717+
convert_array_to_object(
718+
json,
719+
None,
720+
None,
721+
None,
722+
SchemaVersion::V0,
723+
&crate::event::format::LogSource::default(),
724+
)
725+
.unwrap(),
726+
)
727+
.unwrap();
786728

787729
let (rb, _) = into_event_batch(
788-
&json,
730+
&flattened_json,
789731
HashMap::default(),
790732
None,
791733
None,
792734
SchemaVersion::V0,
793-
&LogSource::default(),
794735
)
795736
.unwrap();
796-
797737
assert_eq!(rb.num_rows(), 4);
798738
assert_eq!(rb.num_columns(), 5);
799739
assert_eq!(
@@ -861,14 +801,25 @@ mod tests {
861801
"c": [{"a": 1, "b": 2}]
862802
},
863803
]);
804+
let flattened_json = convert_to_array(
805+
convert_array_to_object(
806+
json,
807+
None,
808+
None,
809+
None,
810+
SchemaVersion::V1,
811+
&crate::event::format::LogSource::default(),
812+
)
813+
.unwrap(),
814+
)
815+
.unwrap();
864816

865817
let (rb, _) = into_event_batch(
866-
&json,
818+
&flattened_json,
867819
HashMap::default(),
868820
None,
869821
None,
870822
SchemaVersion::V1,
871-
&LogSource::default(),
872823
)
873824
.unwrap();
874825

0 commit comments

Comments
 (0)