Skip to content

Commit a86fc47

Browse files
author
Devdutt Shenoi
committed
refactor: prepare_and_validate_schema
1 parent 1c98e3b commit a86fc47

File tree

2 files changed

+53
-107
lines changed

2 files changed

+53
-107
lines changed

src/event/format/json.rs

Lines changed: 34 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ impl EventFormat for Event {
138138
// also extract the arrow schema, tags and metadata from the incoming json
139139
fn to_data(
140140
self,
141+
static_schema_flag: bool,
141142
stored_schema: &HashMap<String, Arc<Field>>,
142143
time_partition: Option<&String>,
143144
time_partition_limit: Option<NonZeroU32>,
@@ -203,6 +204,8 @@ impl EventFormat for Event {
203204
));
204205
}
205206

207+
let schema = Self::prepare_and_validate_schema(schema, &stored_schema, static_schema_flag)?;
208+
206209
Ok((flattened, schema, is_first))
207210
}
208211

@@ -239,6 +242,7 @@ impl EventFormat for Event {
239242

240243
let p_timestamp = self.p_timestamp;
241244
let (data, schema, is_first_event) = self.to_data(
245+
static_schema_flag,
242246
&storage_schema,
243247
time_partition.as_ref(),
244248
time_partition_limit,
@@ -265,9 +269,7 @@ impl EventFormat for Event {
265269
let batch = Self::into_recordbatch(
266270
p_timestamp,
267271
vec![json],
268-
schema.clone(),
269-
&storage_schema,
270-
static_schema_flag,
272+
&schema,
271273
time_partition.as_ref(),
272274
schema_version,
273275
)?;
@@ -507,6 +509,7 @@ mod tests {
507509
let store_schema = HashMap::default();
508510
let (data, schema, _) = Event::new(json)
509511
.to_data(
512+
false,
510513
&store_schema,
511514
None,
512515
None,
@@ -515,16 +518,8 @@ mod tests {
515518
&LogSource::Json,
516519
)
517520
.unwrap();
518-
let rb = Event::into_recordbatch(
519-
Utc::now(),
520-
data,
521-
schema,
522-
&store_schema,
523-
false,
524-
None,
525-
SchemaVersion::V0,
526-
)
527-
.unwrap();
521+
let rb =
522+
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
528523

529524
assert_eq!(rb.num_rows(), 1);
530525
assert_eq!(rb.num_columns(), 4);
@@ -553,6 +548,7 @@ mod tests {
553548
let store_schema = HashMap::default();
554549
let (data, schema, _) = Event::new(json)
555550
.to_data(
551+
false,
556552
&store_schema,
557553
None,
558554
None,
@@ -561,16 +557,8 @@ mod tests {
561557
&LogSource::Json,
562558
)
563559
.unwrap();
564-
let rb = Event::into_recordbatch(
565-
Utc::now(),
566-
data,
567-
schema,
568-
&store_schema,
569-
false,
570-
None,
571-
SchemaVersion::V0,
572-
)
573-
.unwrap();
560+
let rb =
561+
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
574562

575563
assert_eq!(rb.num_rows(), 1);
576564
assert_eq!(rb.num_columns(), 3);
@@ -601,6 +589,7 @@ mod tests {
601589
);
602590
let (data, schema, _) = Event::new(json)
603591
.to_data(
592+
false,
604593
&store_schema,
605594
None,
606595
None,
@@ -609,16 +598,8 @@ mod tests {
609598
&LogSource::Json,
610599
)
611600
.unwrap();
612-
let rb = Event::into_recordbatch(
613-
Utc::now(),
614-
data,
615-
schema,
616-
&store_schema,
617-
false,
618-
None,
619-
SchemaVersion::V0,
620-
)
621-
.unwrap();
601+
let rb =
602+
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
622603

623604
assert_eq!(rb.num_rows(), 1);
624605
assert_eq!(rb.num_columns(), 3);
@@ -650,6 +631,7 @@ mod tests {
650631

651632
assert!(Event::new(json)
652633
.to_data(
634+
false,
653635
&store_schema,
654636
None,
655637
None,
@@ -675,6 +657,7 @@ mod tests {
675657

676658
let (data, schema, _) = Event::new(json)
677659
.to_data(
660+
false,
678661
&store_schema,
679662
None,
680663
None,
@@ -683,16 +666,8 @@ mod tests {
683666
&LogSource::Json,
684667
)
685668
.unwrap();
686-
let rb = Event::into_recordbatch(
687-
Utc::now(),
688-
data,
689-
schema,
690-
&store_schema,
691-
false,
692-
None,
693-
SchemaVersion::V0,
694-
)
695-
.unwrap();
669+
let rb =
670+
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
696671

697672
assert_eq!(rb.num_rows(), 1);
698673
assert_eq!(rb.num_columns(), 1);
@@ -719,6 +694,7 @@ mod tests {
719694
let store_schema = HashMap::new();
720695
let (data, schema, _) = Event::new(json)
721696
.to_data(
697+
false,
722698
&store_schema,
723699
None,
724700
None,
@@ -727,16 +703,8 @@ mod tests {
727703
&LogSource::Json,
728704
)
729705
.unwrap();
730-
let rb = Event::into_recordbatch(
731-
Utc::now(),
732-
data,
733-
schema,
734-
&store_schema,
735-
false,
736-
None,
737-
SchemaVersion::V0,
738-
)
739-
.unwrap();
706+
let rb =
707+
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
740708

741709
assert_eq!(rb.num_rows(), 3);
742710
assert_eq!(rb.num_columns(), 4);
@@ -785,6 +753,7 @@ mod tests {
785753
let store_schema = HashMap::new();
786754
let (data, schema, _) = Event::new(json)
787755
.to_data(
756+
false,
788757
&store_schema,
789758
None,
790759
None,
@@ -793,16 +762,8 @@ mod tests {
793762
&LogSource::Json,
794763
)
795764
.unwrap();
796-
let rb = Event::into_recordbatch(
797-
Utc::now(),
798-
data,
799-
schema,
800-
&store_schema,
801-
false,
802-
None,
803-
SchemaVersion::V0,
804-
)
805-
.unwrap();
765+
let rb =
766+
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
806767

807768
assert_eq!(rb.num_rows(), 3);
808769
assert_eq!(rb.num_columns(), 4);
@@ -850,6 +811,7 @@ mod tests {
850811
);
851812
let (data, schema, _) = Event::new(json)
852813
.to_data(
814+
false,
853815
&store_schema,
854816
None,
855817
None,
@@ -858,16 +820,8 @@ mod tests {
858820
&LogSource::Json,
859821
)
860822
.unwrap();
861-
let rb = Event::into_recordbatch(
862-
Utc::now(),
863-
data,
864-
schema,
865-
&store_schema,
866-
false,
867-
None,
868-
SchemaVersion::V0,
869-
)
870-
.unwrap();
823+
let rb =
824+
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
871825

872826
assert_eq!(rb.num_rows(), 3);
873827
assert_eq!(rb.num_columns(), 4);
@@ -916,6 +870,7 @@ mod tests {
916870

917871
assert!(Event::new(json)
918872
.to_data(
873+
false,
919874
&store_schema,
920875
None,
921876
None,
@@ -953,6 +908,7 @@ mod tests {
953908
let store_schema = HashMap::new();
954909
let (data, schema, _) = Event::new(json)
955910
.to_data(
911+
false,
956912
&store_schema,
957913
None,
958914
None,
@@ -961,16 +917,8 @@ mod tests {
961917
&LogSource::Json,
962918
)
963919
.unwrap();
964-
let rb = Event::into_recordbatch(
965-
Utc::now(),
966-
data,
967-
schema,
968-
&store_schema,
969-
false,
970-
None,
971-
SchemaVersion::V0,
972-
)
973-
.unwrap();
920+
let rb =
921+
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V0).unwrap();
974922

975923
assert_eq!(rb.num_rows(), 4);
976924
assert_eq!(rb.num_columns(), 5);
@@ -1044,6 +992,7 @@ mod tests {
1044992
let store_schema = HashMap::new();
1045993
let (data, schema, _) = Event::new(json)
1046994
.to_data(
995+
false,
1047996
&store_schema,
1048997
None,
1049998
None,
@@ -1052,16 +1001,8 @@ mod tests {
10521001
&LogSource::Json,
10531002
)
10541003
.unwrap();
1055-
let rb = Event::into_recordbatch(
1056-
Utc::now(),
1057-
data,
1058-
schema,
1059-
&store_schema,
1060-
false,
1061-
None,
1062-
SchemaVersion::V1,
1063-
)
1064-
.unwrap();
1004+
let rb =
1005+
Event::into_recordbatch(Utc::now(), data, &schema, None, SchemaVersion::V1).unwrap();
10651006

10661007
assert_eq!(rb.num_rows(), 4);
10671008
assert_eq!(rb.num_columns(), 5);

src/event/format/mod.rs

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -103,32 +103,26 @@ pub trait EventFormat: Sized {
103103

104104
fn to_data(
105105
self,
106-
schema: &HashMap<String, Arc<Field>>,
106+
static_schema_flag: bool,
107+
stored_schema: &HashMap<String, Arc<Field>>,
107108
time_partition: Option<&String>,
108109
time_partition_limit: Option<NonZeroU32>,
109-
custom_partition: Option<&String>,
110+
custom_partitions: Option<&String>,
110111
schema_version: SchemaVersion,
111112
log_source: &LogSource,
112113
) -> Result<(Self::Data, EventSchema, bool), AnyError>;
113114

114115
fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
115116

116-
#[allow(clippy::too_many_arguments)]
117-
fn into_recordbatch(
118-
p_timestamp: DateTime<Utc>,
119-
data: Self::Data,
117+
/// Adds the `p_timestamp` field to the inferred schema and ensures the result adheres to expectations
118+
fn prepare_and_validate_schema(
120119
mut schema: EventSchema,
121120
storage_schema: &HashMap<String, Arc<Field>>,
122121
static_schema_flag: bool,
123-
time_partition: Option<&String>,
124-
schema_version: SchemaVersion,
125-
) -> Result<RecordBatch, AnyError> {
122+
) -> Result<EventSchema, AnyError> {
126123
if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
127-
return Err(anyhow!(
128-
"field {} is a reserved field",
129-
DEFAULT_TIMESTAMP_KEY
130-
));
131-
};
124+
return Err(anyhow!("field {DEFAULT_TIMESTAMP_KEY} is a reserved field",));
125+
}
132126

133127
// add the p_timestamp field to the event schema at the 0th index
134128
schema.insert(
@@ -150,6 +144,17 @@ pub trait EventFormat: Sized {
150144
return Err(anyhow!("Schema mismatch"));
151145
}
152146

147+
Ok(schema)
148+
}
149+
150+
#[allow(clippy::too_many_arguments)]
151+
fn into_recordbatch(
152+
p_timestamp: DateTime<Utc>,
153+
data: Self::Data,
154+
schema: &EventSchema,
155+
time_partition: Option<&String>,
156+
schema_version: SchemaVersion,
157+
) -> Result<RecordBatch, AnyError> {
153158
// prepare the record batch and new fields to be added
154159
let mut new_schema = Arc::new(Schema::new(schema.clone()));
155160
new_schema =

0 commit comments

Comments
 (0)