diff --git a/src/query/listing_table_builder.rs b/src/query/listing_table_builder.rs index 685a34a4b..0a3edf2bf 100644 --- a/src/query/listing_table_builder.rs +++ b/src/query/listing_table_builder.rs @@ -25,11 +25,11 @@ use datafusion::{ listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl}, }, error::DataFusionError, - logical_expr::{col, SortExpr}, + logical_expr::col, }; -use futures_util::{future, stream::FuturesUnordered, Future, TryStreamExt}; +use futures_util::{stream::FuturesUnordered, Future, TryStreamExt}; use itertools::Itertools; -use object_store::{ObjectMeta, ObjectStore}; +use object_store::{path::Path, ObjectMeta, ObjectStore}; use crate::{ event::DEFAULT_TIMESTAMP_KEY, @@ -60,25 +60,25 @@ impl ListingTableBuilder { client: Arc, time_filters: &[PartialTimeFilter], ) -> Result { + // Extract the minimum start time from the time filters. let start_time = time_filters .iter() - .filter_map(|x| match x { - PartialTimeFilter::Low(Bound::Excluded(x)) => Some(x), - PartialTimeFilter::Low(Bound::Included(x)) => Some(x), + .filter_map(|filter| match filter { + PartialTimeFilter::Low(Bound::Excluded(x)) + | PartialTimeFilter::Low(Bound::Included(x)) => Some(x), _ => None, }) - .min() - .cloned(); + .min(); + // Extract the maximum end time from the time filters. let end_time = time_filters .iter() - .filter_map(|x| match x { - PartialTimeFilter::High(Bound::Excluded(x)) => Some(x), - PartialTimeFilter::High(Bound::Included(x)) => Some(x), + .filter_map(|filter| match filter { + PartialTimeFilter::High(Bound::Excluded(x)) + | PartialTimeFilter::High(Bound::Included(x)) => Some(x), _ => None, }) - .max() - .cloned(); + .max(); let Some((start_time, end_time)) = start_time.zip(end_time) else { return Err(DataFusionError::NotImplemented( @@ -87,6 +87,7 @@ impl ListingTableBuilder { )); }; + // Generate prefixes for the given time range let prefixes = TimePeriod::new( start_time.and_utc(), end_time.and_utc(), @@ -94,55 +95,41 @@ impl ListingTableBuilder { ) .generate_prefixes(); - let prefixes = prefixes - .into_iter() - .map(|entry| { - let path = - relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, entry)); - storage.absolute_url(path.as_relative_path()).to_string() - }) - .collect_vec(); - - let mut minute_resolve: HashMap> = HashMap::new(); + // Categorizes prefixes into "minute" and general resolve lists. + let mut minute_resolve = HashMap::>::new(); let mut all_resolve = Vec::new(); - for prefix in prefixes { - let components = prefix.split_terminator('/'); - if components.last().is_some_and(|x| x.starts_with("minute")) { - let hour_prefix = &prefix[0..prefix.rfind("minute").expect("minute exists")]; + let path = relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix)); + storage.absolute_url(path.as_relative_path()).to_string(); + if let Some(pos) = prefix.rfind("minute") { + let hour_prefix = &prefix[..pos]; minute_resolve .entry(hour_prefix.to_owned()) - .and_modify(|list| list.push(prefix)) - .or_default(); + .or_default() + .push(prefix); } else { - all_resolve.push(prefix) + all_resolve.push(prefix); } } - type ResolveFuture = Pin< - Box, object_store::Error>> + Send + 'static>, - >; - // Pin>>> + Send + 'async_trait>> - // BoxStream<'_, Result> + /// Resolve all prefixes asynchronously and collect the object metadata. + type ResolveFuture = + Pin, object_store::Error>> + Send>>; let tasks: FuturesUnordered = FuturesUnordered::new(); - - for (listing_prefix, prefix) in minute_resolve { + for (listing_prefix, prefixes) in minute_resolve { let client = Arc::clone(&client); tasks.push(Box::pin(async move { - let mut list = client - .list(Some(&object_store::path::Path::from(listing_prefix))) - .try_collect::>() - .await?; + let path = Path::from(listing_prefix); + let mut objects = client.list(Some(&path)).try_collect::>().await?; - list.retain(|object| { - prefix.iter().any(|prefix| { - object - .location + objects.retain(|obj| { + prefixes.iter().any(|prefix| { + obj.location .prefix_matches(&object_store::path::Path::from(prefix.as_ref())) }) }); - Ok(list) + Ok(objects) })); } @@ -157,25 +144,23 @@ impl ListingTableBuilder { })); } - let res: Vec> = tasks - .and_then(|res| { - future::ok( - res.into_iter() - .map(|res| res.location.to_string()) - .collect_vec(), - ) - }) - .try_collect() + let listing = tasks + .try_collect::>>() .await - .map_err(|err| DataFusionError::External(Box::new(err)))?; - - let mut res = res.into_iter().flatten().collect_vec(); - res.sort(); - res.reverse(); + .map_err(|err| DataFusionError::External(Box::new(err)))? + .into_iter() + .flat_map(|res| { + res.into_iter() + .map(|obj| obj.location.to_string()) + .collect::>() + }) + .sorted() + .rev() + .collect_vec(); Ok(Self { stream: self.stream, - listing: res, + listing, }) } @@ -188,25 +173,21 @@ impl ListingTableBuilder { if self.listing.is_empty() { return Ok(None); } - let file_sort_order: Vec>; - let file_format = ParquetFormat::default().with_enable_pruning(true); - if let Some(time_partition) = time_partition { - file_sort_order = vec![vec![col(time_partition).sort(true, false)]]; - } else { - file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]]; - } + let file_sort_order = vec![vec![time_partition + .map_or_else(|| col(DEFAULT_TIMESTAMP_KEY), col) + .sort(true, false)]]; + let file_format = ParquetFormat::default().with_enable_pruning(true); let listing_options = ListingOptions::new(Arc::new(file_format)) .with_file_extension(".parquet") .with_file_sort_order(file_sort_order) .with_collect_stat(true) .with_target_partitions(1); - let config = ListingTableConfig::new_with_multi_paths(map(self.listing)) .with_listing_options(listing_options) .with_schema(schema); + let listing_table = ListingTable::try_new(config)?; - let listing_table = Arc::new(ListingTable::try_new(config)?); - Ok(Some(listing_table)) + Ok(Some(Arc::new(listing_table))) } } diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index f27cb6998..0766aa2e5 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -333,12 +333,12 @@ impl TableProvider for StandardTableProvider { .await .map_err(|err| DataFusionError::Plan(err.to_string()))?; let time_partition = object_store_format.time_partition; - let mut time_filters = extract_primary_filter(filters, time_partition.clone()); + let mut time_filters = extract_primary_filter(filters, &time_partition); if time_filters.is_empty() { return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string())); } - if include_now(filters, time_partition.clone()) { + if include_now(filters, &time_partition) { if let Some(records) = event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema) { @@ -683,11 +683,11 @@ pub enum PartialTimeFilter { } impl PartialTimeFilter { - fn try_from_expr(expr: &Expr, time_partition: Option) -> Option { + fn try_from_expr(expr: &Expr, time_partition: &Option) -> Option { let Expr::BinaryExpr(binexpr) = expr else { return None; }; - let (op, time) = extract_timestamp_bound(binexpr.clone(), time_partition)?; + let (op, time) = extract_timestamp_bound(binexpr, time_partition)?; let value = match op { Operator::Gt => PartialTimeFilter::Low(Bound::Excluded(time)), Operator::GtEq => PartialTimeFilter::Low(Bound::Included(time)), @@ -814,7 +814,7 @@ fn return_listing_time_filters( } } -pub fn include_now(filters: &[Expr], time_partition: Option) -> bool { +pub fn include_now(filters: &[Expr], time_partition: &Option) -> bool { let current_minute = Utc::now() .with_second(0) .and_then(|x| x.with_nanosecond(0)) @@ -846,7 +846,7 @@ fn expr_in_boundary(filter: &Expr) -> bool { let Expr::BinaryExpr(binexpr) = filter else { return false; }; - let Some((op, time)) = extract_timestamp_bound(binexpr.clone(), None) else { + let Some((op, time)) = extract_timestamp_bound(binexpr, &None) else { return false; }; @@ -860,39 +860,33 @@ fn expr_in_boundary(filter: &Expr) -> bool { ) } -fn extract_from_lit(expr: BinaryExpr, time_partition: Option) -> Option { - let mut column_name: String = String::default(); - if let Expr::Column(column) = *expr.left { - column_name = column.name; - } - if let Expr::Literal(value) = *expr.right { - match value { - ScalarValue::TimestampMillisecond(Some(value), _) => { - Some(DateTime::from_timestamp_millis(value).unwrap().naive_utc()) - } - ScalarValue::TimestampNanosecond(Some(value), _) => { - Some(DateTime::from_timestamp_nanos(value).naive_utc()) - } - ScalarValue::Utf8(Some(str_value)) => { - if time_partition.is_some() && column_name == time_partition.unwrap() { - Some(str_value.parse::().unwrap()) - } else { - None - } - } - _ => None, - } - } else { - None - } -} - -/* `BinaryExp` doesn't implement `Copy` */ fn extract_timestamp_bound( - binexpr: BinaryExpr, - time_partition: Option, + binexpr: &BinaryExpr, + time_partition: &Option, ) -> Option<(Operator, NaiveDateTime)> { - Some((binexpr.op, extract_from_lit(binexpr, time_partition)?)) + let Expr::Literal(value) = binexpr.right.as_ref() else { + return None; + }; + + let is_time_partition = match (binexpr.left.as_ref(), time_partition) { + (Expr::Column(column), Some(time_partition)) => &column.name == time_partition, + _ => false, + }; + + match value { + ScalarValue::TimestampMillisecond(Some(value), _) => Some(( + binexpr.op, + DateTime::from_timestamp_millis(*value).unwrap().naive_utc(), + )), + ScalarValue::TimestampNanosecond(Some(value), _) => Some(( + binexpr.op, + DateTime::from_timestamp_nanos(*value).naive_utc(), + )), + ScalarValue::Utf8(Some(str_value)) if is_time_partition => { + Some((binexpr.op, str_value.parse::().unwrap())) + } + _ => None, + } } async fn collect_manifest_files( @@ -917,24 +911,26 @@ async fn collect_manifest_files( .collect()) } -// extract start time and end time from filter preficate +// Extract start time and end time from filter predicate fn extract_primary_filter( filters: &[Expr], - time_partition: Option, + time_partition: &Option, ) -> Vec { - let mut time_filters = Vec::new(); - filters.iter().for_each(|expr| { - let _ = expr.apply(&mut |expr| { - let time = PartialTimeFilter::try_from_expr(expr, time_partition.clone()); - if let Some(time) = time { - time_filters.push(time); - Ok(TreeNodeRecursion::Stop) - } else { - Ok(TreeNodeRecursion::Jump) - } - }); - }); - time_filters + filters + .iter() + .filter_map(|expr| { + let mut time_filter = None; + let _ = expr.apply(&mut |expr| { + if let Some(time) = PartialTimeFilter::try_from_expr(expr, time_partition) { + time_filter = Some(time); + Ok(TreeNodeRecursion::Stop) // Stop further traversal + } else { + Ok(TreeNodeRecursion::Jump) // Skip this node + } + }); + time_filter + }) + .collect() } trait ManifestExt: ManifestFile { @@ -1048,11 +1044,16 @@ fn satisfy_constraints(value: CastRes, op: Operator, stats: &TypedStatistics) -> mod tests { use std::ops::Add; - use chrono::{DateTime, Duration, NaiveDate, NaiveTime, Utc}; + use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, NaiveTime, Utc}; + use datafusion::{ + logical_expr::{BinaryExpr, Operator}, + prelude::Expr, + scalar::ScalarValue, + }; use crate::catalog::snapshot::ManifestItem; - use super::{is_overlapping_query, PartialTimeFilter}; + use super::{extract_timestamp_bound, is_overlapping_query, PartialTimeFilter}; fn datetime_min(year: i32, month: u32, day: u32) -> DateTime { NaiveDate::from_ymd_opt(year, month, day) @@ -1135,4 +1136,144 @@ mod tests { assert!(!res) } + + #[test] + fn timestamp_in_milliseconds() { + let binexpr = BinaryExpr { + left: Box::new(Expr::Column("timestamp_column".into())), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::TimestampMillisecond( + Some(1672531200000), + None, + ))), + }; + + let time_partition = Some("timestamp_column".to_string()); + let result = extract_timestamp_bound(&binexpr, &time_partition); + + let expected = Some(( + Operator::Eq, + NaiveDateTime::parse_from_str("2023-01-01 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(), + )); + + assert_eq!(result, expected); + } + + #[test] + fn timestamp_in_nanoseconds() { + let binexpr = BinaryExpr { + left: Box::new(Expr::Column("timestamp_column".into())), + op: Operator::Gt, + right: Box::new(Expr::Literal(ScalarValue::TimestampNanosecond( + Some(1672531200000000000), + None, + ))), + }; + + let time_partition = Some("timestamp_column".to_string()); + let result = extract_timestamp_bound(&binexpr, &time_partition); + + let expected = Some(( + Operator::Gt, + NaiveDateTime::parse_from_str("2023-01-01 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(), + )); + + assert_eq!(result, expected); + } + + #[test] + fn string_timestamp() { + let timestamp = "2023-01-01T00:00:00"; + let binexpr = BinaryExpr { + left: Box::new(Expr::Column("timestamp_column".into())), + op: Operator::Lt, + right: Box::new(Expr::Literal(ScalarValue::Utf8(Some(timestamp.to_owned())))), + }; + + let time_partition = Some("timestamp_column".to_string()); + let result = extract_timestamp_bound(&binexpr, &time_partition); + + let expected = Some(( + Operator::Lt, + NaiveDateTime::parse_from_str(timestamp, "%Y-%m-%dT%H:%M:%S").unwrap(), + )); + + assert_eq!(result, expected); + } + + #[test] + fn unexpected_utf8_column() { + let timestamp = "2023-01-01T00:00:00"; + let binexpr = BinaryExpr { + left: Box::new(Expr::Column("other_column".into())), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::Utf8(Some(timestamp.to_owned())))), + }; + + let time_partition = Some("timestamp_column".to_string()); + let result = extract_timestamp_bound(&binexpr, &time_partition); + + assert!(result.is_none()); + } + + #[test] + fn unsupported_literal_type() { + let binexpr = BinaryExpr { + left: Box::new(Expr::Column("timestamp_column".into())), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::Int32(Some(42)))), + }; + + let time_partition = Some("timestamp_column".to_string()); + let result = extract_timestamp_bound(&binexpr, &time_partition); + + assert!(result.is_none()); + } + + #[test] + fn no_literal_on_right() { + let binexpr = BinaryExpr { + left: Box::new(Expr::Column("timestamp_column".into())), + op: Operator::Eq, + right: Box::new(Expr::Column("other_column".into())), + }; + + let time_partition = Some("timestamp_column".to_string()); + let result = extract_timestamp_bound(&binexpr, &time_partition); + + assert!(result.is_none()); + } + + #[test] + fn non_time_partition_timestamps() { + let binexpr = BinaryExpr { + left: Box::new(Expr::Column("timestamp_column".into())), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::TimestampMillisecond( + Some(1672531200000), + None, + ))), + }; + + let time_partition = None; + let result = extract_timestamp_bound(&binexpr, &time_partition); + let expected = Some(( + Operator::Eq, + NaiveDateTime::parse_from_str("2023-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S").unwrap(), + )); + + assert_eq!(result, expected); + + let binexpr = BinaryExpr { + left: Box::new(Expr::Column("timestamp_column".into())), + op: Operator::Eq, + right: Box::new(Expr::Literal(ScalarValue::TimestampNanosecond( + Some(1672531200000000000), + None, + ))), + }; + let result = extract_timestamp_bound(&binexpr, &time_partition); + + assert_eq!(result, expected); + } } diff --git a/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs index 2fb6bd849..6781f2d6e 100644 --- a/src/utils/arrow/flight.rs +++ b/src/utils/arrow/flight.rs @@ -135,7 +135,7 @@ pub fn send_to_ingester(start: i64, end: i64) -> bool { ); let ex = [Expr::BinaryExpr(ex1), Expr::BinaryExpr(ex2)]; - CONFIG.parseable.mode == Mode::Query && include_now(&ex, None) + CONFIG.parseable.mode == Mode::Query && include_now(&ex, &None) } fn lit_timestamp_milli(time: i64) -> Expr {