Commit 23155b6

Devdutt Shenoi committed: push flattening into Event
1 parent 6fb5385

5 files changed, +174 -59 lines

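In short: JSON flattening, which callers previously performed before handing data to an Event, is pushed into EventFormat::to_data, so the partitioning context (time partition and its limit, custom partitions) and the log source travel with the event. The updated trait method, quoted from src/event/format/mod.rs below:

    fn to_data(
        self,
        schema: &HashMap<String, Arc<Field>>,
        time_partition: Option<&String>,
        time_partition_limit: Option<NonZeroU32>,
        custom_partition: Option<&String>,
        schema_version: SchemaVersion,
        log_source: &LogSource,
    ) -> Result<(Self::Data, EventSchema, bool), AnyError>;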

src/connectors/kafka/processor.rs

Lines changed: 3 additions & 0 deletions

@@ -57,6 +57,7 @@ impl ParseableSinkProcessor {
         let stream = PARSEABLE.get_stream(stream_name)?;
         let schema = stream.get_schema_raw();
         let time_partition = stream.get_time_partition();
+        let time_partition_limit = stream.get_time_partition_limit();
         let custom_partition = stream.get_custom_partition();
         let static_schema_flag = stream.get_static_schema_flag();
         let schema_version = stream.get_schema_version();
@@ -78,7 +79,9 @@ impl ParseableSinkProcessor {
             static_schema_flag,
             custom_partition.as_ref(),
             time_partition.as_ref(),
+            time_partition_limit,
             schema_version,
+            &LogSource::Custom("Kafka".to_owned()),
             StreamType::UserDefined,
         )?;
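
Aside: the Kafka connector now tags its events with a custom log source. A minimal sketch of that pattern; the enum below is a stand-in for illustration, assuming only what this call and the json.rs diff below show (unit variants such as Json and Pmeta plus a Custom(String) variant), not the crate's actual definition:

    // Stand-in for the crate's LogSource (src/event/format/mod.rs).
    #[derive(Debug, Clone, PartialEq)]
    enum LogSource {
        Json,
        Pmeta,
        Custom(String),
    }

    fn source_for_connector(name: &str) -> LogSource {
        // Connectors label events with their origin, e.g. "Kafka",
        // without needing a dedicated enum variant per connector.
        LogSource::Custom(name.to_owned())
    }

    fn main() {
        let source = source_for_connector("Kafka");
        assert_eq!(source, LogSource::Custom("Kafka".to_owned()));
    }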

src/event/format/json.rs

Lines changed: 140 additions & 25 deletions

@@ -29,12 +29,20 @@ use itertools::Itertools;
 use serde_json::Value;
 use std::{
     collections::{HashMap, HashSet},
+    num::NonZeroU32,
     sync::Arc,
 };
 use tracing::error;

-use super::EventFormat;
-use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
+use super::{EventFormat, LogSource};
+use crate::{
+    metadata::SchemaVersion,
+    storage::StreamType,
+    utils::{
+        arrow::get_field,
+        json::{convert_array_to_object, flatten::convert_to_array},
+    },
+};

 pub struct Event {
     pub json: Value,
@@ -64,34 +72,48 @@ impl EventFormat for Event {
         self,
         stored_schema: &HashMap<String, Arc<Field>>,
         time_partition: Option<&String>,
+        time_partition_limit: Option<NonZeroU32>,
+        custom_partition: Option<&String>,
         schema_version: SchemaVersion,
+        log_source: &LogSource,
     ) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
-        // incoming event may be a single json or a json array
-        // but Data (type defined above) is a vector of json values
-        // hence we need to convert the incoming event to a vector of json values
-        let value_arr = match self.json {
-            Value::Array(arr) => arr,
-            value @ Value::Object(_) => vec![value],
-            _ => unreachable!("flatten would have failed beforehand"),
+        let flattened = if time_partition.is_some() || custom_partition.is_some() {
+            convert_array_to_object(
+                self.json,
+                time_partition,
+                time_partition_limit,
+                custom_partition,
+                schema_version,
+                log_source,
+            )?
+        } else {
+            vec![convert_to_array(convert_array_to_object(
+                self.json,
+                None,
+                None,
+                None,
+                schema_version,
+                log_source,
+            )?)?]
         };

         // collect all the keys from all the json objects in the request body
         let fields =
-            collect_keys(value_arr.iter()).expect("fields can be collected from array of objects");
+            collect_keys(flattened.iter()).expect("fields can be collected from array of objects");

         let mut is_first = false;
         let schema = match derive_arrow_schema(stored_schema, fields) {
             Some(schema) => schema,
             _ => {
-                let mut infer_schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok))
+                let mut infer_schema = infer_json_schema_from_iterator(flattened.iter().map(Ok))
                     .map_err(|err| {
                         anyhow!("Could not infer schema for this event due to err {:?}", err)
                     })?;
                 let new_infer_schema = super::update_field_type_in_schema(
                     Arc::new(infer_schema),
                     Some(stored_schema),
                     time_partition,
-                    Some(&value_arr),
+                    Some(&flattened),
                     schema_version,
                 );
                 infer_schema = Schema::new(new_infer_schema.fields().clone());
@@ -110,7 +132,7 @@
             }
         };

-        if value_arr
+        if flattened
             .iter()
             .any(|value| fields_mismatch(&schema, value, schema_version))
         {
@@ -119,7 +141,7 @@
             ));
         }

-        Ok((value_arr, schema, is_first))
+        Ok((flattened, schema, is_first))
     }

     // Convert the Data type (defined above) to arrow record batch
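
For intuition, this is the kind of normalization convert_array_to_object and convert_to_array perform: accept one JSON object or an array of objects, flatten nested keys into dot-separated paths, and hand back a Vec<Value> of flat objects. Note the two branches in the diff above: with a time or custom partition configured the flattened events remain individual objects; otherwise convert_to_array batches them into a single array value. The sketch below is an illustrative stand-in only; the crate's versions additionally validate partition fields, enforce time_partition_limit, and vary flattening by SchemaVersion and LogSource:

    use serde_json::{json, Map, Value};

    // Recursively flatten nested objects into dot-separated keys.
    fn flatten_value(prefix: Option<&str>, value: &Value, out: &mut Map<String, Value>) {
        match value {
            Value::Object(map) => {
                for (k, v) in map {
                    let key = match prefix {
                        Some(p) => format!("{p}.{k}"),
                        None => k.clone(),
                    };
                    flatten_value(Some(&key), v, out);
                }
            }
            other => {
                out.insert(prefix.unwrap_or_default().to_owned(), other.clone());
            }
        }
    }

    // Normalize a single object or an array of objects into a
    // Vec of flat objects (the shape Event::to_data works with).
    fn flatten_events(json: Value) -> Vec<Value> {
        let items = match json {
            Value::Array(arr) => arr,
            obj @ Value::Object(_) => vec![obj],
            _ => vec![], // the real code rejects non-object input earlier
        };
        items
            .into_iter()
            .map(|item| {
                let mut flat = Map::new();
                flatten_value(None, &item, &mut flat);
                Value::Object(flat)
            })
            .collect()
    }

    fn main() {
        let flat = flatten_events(json!({"a": {"b": 1}, "c": 2}));
        assert_eq!(flat, vec![json!({"a.b": 1, "c": 2})]);
    }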
@@ -147,7 +169,9 @@
         static_schema_flag: bool,
         custom_partitions: Option<&String>,
         time_partition: Option<&String>,
+        time_partition_limit: Option<NonZeroU32>,
         schema_version: SchemaVersion,
+        log_source: &LogSource,
         stream_type: StreamType,
     ) -> Result<super::Event, anyhow::Error> {
         let custom_partition_values = match custom_partitions.as_ref() {
@@ -167,7 +191,10 @@
             storage_schema,
             static_schema_flag,
             time_partition,
+            time_partition_limit,
+            custom_partitions,
             schema_version,
+            log_source,
         )?;

         Ok(super::Event {
@@ -385,7 +412,15 @@ mod tests {
         });

         let (rb, _) = Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                None,
+                None,
+                SchemaVersion::V0,
+                &LogSource::Json,
+            )
             .unwrap();

         assert_eq!(rb.num_rows(), 1);
@@ -413,7 +448,15 @@ mod tests {
         });

         let (rb, _) = Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                None,
+                None,
+                SchemaVersion::V0,
+                &LogSource::Json,
+            )
             .unwrap();

         assert_eq!(rb.num_rows(), 1);
@@ -445,7 +488,15 @@ mod tests {
         );

         let (rb, _) = Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &schema,
+                false,
+                None,
+                None,
+                None,
+                SchemaVersion::V0,
+                &LogSource::Json,
+            )
             .unwrap();

         assert_eq!(rb.num_rows(), 1);
@@ -477,7 +528,15 @@ mod tests {
         );

         assert!(Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+            .into_recordbatch(
+                &schema,
+                false,
+                None,
+                None,
+                None,
+                SchemaVersion::V0,
+                &LogSource::Json
+            )
             .is_err());
     }

@@ -495,7 +554,15 @@ mod tests {
         );

         let (rb, _) = Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &schema,
+                false,
+                None,
+                None,
+                None,
+                SchemaVersion::V0,
+                &LogSource::Json,
+            )
             .unwrap();

         assert_eq!(rb.num_rows(), 1);
@@ -521,7 +588,15 @@ mod tests {
         ]);

         let (rb, _) = Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                None,
+                None,
+                SchemaVersion::V0,
+                &LogSource::Json,
+            )
             .unwrap();

         assert_eq!(rb.num_rows(), 3);
@@ -569,7 +644,15 @@ mod tests {
         ]);

         let (rb, _) = Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                None,
+                None,
+                SchemaVersion::V0,
+                &LogSource::Json,
+            )
             .unwrap();

         assert_eq!(rb.num_rows(), 3);
@@ -618,7 +701,15 @@ mod tests {
         );

         let (rb, _) = Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &schema,
+                false,
+                None,
+                None,
+                None,
+                SchemaVersion::V0,
+                &LogSource::Json,
+            )
             .unwrap();

         assert_eq!(rb.num_rows(), 3);
@@ -667,7 +758,15 @@ mod tests {
         );

         assert!(Event::new(json)
-            .into_recordbatch(&schema, false, None, SchemaVersion::V0,)
+            .into_recordbatch(
+                &schema,
+                false,
+                None,
+                None,
+                None,
+                SchemaVersion::V0,
+                &LogSource::Json
+            )
             .is_err());
     }

@@ -696,7 +795,15 @@ mod tests {
         ]);

         let (rb, _) = Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                None,
+                None,
+                SchemaVersion::V0,
+                &LogSource::Json,
+            )
             .unwrap();
         assert_eq!(rb.num_rows(), 4);
         assert_eq!(rb.num_columns(), 5);
@@ -768,7 +875,15 @@ mod tests {
         ]);

         let (rb, _) = Event::new(json)
-            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1)
+            .into_recordbatch(
+                &HashMap::default(),
+                false,
+                None,
+                None,
+                None,
+                SchemaVersion::V1,
+                &LogSource::Json,
+            )
             .unwrap();

         assert_eq!(rb.num_rows(), 4);
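
Each test above now threads the same seven arguments through into_recordbatch. If the repetition grows, a small helper could centralize the defaults. A sketch only, assuming the test module's existing imports (Event, LogSource, SchemaVersion, HashMap, arrow's RecordBatch) are in scope; the helper name is hypothetical:

    // Hypothetical test helper; the argument order mirrors
    // into_recordbatch as declared in src/event/format/mod.rs.
    fn recordbatch_with_defaults(json: serde_json::Value) -> (RecordBatch, bool) {
        Event::new(json)
            .into_recordbatch(
                &HashMap::default(),
                false,             // static_schema_flag
                None,              // time_partition
                None,              // time_partition_limit
                None,              // custom_partition
                SchemaVersion::V0,
                &LogSource::Json,
            )
            .unwrap()
    }

Tests that expect a failure (the .is_err() cases) would still call into_recordbatch directly.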

src/event/format/mod.rs

Lines changed: 17 additions & 2 deletions

@@ -20,6 +20,7 @@
 use std::{
     collections::{HashMap, HashSet},
     fmt::Display,
+    num::NonZeroU32,
     sync::Arc,
 };

@@ -101,7 +102,10 @@ pub trait EventFormat: Sized {
         self,
         schema: &HashMap<String, Arc<Field>>,
         time_partition: Option<&String>,
+        time_partition_limit: Option<NonZeroU32>,
+        custom_partition: Option<&String>,
         schema_version: SchemaVersion,
+        log_source: &LogSource,
     ) -> Result<(Self::Data, EventSchema, bool), AnyError>;

     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
@@ -114,11 +118,20 @@
         storage_schema: &HashMap<String, Arc<Field>>,
         static_schema_flag: bool,
         time_partition: Option<&String>,
+        time_partition_limit: Option<NonZeroU32>,
+        custom_partition: Option<&String>,
         schema_version: SchemaVersion,
+        log_source: &LogSource,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let p_timestamp = self.get_p_timestamp();
-        let (data, mut schema, is_first) =
-            self.to_data(storage_schema, time_partition, schema_version)?;
+        let (data, mut schema, is_first) = self.to_data(
+            storage_schema,
+            time_partition,
+            time_partition_limit,
+            custom_partition,
+            schema_version,
+            log_source,
+        )?;

         if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
             return Err(anyhow!(
@@ -171,7 +184,9 @@
         static_schema_flag: bool,
         custom_partitions: Option<&String>,
         time_partition: Option<&String>,
+        time_partition_limit: Option<NonZeroU32>,
         schema_version: SchemaVersion,
+        log_source: &LogSource,
         stream_type: StreamType,
     ) -> Result<Event, AnyError>;
 }
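
A note on the limit's type: Option<NonZeroU32> keeps "no limit" (None) distinct from an accidental zero, which NonZeroU32 makes unrepresentable. A std-only sketch of the idea; the "90d" syntax here is an assumption for illustration, not necessarily the server's accepted format:

    use std::num::NonZeroU32;

    // Parse a value like "90d" into a day count. NonZeroU32::new
    // returns None for 0, so an explicit zero cannot sneak in as
    // a limit that would truncate every event.
    fn parse_time_partition_limit(s: &str) -> Option<NonZeroU32> {
        let days: u32 = s.strip_suffix('d')?.parse().ok()?;
        NonZeroU32::new(days)
    }

    fn main() {
        assert_eq!(parse_time_partition_limit("90d"), NonZeroU32::new(90));
        assert_eq!(parse_time_partition_limit("0d"), None);
        assert_eq!(parse_time_partition_limit("ninety"), None);
    }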

src/handlers/http/ingest.rs

Lines changed: 2 additions & 0 deletions

@@ -91,7 +91,9 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
         false,
         None,
         None,
+        None,
         SchemaVersion::V0,
+        &LogSource::Pmeta,
         StreamType::Internal,
     )?
     .process()?;
