From cf09d8788769fba8f893f2df3fb09ddfb4e63b57 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 16 Feb 2025 14:52:25 +0530 Subject: [PATCH 1/2] refactor: memory/cpu efficient handling of stream configuration variables --- src/handlers/http/ingest.rs | 6 +- src/handlers/http/logstream.rs | 16 +- src/handlers/http/modal/utils/ingest_utils.rs | 19 +- .../http/modal/utils/logstream_utils.rs | 94 +++++++-- src/metadata.rs | 14 +- src/migration/mod.rs | 4 +- src/parseable/mod.rs | 178 ++++++------------ src/parseable/streams.rs | 49 +++-- src/static_schema.rs | 42 ++--- src/storage/mod.rs | 21 ++- src/storage/object_storage.rs | 20 +- src/utils/json/flatten.rs | 59 +++--- src/utils/json/mod.rs | 63 ++++++- 13 files changed, 303 insertions(+), 282 deletions(-) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index dab4f84df..525ef3731 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -512,7 +512,7 @@ mod tests { json, None, None, - None, + &[], SchemaVersion::V0, &crate::event::format::LogSource::default() ) @@ -709,7 +709,7 @@ mod tests { json, None, None, - None, + &[], SchemaVersion::V0, &crate::event::format::LogSource::default(), ) @@ -797,7 +797,7 @@ mod tests { json, None, None, - None, + &[], SchemaVersion::V1, &crate::event::format::LogSource::default(), ) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index b9fb64edc..56a32c1f6 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -363,7 +363,7 @@ pub async fn get_stream_info(stream_name: Path) -> Result StatusCode::FORBIDDEN, StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST, StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR, + StreamError::HeaderParsing(_) => StatusCode::BAD_REQUEST, } } @@ -626,7 +630,7 @@ mod tests { #[actix_web::test] async fn header_without_log_source() { let req = TestRequest::default().to_http_request(); - let PutStreamHeaders { log_source, .. } = req.headers().into(); + let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap(); assert_eq!(log_source, crate::event::format::LogSource::Json); } @@ -635,19 +639,19 @@ mod tests { let mut req = TestRequest::default() .insert_header(("X-P-Log-Source", "pmeta")) .to_http_request(); - let PutStreamHeaders { log_source, .. } = req.headers().into(); + let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap(); assert_eq!(log_source, crate::event::format::LogSource::Pmeta); req = TestRequest::default() .insert_header(("X-P-Log-Source", "otel-logs")) .to_http_request(); - let PutStreamHeaders { log_source, .. } = req.headers().into(); + let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap(); assert_eq!(log_source, crate::event::format::LogSource::OtelLogs); req = TestRequest::default() .insert_header(("X-P-Log-Source", "kinesis")) .to_http_request(); - let PutStreamHeaders { log_source, .. } = req.headers().into(); + let PutStreamHeaders { log_source, .. } = req.headers().try_into().unwrap(); assert_eq!(log_source, crate::event::format::LogSource::Kinesis); } @@ -656,7 +660,7 @@ mod tests { let req = TestRequest::default() .insert_header(("X-P-Log-Source", "teststream")) .to_http_request(); - let PutStreamHeaders { log_source, .. } = req.headers().into(); + let PutStreamHeaders { log_source, .. 
} = req.headers().try_into().unwrap(); assert_eq!(log_source, crate::event::format::LogSource::Json); } } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 402f4d3df..1e6f53188 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -18,7 +18,6 @@ use arrow_schema::Field; use chrono::{DateTime, NaiveDateTime, Utc}; -use itertools::Itertools; use serde_json::Value; use std::{collections::HashMap, sync::Arc}; @@ -72,15 +71,15 @@ pub async fn push_logs( .get_stream(stream_name)? .get_time_partition_limit(); let static_schema_flag = stream.get_static_schema_flag(); - let custom_partition = stream.get_custom_partition(); + let custom_partitions = stream.get_custom_partitions(); let schema_version = stream.get_schema_version(); - let data = if time_partition.is_some() || custom_partition.is_some() { + let data = if time_partition.is_some() || !custom_partitions.is_empty() { convert_array_to_object( json, time_partition.as_ref(), time_partition_limit, - custom_partition.as_ref(), + &custom_partitions, schema_version, log_source, )? @@ -89,7 +88,7 @@ pub async fn push_logs( json, None, None, - None, + &[], schema_version, log_source, )?)?] @@ -101,13 +100,7 @@ pub async fn push_logs( Some(time_partition) => get_parsed_timestamp(&value, time_partition)?, _ => Utc::now().naive_utc(), }; - let custom_partition_values = match custom_partition.as_ref() { - Some(custom_partition) => { - let custom_partitions = custom_partition.split(',').collect_vec(); - get_custom_partition_values(&value, &custom_partitions) - } - None => HashMap::new(), - }; + let custom_partition_values = get_custom_partition_values(&value, &custom_partitions); let schema = PARSEABLE .streams .read() @@ -162,7 +155,7 @@ pub fn into_event_batch( pub fn get_custom_partition_values( json: &Value, - custom_partition_list: &[&str], + custom_partition_list: &[String], ) -> HashMap { let mut custom_partition_values: HashMap = HashMap::new(); for custom_partition_field in custom_partition_list { diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index 15c25da2b..841c79c54 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -16,6 +16,8 @@ * */ +use std::num::NonZeroU32; + use actix_web::http::header::HeaderMap; use crate::{ @@ -27,31 +29,57 @@ use crate::{ storage::StreamType, }; +/// Name of a field that appears within a data stream +pub type FieldName = String; + +/// Name of the field used as a custom partition +pub type CustomPartition = String; + +#[derive(Debug, thiserror::Error)] +pub enum HeaderParseError { + #[error("Maximum 3 custom partition keys are supported")] + TooManyPartitions, + #[error("Missing 'd' suffix for duration value")] + UnsupportedUnit, + #[error("Could not convert duration to an unsigned number")] + ZeroOrNegative, +} + #[derive(Debug, Default)] pub struct PutStreamHeaders { - pub time_partition: String, - pub time_partition_limit: String, - pub custom_partition: Option, + pub time_partition: Option, + pub time_partition_limit: Option, + pub custom_partitions: Vec, pub static_schema_flag: bool, pub update_stream_flag: bool, pub stream_type: StreamType, pub log_source: LogSource, } -impl From<&HeaderMap> for PutStreamHeaders { - fn from(headers: &HeaderMap) -> Self { - PutStreamHeaders { - time_partition: headers - .get(TIME_PARTITION_KEY) - .map_or("", |v| 
v.to_str().unwrap()) - .to_string(), - time_partition_limit: headers - .get(TIME_PARTITION_LIMIT_KEY) - .map_or("", |v| v.to_str().unwrap()) - .to_string(), - custom_partition: headers - .get(CUSTOM_PARTITION_KEY) - .map(|v| v.to_str().unwrap().to_string()), +impl TryFrom<&HeaderMap> for PutStreamHeaders { + type Error = HeaderParseError; + + fn try_from(headers: &HeaderMap) -> Result { + let time_partition = headers + .get(TIME_PARTITION_KEY) + .map(|v| v.to_str().unwrap().to_owned()); + let time_partition_limit = match headers + .get(TIME_PARTITION_LIMIT_KEY) + .map(|v| v.to_str().unwrap()) + { + Some(limit) => Some(parse_time_partition_limit(limit)?), + None => None, + }; + let custom_partition = headers + .get(CUSTOM_PARTITION_KEY) + .map(|v| v.to_str().unwrap()) + .unwrap_or_default(); + let custom_partitions = parse_custom_partition(custom_partition)?; + + let headers = PutStreamHeaders { + time_partition, + time_partition_limit, + custom_partitions, static_schema_flag: headers .get(STATIC_SCHEMA_FLAG) .is_some_and(|v| v.to_str().unwrap() == "true"), @@ -65,6 +93,36 @@ impl From<&HeaderMap> for PutStreamHeaders { log_source: headers .get(LOG_SOURCE_KEY) .map_or(LogSource::default(), |v| v.to_str().unwrap().into()), - } + }; + + Ok(headers) } } + +pub fn parse_custom_partition( + custom_partition: &str, +) -> Result, HeaderParseError> { + let custom_partition_list = custom_partition + .split(',') + .map(String::from) + .collect::>(); + if custom_partition_list.len() > 3 { + return Err(HeaderParseError::TooManyPartitions); + } + + Ok(custom_partition_list) +} + +pub fn parse_time_partition_limit( + time_partition_limit: &str, +) -> Result { + if !time_partition_limit.ends_with('d') { + return Err(HeaderParseError::UnsupportedUnit); + } + let days = &time_partition_limit[0..time_partition_limit.len() - 1]; + let Ok(days) = days.parse::() else { + return Err(HeaderParseError::ZeroOrNegative); + }; + + Ok(days) +} diff --git a/src/metadata.rs b/src/metadata.rs index a29fdfee2..28cc4d904 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -83,7 +83,7 @@ pub struct LogStreamMetadata { pub first_event_at: Option, pub time_partition: Option, pub time_partition_limit: Option, - pub custom_partition: Option, + pub custom_partitions: Vec, pub static_schema_flag: bool, pub hot_tier_enabled: bool, pub stream_type: StreamType, @@ -94,9 +94,9 @@ impl LogStreamMetadata { #[allow(clippy::too_many_arguments)] pub fn new( created_at: String, - time_partition: String, + time_partition: Option, time_partition_limit: Option, - custom_partition: Option, + custom_partitions: Vec, static_schema_flag: bool, static_schema: HashMap>, stream_type: StreamType, @@ -109,13 +109,9 @@ impl LogStreamMetadata { } else { created_at }, - time_partition: if time_partition.is_empty() { - None - } else { - Some(time_partition) - }, + time_partition, time_partition_limit, - custom_partition, + custom_partitions, static_schema_flag, schema: if static_schema.is_empty() { HashMap::new() diff --git a/src/migration/mod.rs b/src/migration/mod.rs index 5e82ec0d6..5b038493a 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -272,7 +272,7 @@ async fn migration_stream( stats, time_partition, time_partition_limit, - custom_partition, + custom_partitions, static_schema_flag, hot_tier_enabled, stream_type, @@ -307,7 +307,7 @@ async fn migration_stream( first_event_at, time_partition, time_partition_limit: time_partition_limit.and_then(|limit| limit.parse().ok()), - custom_partition, + custom_partitions, static_schema_flag, 
hot_tier_enabled, stream_type, diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 2fba1d5b5..14ae68f16 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -319,11 +319,11 @@ impl Parseable { .collect(); let created_at = stream_metadata.created_at; - let time_partition = stream_metadata.time_partition.unwrap_or_default(); + let time_partition = stream_metadata.time_partition; let time_partition_limit = stream_metadata .time_partition_limit .and_then(|limit| limit.parse().ok()); - let custom_partition = stream_metadata.custom_partition; + let custom_partition = stream_metadata.custom_partitions; let static_schema_flag = stream_metadata.static_schema_flag; let stream_type = stream_metadata.stream_type; let schema_version = stream_metadata.schema_version; @@ -399,9 +399,9 @@ impl Parseable { self.create_stream( stream_name.to_string(), - "", None, None, + &[], false, Arc::new(Schema::empty()), stream_type, @@ -421,12 +421,12 @@ impl Parseable { let PutStreamHeaders { time_partition, time_partition_limit, - custom_partition, + custom_partitions, static_schema_flag, update_stream_flag, stream_type, log_source, - } = headers.into(); + } = headers.try_into()?; let stream_in_memory_dont_update = self.streams.contains(stream_name) && !update_stream_flag; @@ -451,53 +451,36 @@ impl Parseable { stream_name, &time_partition, static_schema_flag, - &time_partition_limit, - custom_partition.as_ref(), + time_partition_limit, + &custom_partitions, ) .await; } - let time_partition_in_days = if !time_partition_limit.is_empty() { - Some(validate_time_partition_limit(&time_partition_limit)?) - } else { - None - }; - - if let Some(custom_partition) = &custom_partition { - validate_custom_partition(custom_partition)?; - } - - if !time_partition.is_empty() && custom_partition.is_some() { - let custom_partition_list = custom_partition - .as_ref() - .unwrap() - .split(',') - .collect::>(); - if custom_partition_list.contains(&time_partition.as_str()) { - return Err(CreateStreamError::Custom { - msg: format!( - "time partition {} cannot be set as custom partition", - time_partition - ), - status: StatusCode::BAD_REQUEST, - } - .into()); + if time_partition.is_some() + && !custom_partitions.is_empty() + && custom_partitions.contains(time_partition.as_ref().expect("Is Some")) + { + return Err(CreateStreamError::Custom { + msg: format!("time partition {time_partition:?} cannot be set as custom partition"), + status: StatusCode::BAD_REQUEST, } + .into()); } let schema = validate_static_schema( body, stream_name, &time_partition, - custom_partition.as_ref(), + &custom_partitions, static_schema_flag, )?; self.create_stream( stream_name.to_string(), - &time_partition, - time_partition_in_days, - custom_partition.as_ref(), + time_partition, + time_partition_limit, + &custom_partitions, static_schema_flag, schema, stream_type, @@ -512,15 +495,15 @@ impl Parseable { &self, headers: &HeaderMap, stream_name: &str, - time_partition: &str, + time_partition: &Option, static_schema_flag: bool, - time_partition_limit: &str, - custom_partition: Option<&String>, + time_partition_limit: Option, + custom_partitions: &[String], ) -> Result { if !self.streams.contains(stream_name) { return Err(StreamNotFound(stream_name.to_string()).into()); } - if !time_partition.is_empty() { + if time_partition.is_some() { return Err(StreamError::Custom { msg: "Altering the time partition of an existing stream is restricted.".to_string(), status: StatusCode::BAD_REQUEST, @@ -532,8 +515,7 @@ impl Parseable { status: 
StatusCode::BAD_REQUEST, }); } - if !time_partition_limit.is_empty() { - let time_partition_days = validate_time_partition_limit(time_partition_limit)?; + if let Some(time_partition_days) = time_partition_limit { self.update_time_partition_limit_in_stream( stream_name.to_string(), time_partition_days, @@ -541,7 +523,7 @@ impl Parseable { .await?; return Ok(headers.clone()); } - self.validate_and_update_custom_partition(stream_name, custom_partition) + self.update_custom_partition_in_stream(stream_name.to_string(), custom_partitions) .await?; Ok(headers.clone()) @@ -551,9 +533,9 @@ impl Parseable { pub async fn create_stream( &self, stream_name: String, - time_partition: &str, + time_partition: Option, time_partition_limit: Option, - custom_partition: Option<&String>, + custom_partitions: &[String], static_schema_flag: bool, schema: Arc, stream_type: StreamType, @@ -570,9 +552,9 @@ impl Parseable { created_at: Local::now().to_rfc3339(), permissions: vec![Permisssion::new(PARSEABLE.options.username.clone())], stream_type, - time_partition: (!time_partition.is_empty()).then(|| time_partition.to_string()), + time_partition: time_partition.clone(), time_partition_limit: time_partition_limit.map(|limit| limit.to_string()), - custom_partition: custom_partition.cloned(), + custom_partitions: custom_partitions.to_vec(), static_schema_flag, schema_version: SchemaVersion::V1, // NOTE: Newly created streams are all V1 owner: Owner { @@ -600,9 +582,9 @@ impl Parseable { let metadata = LogStreamMetadata::new( created_at, - time_partition.to_owned(), + time_partition, time_partition_limit, - custom_partition.cloned(), + custom_partitions.to_vec(), static_schema_flag, static_schema, stream_type, @@ -625,20 +607,6 @@ impl Parseable { Ok(()) } - async fn validate_and_update_custom_partition( - &self, - stream_name: &str, - custom_partition: Option<&String>, - ) -> Result<(), StreamError> { - if let Some(custom_partition) = custom_partition { - validate_custom_partition(custom_partition)?; - } - self.update_custom_partition_in_stream(stream_name.to_string(), custom_partition) - .await?; - - Ok(()) - } - pub async fn update_time_partition_limit_in_stream( &self, stream_name: String, @@ -667,7 +635,7 @@ impl Parseable { pub async fn update_custom_partition_in_stream( &self, stream_name: String, - custom_partition: Option<&String>, + custom_partitions: &[String], ) -> Result<(), CreateStreamError> { let stream = self.get_stream(&stream_name).expect(STREAM_EXISTS); let static_schema_flag = stream.get_static_schema_flag(); @@ -675,46 +643,39 @@ impl Parseable { if static_schema_flag { let schema = stream.get_schema(); - if let Some(custom_partition) = custom_partition { - let custom_partition_list = custom_partition.split(',').collect::>(); - for partition in custom_partition_list.iter() { - if !schema - .fields() - .iter() - .any(|field| field.name() == partition) - { - return Err(CreateStreamError::Custom { + for partition in custom_partitions.iter() { + if !schema + .fields() + .iter() + .any(|field| field.name() == partition) + { + return Err(CreateStreamError::Custom { msg: format!("custom partition field {partition} does not exist in the schema for the stream {stream_name}"), status: StatusCode::BAD_REQUEST, }); - } - } - - for partition in custom_partition_list { - if time_partition - .as_ref() - .is_some_and(|time| time == partition) - { - return Err(CreateStreamError::Custom { - msg: format!( - "time partition {} cannot be set as custom partition", - partition - ), - status: StatusCode::BAD_REQUEST, - 
}); - } + } else if time_partition + .as_ref() + .is_some_and(|time| time == partition) + { + return Err(CreateStreamError::Custom { + msg: format!( + "time partition {} cannot be set as custom partition", + partition + ), + status: StatusCode::BAD_REQUEST, + }); } } } let storage = self.storage.get_object_store(); if let Err(err) = storage - .update_custom_partition_in_stream(&stream_name, custom_partition) + .update_custom_partitions_in_stream(&stream_name, custom_partitions) .await { return Err(CreateStreamError::Storage { stream_name, err }); } - stream.set_custom_partition(custom_partition); + stream.set_custom_partitions(custom_partitions.to_vec()); Ok(()) } @@ -781,8 +742,8 @@ impl Parseable { pub fn validate_static_schema( body: &Bytes, stream_name: &str, - time_partition: &str, - custom_partition: Option<&String>, + time_partition: &Option, + custom_partition: &[String], static_schema_flag: bool, ) -> Result, CreateStreamError> { if !static_schema_flag { @@ -808,34 +769,3 @@ pub fn validate_static_schema( Ok(parsed_schema) } - -pub fn validate_time_partition_limit( - time_partition_limit: &str, -) -> Result { - if !time_partition_limit.ends_with('d') { - return Err(CreateStreamError::Custom { - msg: "Missing 'd' suffix for duration value".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - let days = &time_partition_limit[0..time_partition_limit.len() - 1]; - let Ok(days) = days.parse::() else { - return Err(CreateStreamError::Custom { - msg: "Could not convert duration to an unsigned number".to_string(), - status: StatusCode::BAD_REQUEST, - }); - }; - - Ok(days) -} - -pub fn validate_custom_partition(custom_partition: &str) -> Result<(), CreateStreamError> { - let custom_partition_list = custom_partition.split(',').collect::>(); - if custom_partition_list.len() > 3 { - return Err(CreateStreamError::Custom { - msg: "Maximum 3 custom partition keys are supported".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - Ok(()) -} diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 7d8decdca..4abe0aa7a 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -304,14 +304,14 @@ impl Stream { ); let time_partition = self.get_time_partition(); - let custom_partition = self.get_custom_partition(); + let custom_partitions = self.get_custom_partitions(); // read arrow files on disk // convert them to parquet let schema = self .convert_disk_files_to_parquet( time_partition.as_ref(), - custom_partition.as_ref(), + custom_partitions, shutdown_signal, ) .inspect_err(|err| warn!("Error while converting arrow to parquet- {err:?}"))?; @@ -376,7 +376,7 @@ impl Stream { &self, merged_schema: &Schema, time_partition: Option<&String>, - custom_partition: Option<&String>, + custom_partitions: &Vec, ) -> WriterProperties { // Determine time partition field let time_partition_field = time_partition.map_or(DEFAULT_TIMESTAMP_KEY, |tp| tp.as_str()); @@ -400,18 +400,16 @@ impl Stream { }]; // Describe custom partition column encodings and sorting - if let Some(custom_partition) = custom_partition { - for partition in custom_partition.split(',') { - if let Ok(idx) = merged_schema.index_of(partition) { - let column_path = ColumnPath::new(vec![partition.to_string()]); - props = props.set_column_encoding(column_path, Encoding::DELTA_BYTE_ARRAY); - - sorting_column_vec.push(SortingColumn { - column_idx: idx as i32, - descending: true, - nulls_first: true, - }); - } + for partition in custom_partitions { + if let Ok(idx) = merged_schema.index_of(partition) { + let 
column_path = ColumnPath::new(vec![partition.to_string()]); + props = props.set_column_encoding(column_path, Encoding::DELTA_BYTE_ARRAY); + + sorting_column_vec.push(SortingColumn { + column_idx: idx as i32, + descending: true, + nulls_first: true, + }); } } @@ -425,7 +423,7 @@ impl Stream { pub fn convert_disk_files_to_parquet( &self, time_partition: Option<&String>, - custom_partition: Option<&String>, + custom_partitions: Vec, shutdown_signal: bool, ) -> Result, StagingError> { let mut schemas = Vec::new(); @@ -465,7 +463,8 @@ impl Stream { } let merged_schema = record_reader.merged_schema(); - let props = self.parquet_writer_props(&merged_schema, time_partition, custom_partition); + let props = + self.parquet_writer_props(&merged_schema, time_partition, &custom_partitions); schemas.push(merged_schema.clone()); let schema = Arc::new(merged_schema); let mut part_path = parquet_path.to_owned(); @@ -556,11 +555,11 @@ impl Stream { .time_partition_limit } - pub fn get_custom_partition(&self) -> Option { + pub fn get_custom_partitions(&self) -> Vec { self.metadata .read() .expect(LOCK_EXPECT) - .custom_partition + .custom_partitions .clone() } @@ -642,8 +641,8 @@ impl Stream { .time_partition_limit = Some(time_partition_limit); } - pub fn set_custom_partition(&self, custom_partition: Option<&String>) { - self.metadata.write().expect(LOCK_EXPECT).custom_partition = custom_partition.cloned(); + pub fn set_custom_partitions(&self, custom_partitions: Vec) { + self.metadata.write().expect(LOCK_EXPECT).custom_partitions = custom_partitions; } pub fn set_hot_tier(&self, enable: bool) { @@ -923,7 +922,7 @@ mod tests { LogStreamMetadata::default(), None, ) - .convert_disk_files_to_parquet(None, None, false)?; + .convert_disk_files_to_parquet(None, vec![], false)?; assert!(result.is_none()); // Verify metrics were set to 0 let staging_files = metrics::STAGING_FILES.with_label_values(&[&stream]).get(); @@ -1002,7 +1001,7 @@ mod tests { // Start with a fresh staging let staging = Stream::new(options, stream_name, LogStreamMetadata::default(), None); let result = staging - .convert_disk_files_to_parquet(None, None, true) + .convert_disk_files_to_parquet(None, vec![], true) .unwrap(); assert!(result.is_some()); @@ -1051,7 +1050,7 @@ mod tests { // Start with a fresh staging let staging = Stream::new(options, stream_name, LogStreamMetadata::default(), None); let result = staging - .convert_disk_files_to_parquet(None, None, true) + .convert_disk_files_to_parquet(None, vec![], true) .unwrap(); assert!(result.is_some()); @@ -1105,7 +1104,7 @@ mod tests { // Start with a fresh staging let staging = Stream::new(options, stream_name, LogStreamMetadata::default(), None); let result = staging - .convert_disk_files_to_parquet(None, None, false) + .convert_disk_files_to_parquet(None, vec![], false) .unwrap(); assert!(result.is_some()); diff --git a/src/static_schema.rs b/src/static_schema.rs index 5b1a5cada..61ef5db9b 100644 --- a/src/static_schema.rs +++ b/src/static_schema.rs @@ -58,8 +58,8 @@ pub struct Fields { pub struct Metadata {} pub fn convert_static_schema_to_arrow_schema( static_schema: StaticSchema, - time_partition: &str, - custom_partition: Option<&String>, + time_partition: &Option, + custom_partitions: &[String], ) -> Result, AnyError> { let mut parsed_schema = ParsedSchema { fields: Vec::new(), @@ -67,28 +67,18 @@ pub fn convert_static_schema_to_arrow_schema( }; let mut time_partition_exists = false; - if let Some(custom_partition) = custom_partition { - let custom_partition_list = 
custom_partition.split(',').collect::>(); - let mut custom_partition_exists = HashMap::with_capacity(custom_partition_list.len()); - - for partition in &custom_partition_list { - if static_schema - .fields - .iter() - .any(|field| &field.name == partition) - { - custom_partition_exists.insert(partition.to_string(), true); - } - } - - for partition in &custom_partition_list { - if !custom_partition_exists.contains_key(*partition) { - return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream")); - } + for partition in custom_partitions { + if !static_schema + .fields + .iter() + .any(|field| &field.name == partition) + { + return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream")); } } + for mut field in static_schema.fields { - if !time_partition.is_empty() && field.name == time_partition { + if time_partition.as_ref().is_some_and(|p| p == &field.name) { time_partition_exists = true; field.data_type = "datetime".to_string(); } @@ -126,12 +116,10 @@ pub fn convert_static_schema_to_arrow_schema( parsed_schema.fields.push(parsed_field); } - if !time_partition.is_empty() && !time_partition_exists { - return Err(anyhow! { - format!( - "time partition field {time_partition} does not exist in the schema for the static schema logstream" - ), - }); + if let Some(time_partition) = time_partition { + if !time_partition_exists { + return Err(anyhow!("time partition field {time_partition} does not exist in the schema for the static schema logstream")); + } } add_parseable_fields_to_static_schema(parsed_schema) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index e02094584..a9207e70d 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -23,7 +23,10 @@ use crate::{ option::StandaloneWithDistributed, parseable::StreamNotFound, stats::FullStats, - utils::json::{deserialize_string_as_true, serialize_bool_as_true}, + utils::json::{ + deserialize_custom_partitions, deserialize_string_as_true, serialize_bool_as_true, + serialize_custom_partitions, + }, }; use chrono::Local; @@ -104,8 +107,11 @@ pub struct ObjectStoreFormat { pub time_partition: Option, #[serde(skip_serializing_if = "Option::is_none")] pub time_partition_limit: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub custom_partition: Option, + #[serde( + deserialize_with = "deserialize_custom_partitions", + serialize_with = "serialize_custom_partitions", + )] + pub custom_partitions: Vec, #[serde( default, // sets to false if not configured deserialize_with = "deserialize_string_as_true", @@ -132,8 +138,11 @@ pub struct StreamInfo { pub time_partition: Option, #[serde(skip_serializing_if = "Option::is_none")] pub time_partition_limit: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub custom_partition: Option, + #[serde( + deserialize_with = "deserialize_custom_partitions", + serialize_with = "serialize_custom_partitions", + )] + pub custom_partition: Vec, #[serde( default, // sets to false if not configured deserialize_with = "deserialize_string_as_true", @@ -217,7 +226,7 @@ impl Default for ObjectStoreFormat { retention: None, time_partition: None, time_partition_limit: None, - custom_partition: None, + custom_partitions: vec![], static_schema_flag: false, hot_tier_enabled: false, log_source: LogSource::default(), diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index bc022ecd1..9cb308990 100644 --- a/src/storage/object_storage.rs +++ 
b/src/storage/object_storage.rs @@ -176,13 +176,16 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(()) } - async fn update_custom_partition_in_stream( + async fn update_custom_partitions_in_stream( &self, stream_name: &str, - custom_partition: Option<&String>, + custom_partitions: &[String], ) -> Result<(), ObjectStorageError> { + if custom_partitions.is_empty() { + return Ok(()); + } let mut format = self.get_object_store_format(stream_name).await?; - format.custom_partition = custom_partition.cloned(); + format.custom_partitions = custom_partitions.to_vec(); let format_json = to_bytes(&format); self.put_object(&stream_json_path(stream_name), format_json) .await?; @@ -651,7 +654,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { debug!("Starting object_store_sync for stream- {stream_name}"); let stream = PARSEABLE.get_or_create_stream(&stream_name); - let custom_partition = stream.get_custom_partition(); + let custom_partitions = stream.get_custom_partitions(); for file in stream.parquet_files() { let filename = file .file_name() @@ -673,13 +676,8 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { .add(compressed_size as i64); let mut file_suffix = str::replacen(filename, ".", "/", 3); - let custom_partition_clone = custom_partition.clone(); - if custom_partition_clone.is_some() { - let custom_partition_fields = custom_partition_clone.unwrap(); - let custom_partition_list = - custom_partition_fields.split(',').collect::>(); - file_suffix = - str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); + if !custom_partitions.is_empty() { + file_suffix = str::replacen(filename, ".", "/", 3 + custom_partitions.len()); } let stream_relative_path = format!("{stream_name}/{file_suffix}"); diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index f5a897764..e8830842e 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -58,14 +58,14 @@ pub fn flatten( separator: &str, time_partition: Option<&String>, time_partition_limit: Option, - custom_partition: Option<&String>, + custom_partitions: &[String], validation_required: bool, ) -> Result<(), JsonFlattenError> { match nested_value { Value::Object(nested_dict) => { if validation_required { validate_time_partition(nested_dict, time_partition, time_partition_limit)?; - validate_custom_partition(nested_dict, custom_partition)?; + validate_custom_partition(nested_dict, custom_partitions)?; } let mut map = Map::new(); flatten_object(&mut map, None, nested_dict, separator)?; @@ -79,7 +79,7 @@ pub fn flatten( separator, time_partition, time_partition_limit, - custom_partition, + custom_partitions, validation_required, )?; } @@ -94,14 +94,9 @@ pub fn flatten( // not null, empty, an object , an array, or contain a `.` when serialized pub fn validate_custom_partition( value: &Map, - custom_partition: Option<&String>, + custom_partitions: &[String], ) -> Result<(), JsonFlattenError> { - let Some(custom_partition) = custom_partition else { - return Ok(()); - }; - let custom_partition_list = custom_partition.split(',').collect::>(); - - for field in custom_partition_list { + for field in custom_partitions { let trimmed_field = field.trim(); let Some(field_value) = value.get(trimmed_field) else { return Err(JsonFlattenError::FieldNotPartOfLog( @@ -362,7 +357,7 @@ mod tests { fn flatten_single_key_string() { let mut obj = json!({"key": "value"}); let expected = obj.clone(); - flatten(&mut obj, "_", None, None, None, false).unwrap(); + flatten(&mut obj, "_", None, None, &[], 
false).unwrap(); assert_eq!(obj, expected); } @@ -370,7 +365,7 @@ mod tests { fn flatten_single_key_int() { let mut obj = json!({"key": 1}); let expected = obj.clone(); - flatten(&mut obj, "_", None, None, None, false).unwrap(); + flatten(&mut obj, "_", None, None, &[], false).unwrap(); assert_eq!(obj, expected); } @@ -378,7 +373,7 @@ mod tests { fn flatten_multiple_key_value() { let mut obj = json!({"key1": 1, "key2": "value2"}); let expected = obj.clone(); - flatten(&mut obj, "_", None, None, None, false).unwrap(); + flatten(&mut obj, "_", None, None, &[], false).unwrap(); assert_eq!(obj, expected); } @@ -386,7 +381,7 @@ mod tests { fn flatten_nested_single_key_value() { let mut obj = json!({"key": "value", "nested_key": {"key":"value"}}); let expected = json!({"key": "value", "nested_key.key": "value"}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, &[], false).unwrap(); assert_eq!(obj, expected); } @@ -395,7 +390,7 @@ mod tests { let mut obj = json!({"key": "value", "nested_key": {"key1":"value1", "key2": "value2"}}); let expected = json!({"key": "value", "nested_key.key1": "value1", "nested_key.key2": "value2"}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, &[], false).unwrap(); assert_eq!(obj, expected); } @@ -403,7 +398,7 @@ mod tests { fn nested_key_value_with_array() { let mut obj = json!({"key": "value", "nested_key": {"key1":[1,2,3]}}); let expected = json!({"key": "value", "nested_key.key1": [1,2,3]}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, &[], false).unwrap(); assert_eq!(obj, expected); } @@ -411,7 +406,7 @@ mod tests { fn nested_obj_array() { let mut obj = json!({"key": [{"a": "value0"}, {"a": "value1"}]}); let expected = json!({"key.a": ["value0", "value1"]}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, &[], false).unwrap(); assert_eq!(obj, expected); } @@ -419,7 +414,7 @@ mod tests { fn nested_obj_array_nulls() { let mut obj = json!({"key": [{"a": "value0"}, {"a": "value1", "b": "value1"}]}); let expected = json!({"key.a": ["value0", "value1"], "key.b": [null, "value1"]}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, &[], false).unwrap(); assert_eq!(obj, expected); } @@ -427,7 +422,7 @@ mod tests { fn nested_obj_array_nulls_reversed() { let mut obj = json!({"key": [{"a": "value0", "b": "value0"}, {"a": "value1"}]}); let expected = json!({"key.a": ["value0", "value1"], "key.b": ["value0", null]}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, &[], false).unwrap(); assert_eq!(obj, expected); } @@ -435,7 +430,7 @@ mod tests { fn nested_obj_array_nested_obj() { let mut obj = json!({"key": [{"a": {"p": 0}, "b": "value0"}, {"b": "value1"}]}); let expected = json!({"key.a.p": [0, null], "key.b": ["value0", "value1"]}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + flatten(&mut obj, ".", None, None, &[], false).unwrap(); assert_eq!(obj, expected); } @@ -443,14 +438,14 @@ mod tests { fn nested_obj_array_nested_obj_array() { let mut obj = json!({"key": [{"a": [{"p": "value0", "q": "value0"}, {"p": "value1", "q": null}], "b": "value0"}, {"b": "value1"}]}); let expected = json!({"key.a.p": [["value0", "value1"], null], "key.a.q": [["value0", null], null], "key.b": ["value0", "value1"]}); - flatten(&mut obj, ".", None, None, None, false).unwrap(); + 
flatten(&mut obj, ".", None, None, &[], false).unwrap(); assert_eq!(obj, expected); } #[test] fn flatten_mixed_object() { let mut obj = json!({"a": 42, "arr": ["1", {"key": "2"}, {"key": {"nested": "3"}}]}); - assert!(flatten(&mut obj, ".", None, None, None, false).is_err()); + assert!(flatten(&mut obj, ".", None, None, &[], false).is_err()); } #[test] @@ -543,22 +538,22 @@ mod tests { let mut value = json!({ "a": 1, }); - assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok()); + assert!(flatten(&mut value, "_", None, None, &["a".to_string()], true).is_ok()); let mut value = json!({ "a": true, }); - assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok()); + assert!(flatten(&mut value, "_", None, None, &["a".to_string()], true).is_ok()); let mut value = json!({ "a": "yes", }); - assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok()); + assert!(flatten(&mut value, "_", None, None, &["a".to_string()], true).is_ok()); let mut value = json!({ "a": -1, }); - assert!(flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).is_ok()); + assert!(flatten(&mut value, "_", None, None, &["a".to_string()], true).is_ok()); } #[test] @@ -567,7 +562,7 @@ mod tests { "a": null, }); matches!( - flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + flatten(&mut value, "_", None, None, &["a".to_string()], true).unwrap_err(), JsonFlattenError::FieldEmptyOrNull(_) ); @@ -575,7 +570,7 @@ mod tests { "a": "", }); matches!( - flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + flatten(&mut value, "_", None, None, &["a".to_string()], true).unwrap_err(), JsonFlattenError::FieldEmptyOrNull(_) ); @@ -583,7 +578,7 @@ mod tests { "a": {"b": 1}, }); matches!( - flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + flatten(&mut value, "_", None, None, &["a".to_string()], true).unwrap_err(), JsonFlattenError::FieldIsObject(_) ); @@ -591,7 +586,7 @@ mod tests { "a": ["b", "c"], }); matches!( - flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + flatten(&mut value, "_", None, None, &["a".to_string()], true).unwrap_err(), JsonFlattenError::FieldIsArray(_) ); @@ -599,7 +594,7 @@ mod tests { "a": "b.c", }); matches!( - flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + flatten(&mut value, "_", None, None, &["a".to_string()], true).unwrap_err(), JsonFlattenError::FieldContainsPeriod(_) ); @@ -607,7 +602,7 @@ mod tests { "a": 1.0, }); matches!( - flatten(&mut value, "_", None, None, Some(&"a".to_string()), true).unwrap_err(), + flatten(&mut value, "_", None, None, &["a".to_string()], true).unwrap_err(), JsonFlattenError::FieldContainsPeriod(_) ); } diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index efa9cb2e2..2ec25254a 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -25,6 +25,9 @@ use serde_json; use serde_json::Value; use crate::event::format::LogSource; +use crate::handlers::http::modal::utils::logstream_utils::{ + parse_custom_partition, CustomPartition, +}; use crate::metadata::SchemaVersion; pub mod flatten; @@ -36,7 +39,7 @@ pub fn flatten_json_body( body: Value, time_partition: Option<&String>, time_partition_limit: Option, - custom_partition: Option<&String>, + custom_partitions: &[String], schema_version: SchemaVersion, validation_required: bool, log_source: &LogSource, @@ -58,7 +61,7 @@ pub fn flatten_json_body( "_", time_partition, 
time_partition_limit, - custom_partition, + custom_partitions, validation_required, )?; Ok(nested_value) @@ -68,7 +71,7 @@ pub fn convert_array_to_object( body: Value, time_partition: Option<&String>, time_partition_limit: Option, - custom_partition: Option<&String>, + custom_partitions: &[String], schema_version: SchemaVersion, log_source: &LogSource, ) -> Result, anyhow::Error> { @@ -76,7 +79,7 @@ pub fn convert_array_to_object( body, time_partition, time_partition_limit, - custom_partition, + custom_partitions, schema_version, true, log_source, @@ -142,6 +145,54 @@ where } } +struct PartitionsFromStr; + +impl Visitor<'_> for PartitionsFromStr { + type Value = Vec; + + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { + formatter.write_str("a comma-separated list of custom partitions (e.g., \"a,b,c\")") + } + + fn visit_borrowed_str(self, v: &'_ str) -> Result + where + E: serde::de::Error, + { + self.visit_str(v) + } + + fn visit_str(self, s: &str) -> Result + where + E: serde::de::Error, + { + parse_custom_partition(s) + .map_err(|e| E::custom(format!("Expected list: \"a, b, c\", got: {s}; error: {e}",))) + } +} + +/// Used to convert "a,b,c" to ["a", "b", "c"], to support backward compatibility. +pub fn deserialize_custom_partitions<'de, D>( + deserializer: D, +) -> Result, D::Error> +where + D: serde::Deserializer<'de>, +{ + deserializer.deserialize_str(PartitionsFromStr) +} + +/// Used to convert ["a", "b", "c"] to "a,b,c" for backward compatibility. +pub fn serialize_custom_partitions(value: &[String], serializer: S) -> Result +where + S: serde::Serializer, +{ + if value.is_empty() { + // Skip serializing this field + serializer.serialize_none() + } else { + serializer.serialize_str(&value.join(",")) + } +} + #[cfg(test)] mod tests { use crate::event::format::LogSource; @@ -159,7 +210,7 @@ mod tests { value, None, None, - None, + &[], crate::metadata::SchemaVersion::V1, false, &LogSource::default() @@ -178,7 +229,7 @@ mod tests { value, None, None, - None, + &[], crate::metadata::SchemaVersion::V1, false, &LogSource::default() From a67d8bf885236b25a2c1476936bf6bbd1a46f70b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 16 Feb 2025 18:04:29 +0530 Subject: [PATCH 2/2] refactor: DRY --- src/handlers/http/modal/utils/ingest_utils.rs | 19 +++---------------- 1 file changed, 3 insertions(+), 16 deletions(-) diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 1e6f53188..309e74857 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -31,10 +31,9 @@ use crate::{ kinesis::{flatten_kinesis_logs, Message}, }, metadata::SchemaVersion, - parseable::{StreamNotFound, PARSEABLE}, + parseable::PARSEABLE, storage::StreamType, utils::json::{convert_array_to_object, flatten::convert_to_array}, - LOCK_EXPECT, }; pub async fn flatten_and_push_logs( @@ -67,9 +66,7 @@ pub async fn push_logs( ) -> Result<(), PostError> { let stream = PARSEABLE.get_stream(stream_name)?; let time_partition = stream.get_time_partition(); - let time_partition_limit = PARSEABLE - .get_stream(stream_name)? 
- .get_time_partition_limit(); + let time_partition_limit = stream.get_time_partition_limit(); let static_schema_flag = stream.get_static_schema_flag(); let custom_partitions = stream.get_custom_partitions(); let schema_version = stream.get_schema_version(); @@ -101,17 +98,7 @@ pub async fn push_logs( _ => Utc::now().naive_utc(), }; let custom_partition_values = get_custom_partition_values(&value, &custom_partitions); - let schema = PARSEABLE - .streams - .read() - .unwrap() - .get(stream_name) - .ok_or_else(|| StreamNotFound(stream_name.to_owned()))? - .metadata - .read() - .expect(LOCK_EXPECT) - .schema - .clone(); + let schema = stream.get_schema_raw(); let (rb, is_first_event) = into_event_batch( value, schema,
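
---

For reference, below is a minimal standalone sketch (not part of the patch) of the two header-parsing helpers that PATCH 1/2 introduces in src/handlers/http/modal/utils/logstream_utils.rs. The logic mirrors parse_custom_partition and parse_time_partition_limit as defined in the diff above; the bare error enum (without thiserror), the strip_suffix form, the main function, and the example values "region,host,app" and "30d" are simplifications added here for illustration only.

use std::num::NonZeroU32;

// Mirrors the patch's HeaderParseError variants, minus the thiserror derives,
// so this sketch compiles with no external dependencies.
#[derive(Debug, PartialEq)]
enum HeaderParseError {
    TooManyPartitions, // more than 3 comma-separated custom partition keys
    UnsupportedUnit,   // time partition limit missing the 'd' (days) suffix
    ZeroOrNegative,    // duration is not a positive unsigned number
}

// Splits "a,b,c" into ["a", "b", "c"], enforcing the 3-key cap from the patch.
fn parse_custom_partition(custom_partition: &str) -> Result<Vec<String>, HeaderParseError> {
    let list: Vec<String> = custom_partition.split(',').map(String::from).collect();
    if list.len() > 3 {
        return Err(HeaderParseError::TooManyPartitions);
    }
    Ok(list)
}

// Parses "30d" into NonZeroU32(30); only a day ('d') suffix is accepted.
// NonZeroU32's FromStr impl rejects "0", covering the ZeroOrNegative case.
fn parse_time_partition_limit(limit: &str) -> Result<NonZeroU32, HeaderParseError> {
    let Some(days) = limit.strip_suffix('d') else {
        return Err(HeaderParseError::UnsupportedUnit);
    };
    days.parse::<NonZeroU32>()
        .map_err(|_| HeaderParseError::ZeroOrNegative)
}

fn main() {
    assert_eq!(
        parse_custom_partition("region,host,app"),
        Ok(vec!["region".into(), "host".into(), "app".into()])
    );
    assert_eq!(
        parse_custom_partition("a,b,c,d"),
        Err(HeaderParseError::TooManyPartitions)
    );
    assert_eq!(
        parse_time_partition_limit("30d"),
        Ok(NonZeroU32::new(30).unwrap())
    );
    assert_eq!(
        parse_time_partition_limit("30h"),
        Err(HeaderParseError::UnsupportedUnit)
    );
    assert_eq!(
        parse_time_partition_limit("0d"),
        Err(HeaderParseError::ZeroOrNegative)
    );
}

Note that the companion serde helpers added in src/utils/json/mod.rs (deserialize_custom_partitions / serialize_custom_partitions) reuse parse_custom_partition so the "a,b,c" string form stored in existing stream metadata keeps round-tripping with the new Vec<String> representation.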