Merged
4 changes: 2 additions & 2 deletions src/event/format/json.rs
@@ -44,7 +44,7 @@ impl EventFormat for Event {
     fn to_data(
         self,
         schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: Option<&String>,
+        static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
     ) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
@@ -94,7 +94,7 @@ impl EventFormat for Event {
             }
         };

-        if static_schema_flag.is_none()
+        if !static_schema_flag
             && value_arr
                 .iter()
                 .any(|value| fields_mismatch(&schema, value, schema_version))
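
The flipped guard above is the heart of this change. The old string-based flag was tested several different ways across this diff: `is_none()` here, `== "true"` and `!is_empty()` later in logstream_utils.rs, `is_some()` in update_custom_partition_in_stream. Those conventions disagree for a stored value like "false". A standalone sketch of the hazard, with illustrative function names that are not from the codebase:

// Hypothetical stand-ins for the guard in to_data above.
fn skips_mismatch_check_old(static_schema_flag: Option<&String>) -> bool {
    // Presence alone decided the branch: any stored value, even "false",
    // skipped the mismatch check.
    static_schema_flag.is_some()
}

fn skips_mismatch_check_new(static_schema_flag: bool) -> bool {
    static_schema_flag
}

fn main() {
    let stored = String::from("false");
    assert!(skips_mismatch_check_old(Some(&stored))); // surprising
    assert!(!skips_mismatch_check_new(false)); // says what it means
}
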
8 changes: 4 additions & 4 deletions src/event/format/mod.rs
@@ -77,7 +77,7 @@ pub trait EventFormat: Sized {
     fn to_data(
         self,
         schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: Option<&String>,
+        static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
     ) -> Result<(Self::Data, EventSchema, bool), AnyError>;
@@ -87,7 +87,7 @@ pub trait EventFormat: Sized {
     fn into_recordbatch(
         self,
         storage_schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: Option<&String>,
+        static_schema_flag: bool,
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
     ) -> Result<(RecordBatch, bool), AnyError> {
@@ -130,9 +130,9 @@ pub trait EventFormat: Sized {
     fn is_schema_matching(
         new_schema: Arc<Schema>,
         storage_schema: &HashMap<String, Arc<Field>>,
-        static_schema_flag: Option<&String>,
+        static_schema_flag: bool,
     ) -> bool {
-        if static_schema_flag.is_none() {
+        if !static_schema_flag {
             return true;
         }
         for field in new_schema.fields() {
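
With the bool in place, `is_schema_matching` reads directly: a stream without a static schema accepts any new schema (it may evolve), while a static-schema stream requires every incoming field to already exist in storage. A simplified, self-contained restatement of that rule, with field comparison reduced to name lookup for illustration:

use std::collections::HashSet;

// Simplified: the real code compares full Arc<Field> definitions, not names.
fn is_schema_matching(new_fields: &[&str], storage: &HashSet<String>, static_schema_flag: bool) -> bool {
    if !static_schema_flag {
        return true; // dynamic streams may evolve freely
    }
    new_fields.iter().all(|name| storage.contains(*name))
}

fn main() {
    let storage: HashSet<String> = ["a", "b"].iter().map(|s| s.to_string()).collect();
    assert!(is_schema_matching(&["a", "c"], &storage, false)); // evolution allowed
    assert!(!is_schema_matching(&["a", "c"], &storage, true)); // "c" not in storage
}
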
27 changes: 14 additions & 13 deletions src/handlers/http/ingest.rs
@@ -90,7 +90,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
             .clone();
         let event = format::json::Event { data: body_val };
         // For internal streams, use old schema
-        event.into_recordbatch(&schema, None, None, SchemaVersion::V0)?
+        event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?
     };
     event::Event {
         rb,
@@ -285,7 +285,7 @@ pub async fn create_stream_if_not_exists(
         "",
         None,
         "",
-        "",
+        false,
         Arc::new(Schema::empty()),
         stream_type,
     )
@@ -405,7 +405,7 @@ mod tests {
         });

         let (rb, _) =
-            into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();
+            into_event_batch(&json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 4);
@@ -432,7 +432,7 @@ mod tests {
         });

         let (rb, _) =
-            into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();
+            into_event_batch(&json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -462,7 +462,7 @@ mod tests {
             .into_iter(),
         );

-        let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(&json, schema, false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -492,7 +492,7 @@ mod tests {
             .into_iter(),
         );

-        assert!(into_event_batch(&json, schema, None, None, SchemaVersion::V0,).is_err());
+        assert!(into_event_batch(&json, schema, false, None, SchemaVersion::V0,).is_err());
     }

     #[test]
@@ -508,7 +508,7 @@ mod tests {
             .into_iter(),
         );

-        let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(&json, schema, false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 1);
@@ -517,6 +517,7 @@ mod tests {
     #[test]
     fn non_object_arr_is_err() {
         let json = json!([1]);
+
         assert!(convert_array_to_object(
             json,
             None,
@@ -547,7 +548,7 @@ mod tests {
         ]);

         let (rb, _) =
-            into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();
+            into_event_batch(&json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -594,7 +595,7 @@ mod tests {
         ]);

         let (rb, _) =
-            into_event_batch(&json, HashMap::default(), None, None, SchemaVersion::V0).unwrap();
+            into_event_batch(&json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -641,7 +642,7 @@ mod tests {
             .into_iter(),
         );

-        let (rb, _) = into_event_batch(&json, schema, None, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = into_event_batch(&json, schema, false, None, SchemaVersion::V0).unwrap();

         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -688,7 +689,7 @@ mod tests {
             .into_iter(),
         );

-        assert!(into_event_batch(&json, schema, None, None, SchemaVersion::V0,).is_err());
+        assert!(into_event_batch(&json, schema, false, None, SchemaVersion::V0,).is_err());
     }

     #[test]
@@ -729,7 +730,7 @@ mod tests {
         let (rb, _) = into_event_batch(
             &flattened_json,
             HashMap::default(),
-            None,
+            false,
             None,
             SchemaVersion::V0,
         )
@@ -817,7 +818,7 @@ mod tests {
         let (rb, _) = into_event_batch(
             &flattened_json,
             HashMap::default(),
-            None,
+            false,
             None,
             SchemaVersion::V1,
         )
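
A small readability note on call sites like the one in `create_stream_if_not_exists` above: a bare `false` buried in a long positional argument list is easy to misread. One possible idiom, sketched here with stub types and illustrative values rather than anything this PR adopts, is to bind the literal to a named local first:

use std::sync::Arc;

// Stub mirroring the create_stream parameters visible in this diff.
struct Schema;
fn create_stream(
    _stream_name: String,
    _time_partition: &str,
    _time_partition_limit: Option<u32>,
    _custom_partition: &str,
    _static_schema_flag: bool,
    _schema: Arc<Schema>,
    _stream_type: &str,
) {
}

fn main() {
    // Naming the literal documents intent at the call site.
    let static_schema_flag = false; // internal streams don't lock their schema
    create_stream(
        "example_stream".to_string(),
        "",   // time_partition
        None, // time_partition_limit
        "",   // custom_partition
        static_schema_flag,
        Arc::new(Schema),
        "Internal",
    );
}
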
6 changes: 3 additions & 3 deletions src/handlers/http/logstream.rs
@@ -489,7 +489,7 @@ pub async fn create_stream(
     time_partition: &str,
     time_partition_limit: Option<NonZeroU32>,
     custom_partition: &str,
-    static_schema_flag: &str,
+    static_schema_flag: bool,
     schema: Arc<Schema>,
     stream_type: &str,
 ) -> Result<(), CreateStreamError> {
@@ -529,7 +529,7 @@ pub async fn create_stream(
         time_partition.to_string(),
         time_partition_limit,
         custom_partition.to_string(),
-        static_schema_flag.to_string(),
+        static_schema_flag,
         static_schema,
         stream_type,
         SchemaVersion::V1, // New stream
@@ -582,7 +582,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
             .time_partition_limit
             .map(|limit| limit.to_string()),
         custom_partition: stream_meta.custom_partition.clone(),
-        static_schema_flag: stream_meta.static_schema_flag.clone(),
+        static_schema_flag: stream_meta.static_schema_flag,
     };

     // get the other info from
4 changes: 2 additions & 2 deletions src/handlers/http/modal/utils/ingest_utils.rs
@@ -126,7 +126,7 @@ pub async fn push_logs(
         let (rb, is_first_event) = into_event_batch(
             &value,
             schema,
-            static_schema_flag.as_ref(),
+            static_schema_flag,
             time_partition.as_ref(),
             schema_version,
         )?;
@@ -151,7 +151,7 @@ pub async fn push_logs(
 pub fn into_event_batch(
     body: &Value,
     schema: HashMap<String, Arc<Field>>,
-    static_schema_flag: Option<&String>,
+    static_schema_flag: bool,
     time_partition: Option<&String>,
     schema_version: SchemaVersion,
 ) -> Result<(arrow_array::RecordBatch, bool), PostError> {
44 changes: 22 additions & 22 deletions src/handlers/http/modal/utils/logstream_utils.rs
@@ -50,7 +50,7 @@ pub async fn create_update_stream(
         stream_type,
     ) = fetch_headers_from_put_stream_request(req);

-    if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream_flag != "true" {
+    if metadata::STREAM_INFO.stream_exists(stream_name) && !update_stream_flag {
         return Err(StreamError::Custom {
             msg: format!(
                 "Logstream {stream_name} already exists, please create a new log stream with unique name"
@@ -71,12 +71,12 @@ pub async fn create_update_stream(
         });
     }

-    if update_stream_flag == "true" {
+    if update_stream_flag {
         return update_stream(
             req,
             stream_name,
             &time_partition,
-            &static_schema_flag,
+            static_schema_flag,
             &time_partition_limit,
             &custom_partition,
         )
@@ -102,15 +102,15 @@ pub async fn create_update_stream(
         stream_name,
         &time_partition,
         &custom_partition,
-        &static_schema_flag,
+        static_schema_flag,
     )?;

     create_stream(
         stream_name.to_string(),
         &time_partition,
         time_partition_in_days,
         &custom_partition,
-        &static_schema_flag,
+        static_schema_flag,
         schema,
         &stream_type,
     )
@@ -123,7 +123,7 @@ async fn update_stream(
     req: &HttpRequest,
     stream_name: &str,
     time_partition: &str,
-    static_schema_flag: &str,
+    static_schema_flag: bool,
     time_partition_limit: &str,
     custom_partition: &str,
 ) -> Result<HeaderMap, StreamError> {
@@ -136,7 +136,7 @@ async fn update_stream(
             status: StatusCode::BAD_REQUEST,
         });
     }
-    if !static_schema_flag.is_empty() {
+    if static_schema_flag {
         return Err(StreamError::Custom {
             msg: "Altering the schema of an existing stream is restricted.".to_string(),
             status: StatusCode::BAD_REQUEST,
@@ -167,12 +167,12 @@ async fn validate_and_update_custom_partition(

 pub fn fetch_headers_from_put_stream_request(
     req: &HttpRequest,
-) -> (String, String, String, String, String, String) {
+) -> (String, String, String, bool, bool, String) {
     let mut time_partition = String::default();
     let mut time_partition_limit = String::default();
     let mut custom_partition = String::default();
-    let mut static_schema_flag = String::default();
-    let mut update_stream = String::default();
+    let mut static_schema_flag = false;
+    let mut update_stream_flag = false;
     let mut stream_type = StreamType::UserDefined.to_string();
     req.headers().iter().for_each(|(key, value)| {
         if key == TIME_PARTITION_KEY {
@@ -184,11 +184,11 @@ pub fn fetch_headers_from_put_stream_request(
         if key == CUSTOM_PARTITION_KEY {
             custom_partition = value.to_str().unwrap().to_string();
         }
-        if key == STATIC_SCHEMA_FLAG {
-            static_schema_flag = value.to_str().unwrap().to_string();
+        if key == STATIC_SCHEMA_FLAG && value.to_str().unwrap() == "true" {
+            static_schema_flag = true;
         }
-        if key == UPDATE_STREAM_KEY {
-            update_stream = value.to_str().unwrap().to_string();
+        if key == UPDATE_STREAM_KEY && value.to_str().unwrap() == "true" {
+            update_stream_flag = true;
         }
         if key == STREAM_TYPE_KEY {
             stream_type = value.to_str().unwrap().to_string();
@@ -200,7 +200,7 @@ pub fn fetch_headers_from_put_stream_request(
         time_partition_limit,
         custom_partition,
         static_schema_flag,
-        update_stream,
+        update_stream_flag,
         stream_type,
     )
 }
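
One behavioral detail of the new header parsing above is worth calling out: the flag flips to true only when the header value is exactly the lowercase string "true". "TRUE", "1", or any other value leaves it false, as does omitting the header entirely. A standalone restatement of that rule:

// Mirrors the comparison in fetch_headers_from_put_stream_request above.
fn parse_flag(raw_header_value: Option<&str>) -> bool {
    matches!(raw_header_value, Some("true"))
}

fn main() {
    assert!(parse_flag(Some("true")));
    assert!(!parse_flag(Some("TRUE"))); // case-sensitive: stays false
    assert!(!parse_flag(Some("1")));
    assert!(!parse_flag(None)); // header absent
}
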
@@ -258,9 +258,9 @@ pub fn validate_static_schema(
     stream_name: &str,
     time_partition: &str,
     custom_partition: &str,
-    static_schema_flag: &str,
+    static_schema_flag: bool,
 ) -> Result<Arc<Schema>, CreateStreamError> {
-    if static_schema_flag == "true" {
+    if static_schema_flag {
         if body.is_empty() {
             return Err(CreateStreamError::Custom {
                 msg: format!(
@@ -317,7 +317,7 @@ pub async fn update_custom_partition_in_stream(
 ) -> Result<(), CreateStreamError> {
     let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name).unwrap();
     let time_partition = STREAM_INFO.get_time_partition(&stream_name).unwrap();
-    if static_schema_flag.is_some() {
+    if static_schema_flag {
         let schema = STREAM_INFO.schema(&stream_name).unwrap();

         if !custom_partition.is_empty() {
@@ -383,7 +383,7 @@ pub async fn create_stream(
     time_partition: &str,
     time_partition_limit: Option<NonZeroU32>,
     custom_partition: &str,
-    static_schema_flag: &str,
+    static_schema_flag: bool,
     schema: Arc<Schema>,
     stream_type: &str,
 ) -> Result<(), CreateStreamError> {
@@ -423,7 +423,7 @@ pub async fn create_stream(
         time_partition.to_string(),
         time_partition_limit,
         custom_partition.to_string(),
-        static_schema_flag.to_string(),
+        static_schema_flag,
         static_schema,
         stream_type,
         SchemaVersion::V1, // New stream
@@ -473,7 +473,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
         .time_partition_limit
         .and_then(|limit| limit.parse().ok());
     let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
-    let static_schema_flag = stream_metadata.static_schema_flag.as_deref().unwrap_or("");
+    let static_schema_flag = stream_metadata.static_schema_flag;
     let stream_type = stream_metadata.stream_type.as_deref().unwrap_or("");
     let schema_version = stream_metadata.schema_version;

@@ -483,7 +483,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
         time_partition.to_string(),
         time_partition_limit,
         custom_partition.to_string(),
-        static_schema_flag.to_string(),
+        static_schema_flag,
         static_schema,
         stream_type,
         schema_version,
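
The `create_stream_and_schema_from_storage` hunk implies that persisted stream metadata now carries this flag as a boolean rather than an optional string. A hypothetical sketch of the serde side (struct name, fields, and attributes are assumptions, not taken from this PR); note that metadata written by older versions as the string "true" would need a migration or a custom deserializer, which this diff does not show:

use serde::{Deserialize, Serialize};

// Hypothetical excerpt of the stream metadata shape implied by the diff.
#[derive(Serialize, Deserialize)]
struct StreamMetadata {
    custom_partition: Option<String>,
    // Previously an optional string holding "true"; now a JSON boolean.
    // `default` lets older files that omit the key deserialize as false.
    #[serde(default)]
    static_schema_flag: bool,
}
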
2 changes: 1 addition & 1 deletion src/kafka.rs
@@ -194,7 +194,7 @@ async fn ingest_message(msg: BorrowedMessage<'_>) -> Result<(), KafkaError> {
     let (rb, is_first) = event
         .into_recordbatch(
             &schema,
-            static_schema_flag.as_ref(),
+            static_schema_flag,
             time_partition.as_ref(),
             schema_version,
         )