
Commit a4bf963

Author: Devdutt Shenoi
refactor: use bools for flags (#1070)
NOTE: Only "true" is valid; de-serializing anything else is not allowed. The value is set to false only when the field doesn't exist. This is perfectly fine because we are reading from file, and it will not impact previous behaviour either.
1 parent 05f2065 commit a4bf963
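
As a rough illustration of the de-serialization rule described in the note (not the code shipped in this commit): a missing field defaults to false, while a present field is only accepted when it is `true` (or, for metadata written by older versions, the string "true"); anything else is a hard error. The struct and helper names below are hypothetical, and the sketch assumes serde and serde_json.

use serde::{de, Deserialize, Deserializer};

// Hypothetical metadata struct; field and helper names are illustrative only.
#[derive(Debug, Deserialize)]
struct StreamMetadata {
    // Missing field => false; present field must be `true` (or the legacy string "true").
    #[serde(default, deserialize_with = "deserialize_true_only")]
    static_schema_flag: bool,
}

// Accept only `true`; any other value is a de-serialization error.
fn deserialize_true_only<'de, D>(deserializer: D) -> Result<bool, D::Error>
where
    D: Deserializer<'de>,
{
    #[derive(Deserialize)]
    #[serde(untagged)]
    enum Flag {
        Bool(bool),
        Text(String),
    }

    match Flag::deserialize(deserializer)? {
        Flag::Bool(true) => Ok(true),
        Flag::Text(s) if s == "true" => Ok(true),
        _ => Err(de::Error::custom("only `true` is a valid value for this flag")),
    }
}

fn main() {
    // Field absent: falls back to false, preserving previous behaviour.
    let m: StreamMetadata = serde_json::from_str("{}").unwrap();
    assert!(!m.static_schema_flag);

    // Legacy string form read back from an existing metadata file.
    let m: StreamMetadata = serde_json::from_str(r#"{"static_schema_flag": "true"}"#).unwrap();
    assert!(m.static_schema_flag);

    // Anything else is rejected outright.
    assert!(serde_json::from_str::<StreamMetadata>(r#"{"static_schema_flag": "no"}"#).is_err());
}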

File tree

11 files changed: +223 -71 lines changed


src/event/format/json.rs

Lines changed: 2 additions & 2 deletions

@@ -44,7 +44,7 @@ impl EventFormat for Event {
     fn to_data(
         self,
         schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: Option<&String>,
+        static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
     ) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
@@ -94,7 +94,7 @@ impl EventFormat for Event {
             }
         };

-        if static_schema_flag.is_none()
+        if !static_schema_flag
             && value_arr
                 .iter()
                 .any(|value| fields_mismatch(&schema, value, schema_version))

src/event/format/mod.rs

Lines changed: 4 additions & 4 deletions

@@ -77,7 +77,7 @@ pub trait EventFormat: Sized {
     fn to_data(
         self,
         schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: Option<&String>,
+        static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
     ) -> Result<(Self::Data, EventSchema, bool), AnyError>;
@@ -87,7 +87,7 @@ pub trait EventFormat: Sized {
     fn into_recordbatch(
         self,
         storage_schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: Option<&String>,
+        static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
     ) -> Result<(RecordBatch, bool), AnyError> {
@@ -130,9 +130,9 @@ pub trait EventFormat: Sized {
     fn is_schema_matching(
         new_schema: Arc<Schema>,
         storage_schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: Option<&String>,
+        static_schema_flag: bool,
     ) -> bool {
-        if static_schema_flag.is_none() {
+        if !static_schema_flag {
             return true;
         }
         for field in new_schema.fields() {

src/handlers/http/ingest.rs

Lines changed: 14 additions & 13 deletions

@@ -90,7 +90,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
             .clone();
         let event = format::json::Event { data: body_val };
         // For internal streams, use old schema
-        event.into_recordbatch(&schema, None, None, SchemaVersion::V0)?
+        event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?
     };
     event::Event {
         rb,
@@ -285,7 +285,7 @@ pub async fn create_stream_if_not_exists(
         "",
         None,
         "",
-        "",
+        false,
         Arc::new(Schema::empty()),
         stream_type,
     )
@@ -405,7 +405,7 @@ mod tests {
         });

         let (rb, _) =
-            into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();
+            into_event_batch(&json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 4);
@@ -432,7 +432,7 @@
         });

         let (rb, _) =
-            into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();
+            into_event_batch(&json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -462,7 +462,7 @@
             .into_iter(),
         );

-        let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(&json, schema, false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -492,7 +492,7 @@
             .into_iter(),
         );

-        assert!(into_event_batch(&json, schema, None, None, SchemaVersion::V0,).is_err());
+        assert!(into_event_batch(&json, schema, false, None, SchemaVersion::V0,).is_err());
     }

     #[test]
@@ -508,7 +508,7 @@
             .into_iter(),
         );

-        let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(&json, schema, false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 1);
@@ -517,6 +517,7 @@
     #[test]
     fn non_object_arr_is_err() {
         let json = json!([1]);
+
         assert!(convert_array_to_object(
             json,
             None,
@@ -547,7 +548,7 @@
         ]);

         let (rb, _) =
-            into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();
+            into_event_batch(&json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -594,7 +595,7 @@
         ]);

         let (rb, _) =
-            into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();
+            into_event_batch(&json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -641,7 +642,7 @@
             .into_iter(),
         );

-        let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(&json, schema, false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -688,7 +689,7 @@
             .into_iter(),
         );

-        assert!(into_event_batch(&json, schema, None, None, SchemaVersion::V0,).is_err());
+        assert!(into_event_batch(&json, schema, false, None, SchemaVersion::V0,).is_err());
     }

     #[test]
@@ -729,7 +730,7 @@
         let (rb, _) = into_event_batch(
             &flattened_json,
             HashMap::default(),
-            None,
+            false,
             None,
             SchemaVersion::V0,
         )
@@ -817,7 +818,7 @@
         let (rb, _) = into_event_batch(
             &flattened_json,
             HashMap::default(),
-            None,
+            false,
             None,
             SchemaVersion::V1,
         )

src/handlers/http/logstream.rs

Lines changed: 3 additions & 3 deletions

@@ -489,7 +489,7 @@ pub async fn create_stream(
     time_partition: &str,
     time_partition_limit: Option<NonZeroU32>,
     custom_partition: &str,
-    static_schema_flag: &str,
+    static_schema_flag: bool,
     schema: Arc<Schema>,
     stream_type: &str,
 ) -> Result<(), CreateStreamError> {
@@ -529,7 +529,7 @@
             time_partition.to_string(),
             time_partition_limit,
             custom_partition.to_string(),
-            static_schema_flag.to_string(),
+            static_schema_flag,
             static_schema,
             stream_type,
             SchemaVersion::V1, // New stream
@@ -582,7 +582,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
             .time_partition_limit
             .map(|limit| limit.to_string()),
         custom_partition: stream_meta.custom_partition.clone(),
-        static_schema_flag: stream_meta.static_schema_flag.clone(),
+        static_schema_flag: stream_meta.static_schema_flag,
     };

     // get the other info from

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 2 additions & 2 deletions

@@ -126,7 +126,7 @@ pub async fn push_logs(
         let (rb, is_first_event) = into_event_batch(
             &value,
             schema,
-            static_schema_flag.as_ref(),
+            static_schema_flag,
             time_partition.as_ref(),
             schema_version,
         )?;
@@ -151,7 +151,7 @@
 pub fn into_event_batch(
     body: &Value,
     schema: HashMap<String, Arc<Field>>,
-    static_schema_flag: Option<&String>,
+    static_schema_flag: bool,
     time_partition: Option<&String>,
     schema_version: SchemaVersion,
 ) -> Result<(arrow_array::RecordBatch, bool), PostError> {

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 22 additions & 22 deletions

@@ -50,7 +50,7 @@ pub async fn create_update_stream(
         stream_type,
     ) = fetch_headers_from_put_stream_request(req);

-    if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream_flag != "true" {
+    if metadata::STREAM_INFO.stream_exists(stream_name) && !update_stream_flag {
         return Err(StreamError::Custom {
             msg: format!(
                 "Logstream {stream_name} already exists, please create a new log stream with unique name"
@@ -71,12 +71,12 @@
         });
     }

-    if update_stream_flag == "true" {
+    if update_stream_flag {
         return update_stream(
             req,
             stream_name,
             &time_partition,
-            &static_schema_flag,
+            static_schema_flag,
             &time_partition_limit,
             &custom_partition,
         )
@@ -102,15 +102,15 @@
         stream_name,
         &time_partition,
         &custom_partition,
-        &static_schema_flag,
+        static_schema_flag,
     )?;

     create_stream(
         stream_name.to_string(),
         &time_partition,
         time_partition_in_days,
         &custom_partition,
-        &static_schema_flag,
+        static_schema_flag,
         schema,
         &stream_type,
     )
@@ -123,7 +123,7 @@ async fn update_stream(
     req: &HttpRequest,
     stream_name: &str,
     time_partition: &str,
-    static_schema_flag: &str,
+    static_schema_flag: bool,
     time_partition_limit: &str,
     custom_partition: &str,
 ) -> Result<HeaderMap, StreamError> {
@@ -136,7 +136,7 @@
             status: StatusCode::BAD_REQUEST,
         });
     }
-    if !static_schema_flag.is_empty() {
+    if static_schema_flag {
         return Err(StreamError::Custom {
             msg: "Altering the schema of an existing stream is restricted.".to_string(),
             status: StatusCode::BAD_REQUEST,
@@ -167,12 +167,12 @@ async fn validate_and_update_custom_partition(

 pub fn fetch_headers_from_put_stream_request(
     req: &HttpRequest,
-) -> (String, String, String, String, String, String) {
+) -> (String, String, String, bool, bool, String) {
     let mut time_partition = String::default();
     let mut time_partition_limit = String::default();
     let mut custom_partition = String::default();
-    let mut static_schema_flag = String::default();
-    let mut update_stream = String::default();
+    let mut static_schema_flag = false;
+    let mut update_stream_flag = false;
     let mut stream_type = StreamType::UserDefined.to_string();
     req.headers().iter().for_each(|(key, value)| {
         if key == TIME_PARTITION_KEY {
@@ -184,11 +184,11 @@ pub fn fetch_headers_from_put_stream_request(
         if key == CUSTOM_PARTITION_KEY {
             custom_partition = value.to_str().unwrap().to_string();
         }
-        if key == STATIC_SCHEMA_FLAG {
-            static_schema_flag = value.to_str().unwrap().to_string();
+        if key == STATIC_SCHEMA_FLAG && value.to_str().unwrap() == "true" {
+            static_schema_flag = true;
         }
-        if key == UPDATE_STREAM_KEY {
-            update_stream = value.to_str().unwrap().to_string();
+        if key == UPDATE_STREAM_KEY && value.to_str().unwrap() == "true" {
+            update_stream_flag = true;
         }
         if key == STREAM_TYPE_KEY {
             stream_type = value.to_str().unwrap().to_string();
@@ -200,7 +200,7 @@
         time_partition_limit,
         custom_partition,
         static_schema_flag,
-        update_stream,
+        update_stream_flag,
         stream_type,
     )
 }
@@ -258,9 +258,9 @@ pub fn validate_static_schema(
     stream_name: &str,
     time_partition: &str,
     custom_partition: &str,
-    static_schema_flag: &str,
+    static_schema_flag: bool,
 ) -> Result<Arc<Schema>, CreateStreamError> {
-    if static_schema_flag == "true" {
+    if static_schema_flag {
         if body.is_empty() {
             return Err(CreateStreamError::Custom {
                 msg: format!(
@@ -317,7 +317,7 @@ pub async fn update_custom_partition_in_stream(
 ) -> Result<(), CreateStreamError> {
     let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name).unwrap();
     let time_partition = STREAM_INFO.get_time_partition(&stream_name).unwrap();
-    if static_schema_flag.is_some() {
+    if static_schema_flag {
         let schema = STREAM_INFO.schema(&stream_name).unwrap();

         if !custom_partition.is_empty() {
@@ -383,7 +383,7 @@ pub async fn create_stream(
     time_partition: &str,
     time_partition_limit: Option<NonZeroU32>,
     custom_partition: &str,
-    static_schema_flag: &str,
+    static_schema_flag: bool,
     schema: Arc<Schema>,
     stream_type: &str,
 ) -> Result<(), CreateStreamError> {
@@ -423,7 +423,7 @@
             time_partition.to_string(),
             time_partition_limit,
             custom_partition.to_string(),
-            static_schema_flag.to_string(),
+            static_schema_flag,
             static_schema,
             stream_type,
             SchemaVersion::V1, // New stream
@@ -473,7 +473,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
         .time_partition_limit
         .and_then(|limit| limit.parse().ok());
     let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
-    let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or("");
+    let static_schema_flag = stream_metadata.static_schema_flag;
     let stream_type = stream_metadata.stream_type.as_deref().unwrap_or("");
     let schema_version = stream_metadata.schema_version;

@@ -483,7 +483,7 @@
         time_partition.to_string(),
         time_partition_limit,
         custom_partition.to_string(),
-        static_schema_flag.to_string(),
+        static_schema_flag,
         static_schema,
         stream_type,
         schema_version,

src/kafka.rs

Lines changed: 1 addition & 1 deletion

@@ -194,7 +194,7 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> {
     let (rb, is_first) = event
         .into_recordbatch(
             &schema,
-            static_schema_flag.as_ref(),
+            static_schema_flag,
             time_partition.as_ref(),
             schema_version,
         )

0 commit comments
