diff --git a/.gitignore b/.gitignore
index efe3eb1..e007bda 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,3 +23,6 @@ Cargo.lock
 
 # Added by cargo
 /target
+
+.idea
+.DS_Store
diff --git a/Cargo.toml b/Cargo.toml
index d728fce..e01781c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -28,19 +28,20 @@ keywords = ["arrow", "arrow-rs", "datafusion"]
 rust-version = "1.80"
 
 [dependencies]
-arrow = "54"
-arrow-schema = "54"
+arrow = { version = "54.1.0" }
+arrow-schema = { version = "54.1.0" }
 async-trait = "0.1"
+chrono = "= 0.4.39"
 dashmap = "6"
-datafusion = "45"
-datafusion-common = "45"
-datafusion-expr = "45"
-datafusion-functions = "45"
-datafusion-functions-aggregate = "45"
-datafusion-optimizer = "45"
-datafusion-physical-expr = "45"
-datafusion-physical-plan = "45"
-datafusion-sql = "45"
+datafusion = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "1c92803" }
+datafusion-common = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "1c92803" }
+datafusion-expr = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "1c92803" }
+datafusion-functions = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "1c92803" }
+datafusion-functions-aggregate = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "1c92803" }
+datafusion-optimizer = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "1c92803" }
+datafusion-physical-expr = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "1c92803" }
+datafusion-physical-plan = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "1c92803" }
+datafusion-sql = { git = "https://github.com/polygon-io/arrow-datafusion", rev = "1c92803" }
 futures = "0.3"
 itertools = "0.13"
 log = "0.4"
diff --git a/deny.toml b/deny.toml
index 90ed3af..a24420a 100644
--- a/deny.toml
+++ b/deny.toml
@@ -23,6 +23,7 @@ allow = [
     "BSD-2-Clause",
     "BSD-3-Clause",
     "CC0-1.0",
-    "Unicode-3.0"
+    "Unicode-3.0",
+    "Zlib"
 ]
 version = 2
diff --git a/src/lib.rs b/src/lib.rs
index 2ccb398..8b26d85 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -43,3 +43,18 @@ pub mod materialized;
 
 /// An implementation of Query Rewriting, an optimization that rewrites queries to make use of materialized views.
 pub mod rewrite;
+
+/// Configuration options for materialized view related features.
+#[derive(Debug, Clone)]
+pub struct MaterializedConfig {
+    /// Whether or not query rewriting should exploit this materialized view.
+    pub use_in_query_rewrite: bool,
+}
+
+impl Default for MaterializedConfig {
+    fn default() -> Self {
+        Self {
+            use_in_query_rewrite: true,
+        }
+    }
+}
diff --git a/src/materialized.rs b/src/materialized.rs
index 4a0a7ac..e089591 100644
--- a/src/materialized.rs
+++ b/src/materialized.rs
@@ -44,6 +44,8 @@ use datafusion::{
 use datafusion_expr::LogicalPlan;
 use itertools::Itertools;
 
+use crate::MaterializedConfig;
+
 /// The identifier of the column that [`RowMetadataSource`](row_metadata::RowMetadataSource) implementations should store row metadata in.
 pub const META_COLUMN: &str = "__meta";
 
@@ -102,6 +104,12 @@ pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTa
 pub trait Materialized: ListingTableLike {
     /// The query that defines this materialized view.
     fn query(&self) -> LogicalPlan;
+
+    /// Configuration to control materialized view related features.
+    /// By default, returns the default value for [`MaterializedConfig`].
+    fn config(&self) -> MaterializedConfig {
+        MaterializedConfig::default()
+    }
 }
 
 /// Register a [`Materialized`] implementation in this registry.
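For orientation, here is a minimal sketch of how the new `MaterializedConfig` hook is consumed. The crate path follows the imports used in the test file further below; the free-standing `main` is only illustrative and is not part of this change:

use datafusion_materialized_views::MaterializedConfig;

fn main() {
    // The default configuration opts a materialized view into query rewriting.
    let default_config = MaterializedConfig::default();
    assert!(default_config.use_in_query_rewrite);

    // A `Materialized` implementation can return this from its `config()` override
    // to keep the view out of query rewriting while still tracking staleness.
    let opt_out = MaterializedConfig {
        use_in_query_rewrite: false,
    };
    assert!(!opt_out.use_in_query_rewrite);
}

The `ViewMatcher` change in src/rewrite/exploitation.rs below is the consumer of this flag: views whose `config().use_in_query_rewrite` is false are skipped during candidate matching.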
diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs
index e5bc282..26e1047 100644
--- a/src/materialized/dependencies.rs
+++ b/src/materialized/dependencies.rs
@@ -31,7 +31,6 @@ use datafusion_expr::{
     col, lit, utils::split_conjunction, Expr, LogicalPlan, LogicalPlanBuilder, TableScan,
 };
 use datafusion_functions::string::expr_fn::{concat, concat_ws};
-use datafusion_optimizer::{analyzer::expand_wildcard_rule::ExpandWildcardRule, AnalyzerRule};
 use datafusion_sql::TableReference;
 use itertools::{Either, Itertools};
 use std::{collections::HashSet, sync::Arc};
@@ -188,9 +187,18 @@ impl TableFunctionImpl for StaleFilesUdtf {
             )?
             .aggregate(
                 vec![
-                    // Trim the final path element
-                    regexp_replace(col("file_path"), lit(r"/[^/]*$"), lit("/"), None)
-                        .alias("existing_target"),
+                    // We want to omit the file name along with any "special" partitions
+                    // from the path before comparing it to the target partition. Special
+                    // partitions must be leaf-most nodes and are designated by a leading
+                    // underscore. These are useful for adding additional information to a
+                    // file name without affecting partitioning or staleness checks.
+                    regexp_replace(
+                        col("file_path"),
+                        lit(r"(/_[^/=]+=[^/]+)*/[^/]*$"),
+                        lit("/"),
+                        None,
+                    )
+                    .alias("existing_target"),
                 ],
                 vec![max(col("last_modified")).alias("target_last_modified")],
             )?
@@ -250,9 +258,6 @@ pub fn mv_dependencies_plan(
         .filter_map(|(i, f)| partition_cols.contains(f.name()).then_some(i))
         .collect();
 
-    // First expand all wildcards
-    let plan = ExpandWildcardRule {}.analyze(plan, config_options)?;
-
     let pruned_plan_with_source_files = if partition_cols.is_empty() {
         get_source_files_all_partitions(
             materialized_view,
@@ -1173,8 +1178,7 @@
    }
 
    let cases = &[
-        TestCase {
-            name: "un-transformed partition column",
+        TestCase { name: "un-transformed partition column",
            query_to_analyze:
                "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
            table_name: "m1",
@@ -1206,6 +1210,38 @@
                "+--------------------------------+----------------------+-----------------------+----------+",
            ],
        },
+        TestCase { name: "omit 'special' partition columns",
+            query_to_analyze:
+                "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
+            table_name: "m1",
+            table_path: ListingTableUrl::parse("s3://m1/").unwrap(),
+            partition_cols: vec!["partition_column"],
+            file_extension: ".parquet",
+            expected_output: vec![
+                "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
+                "| target | source_table_catalog | source_table_schema | source_table_name | source_uri | source_last_modified |",
+                "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
+                "| s3://m1/partition_column=2021/ | datafusion | test | t1 | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26 |",
+                "| s3://m1/partition_column=2022/ | datafusion | test | t1 | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22 |",
+                "| s3://m1/partition_column=2023/ | datafusion | test | t1 | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44 |",
+                "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
+            ],
+            // second file is old
+            file_metadata: "
+                ('datafusion', 'test', 'm1', 's3://m1/partition_column=2021/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0),
+                ('datafusion', 'test', 'm1', 's3://m1/partition_column=2022/_v=123/data.01.parquet', '2023-07-10T16:00:00Z', 0),
+                ('datafusion', 'test', 'm1', 's3://m1/partition_column=2023/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0)
+            ",
+            expected_stale_files_output: vec![
+                "+--------------------------------+----------------------+-----------------------+----------+",
+                "| target | target_last_modified | sources_last_modified | is_stale |",
+                "+--------------------------------+----------------------+-----------------------+----------+",
+                "| s3://m1/partition_column=2021/ | 2023-07-12T16:00:00 | 2023-07-11T16:29:26 | false |",
+                "| s3://m1/partition_column=2022/ | 2023-07-10T16:00:00 | 2023-07-11T16:45:22 | true |",
+                "| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00 | 2023-07-11T16:45:44 | false |",
+                "+--------------------------------+----------------------+-----------------------+----------+",
+            ],
+        },
         TestCase {
             name: "transform year/month/day partition into timestamp partition",
             query_to_analyze: "
diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs
index de67d1c..2cd2d63 100644
--- a/src/materialized/file_metadata.rs
+++ b/src/materialized/file_metadata.rs
@@ -53,12 +53,16 @@ use crate::materialized::cast_to_listing_table;
 pub struct FileMetadata {
     table_schema: SchemaRef,
     catalog_list: Arc<dyn CatalogProviderList>,
+    metadata_provider: Arc<dyn FileMetadataProvider>,
 }
 
 impl FileMetadata {
     /// Construct a new [`FileMetadata`] table provider that lists files for all
     /// tables in the provided catalog list.
-    pub fn new(catalog_list: Arc<dyn CatalogProviderList>) -> Self {
+    pub fn new(
+        catalog_list: Arc<dyn CatalogProviderList>,
+        metadata_provider: Arc<dyn FileMetadataProvider>,
+    ) -> Self {
         Self {
             table_schema: Arc::new(Schema::new(vec![
                 Field::new("table_catalog", DataType::Utf8, false),
@@ -73,6 +77,7 @@
                 Field::new("size", DataType::UInt64, false),
             ])),
             catalog_list,
+            metadata_provider,
         }
     }
 }
@@ -114,6 +119,7 @@
             filters,
             limit,
             self.catalog_list.clone(),
+            self.metadata_provider.clone(),
         )?;
 
         Ok(Arc::new(exec))
@@ -136,6 +142,7 @@ pub struct FileMetadataExec {
     limit: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
     catalog_list: Arc<dyn CatalogProviderList>,
+    metadata_provider: Arc<dyn FileMetadataProvider>,
 }
 
 impl FileMetadataExec {
@@ -145,6 +152,7 @@
         filters: Vec<Arc<dyn PhysicalExpr>>,
         limit: Option<usize>,
         catalog_list: Arc<dyn CatalogProviderList>,
+        metadata_provider: Arc<dyn FileMetadataProvider>,
     ) -> Result<Self> {
         let projected_schema = match projection.as_ref() {
             Some(projection) => Arc::new(table_schema.project(projection)?),
@@ -167,6 +175,7 @@
             limit,
             metrics: ExecutionPlanMetricsSet::new(),
             catalog_list,
+            metadata_provider,
         };
 
         Ok(exec)
@@ -319,6 +328,7 @@
 
         let table_schema = self.table_schema.clone();
         let catalog_list = self.catalog_list.clone();
+        let metadata_provider = self.metadata_provider.clone();
 
         let record_batch = async move {
             // If we cannot determine the catalog, build from the entire catalog list.
@@ -328,6 +338,7 @@
                 debug!("No catalog filter exists, returning entire catalog list.");
                 return FileMetadataBuilder::build_from_catalog_list(
                     catalog_list,
+                    metadata_provider,
                     table_schema,
                     context,
                 )
@@ -352,6 +363,7 @@
                 return FileMetadataBuilder::build_from_catalog(
                     &catalog_name,
                     catalog_provider,
+                    metadata_provider,
                     table_schema,
                     context,
                 )
@@ -379,6 +391,7 @@
                     &catalog_name,
                     &schema_name,
                     schema_provider,
+                    metadata_provider,
                     table_schema,
                     context,
                 )
@@ -402,6 +415,7 @@
                     &schema_name,
                     &table_name,
                     table_provider,
+                    metadata_provider,
                     table_schema,
                     context,
                 )
@@ -448,6 +462,7 @@ impl DisplayAs for FileMetadataExec {
 
 struct FileMetadataBuilder {
     schema: SchemaRef,
+    metadata_provider: Arc<dyn FileMetadataProvider>,
     catalog_names: StringBuilder,
     schema_names: StringBuilder,
     table_names: StringBuilder,
@@ -457,9 +472,10 @@
 }
 
 impl FileMetadataBuilder {
-    fn new(schema: SchemaRef) -> Self {
+    fn new(schema: SchemaRef, metadata_provider: Arc<dyn FileMetadataProvider>) -> Self {
         Self {
             schema,
+            metadata_provider,
             catalog_names: StringBuilder::new(),
             schema_names: StringBuilder::new(),
             table_names: StringBuilder::new(),
@@ -471,6 +487,7 @@
 
     async fn build_from_catalog_list(
         catalog_list: Arc<dyn CatalogProviderList>,
+        metadata_provider: Arc<dyn FileMetadataProvider>,
         schema: SchemaRef,
         context: Arc<TaskContext>,
     ) -> Result> {
@@ -481,11 +498,19 @@
                 Some(catalog_provider) => catalog_provider,
                 None => continue,
             };
+            let metadata_provider = metadata_provider.clone();
             let schema = schema.clone();
             let context = context.clone();
 
             tasks.push(async move {
-                Self::build_from_catalog(&catalog_name, catalog_provider, schema, context).await
+                Self::build_from_catalog(
+                    &catalog_name,
+                    catalog_provider,
+                    metadata_provider,
+                    schema,
+                    context,
+                )
+                .await
             });
         }
 
@@ -504,6 +529,7 @@
     async fn build_from_catalog(
         catalog_name: &str,
         catalog_provider: Arc<dyn CatalogProvider>,
+        metadata_provider: Arc<dyn FileMetadataProvider>,
         schema: SchemaRef,
         context: Arc<TaskContext>,
     ) -> Result> {
@@ -514,6 +540,7 @@
                 Some(schema_provider) => schema_provider,
                 None => continue,
             };
+            let metadata_provider = metadata_provider.clone();
             let schema = schema.clone();
             let context = context.clone();
 
@@ -522,6 +549,7 @@
                     catalog_name,
                     &schema_name,
                     schema_provider,
+                    metadata_provider,
                     schema,
                     context,
                 )
@@ -545,6 +573,7 @@
         catalog_name: &str,
         schema_name: &str,
         schema_provider: Arc<dyn SchemaProvider>,
+        metadata_provider: Arc<dyn FileMetadataProvider>,
         schema: SchemaRef,
         context: Arc<TaskContext>,
     ) -> Result> {
@@ -555,6 +584,7 @@
                 Some(table_provider) => table_provider,
                 None => continue,
             };
+            let metadata_provider = metadata_provider.clone();
             let schema = schema.clone();
             let context = context.clone();
 
@@ -564,6 +594,7 @@
                     schema_name,
                     &table_name,
                     table_provider,
+                    metadata_provider,
                     schema,
                     context,
                 )
@@ -587,10 +618,11 @@
         schema_name: &str,
         table_name: &str,
         table_provider: Arc<dyn TableProvider>,
+        metadata_provider: Arc<dyn FileMetadataProvider>,
         schema: SchemaRef,
         context: Arc<TaskContext>,
     ) -> Result> {
-        let mut builder = Self::new(schema.clone());
+        let mut builder = Self::new(schema.clone(), metadata_provider.clone());
 
         let listing_table_like = match cast_to_listing_table(table_provider.as_ref()) {
             None => return Ok(None),
@@ -628,17 +660,19 @@
         let store_url = table_path.object_store();
         let store = context.runtime_env().object_store(table_path)?;
 
-        let mut file_stream = list_all_files(
-            store.as_ref(),
-            table_path,
-            file_ext,
-            context
-                .session_config()
-                .options()
-                .execution
-                .listing_table_ignore_subdirectory,
-        )
-        .await;
+        let mut file_stream = self
+            .metadata_provider
+            .list_all_files(
+                store.clone(),
+                table_path.clone(),
+                file_ext.to_string(),
+                context
+                    .session_config()
+                    .options()
+                    .execution
+                    .listing_table_ignore_subdirectory,
+            )
+            .await;
 
         while let Some(file_meta) = file_stream.try_next().await? {
             self.append(
@@ -687,38 +721,61 @@
     }
 }
 
-// Mostly copied from ListingTableUrl::list_all_files, which is private to that crate
-// Modified to handle empty tables
-async fn list_all_files<'a>(
-    store: &'a dyn ObjectStore,
-    url: &'a ListingTableUrl,
-    file_extension: &'a str,
-    ignore_subdirectory: bool,
-) -> BoxStream<'a, Result<ObjectMeta>> {
-    // Check if the directory exists yet
-    if let Err(object_store::Error::NotFound { path, .. }) =
-        store.list_with_delimiter(Some(url.prefix())).await
-    {
-        debug!(
+/// Provides [`ObjectMetadata`] data to the [`FileMetadata`] table provider.
+#[async_trait]
+pub trait FileMetadataProvider: std::fmt::Debug + Send + Sync {
+    /// List all files in the store for the given `url` prefix.
+    async fn list_all_files(
+        &self,
+        store: Arc<dyn ObjectStore>,
+        url: ListingTableUrl,
+        file_extension: String,
+        ignore_subdirectory: bool,
+    ) -> BoxStream<'static, Result<ObjectMeta>>;
+}
+
+/// Default implementation of the [`FileMetadataProvider`].
+#[derive(Debug)]
+pub struct DefaultFileMetadataProvider;
+
+#[async_trait]
+impl FileMetadataProvider for DefaultFileMetadataProvider {
+    // Mostly copied from ListingTableUrl::list_all_files, which is private to that crate
+    // Modified to handle empty tables
+    async fn list_all_files(
+        &self,
+        store: Arc<dyn ObjectStore>,
+        url: ListingTableUrl,
+        file_extension: String,
+        ignore_subdirectory: bool,
+    ) -> BoxStream<'static, Result<ObjectMeta>> {
+        // Check if the directory exists yet
+        if let Err(object_store::Error::NotFound { path, .. }) =
+            store.list_with_delimiter(Some(url.prefix())).await
+        {
+            debug!(
             "attempted to list empty table at {path} during file_metadata listing, returning empty list"
         );
-        return Box::pin(stream::empty());
-    }
+            return Box::pin(stream::empty());
+        }
 
-    let is_dir = url.as_str().ends_with('/');
-    let list = match is_dir {
-        true => store.list(Some(url.prefix())),
-        false => futures::stream::once(store.head(url.prefix())).boxed(),
-    };
+        let is_dir = url.as_str().ends_with('/');
+        let prefix = url.prefix().clone();
+
+        let list = match is_dir {
+            true => stream::iter(store.list(Some(&prefix)).collect::<Vec<_>>().await).boxed(),
+            false => futures::stream::once(async move { store.head(&prefix).await }).boxed(),
+        };
 
-    list.map_err(Into::into)
-        .try_filter(move |meta| {
-            let path = &meta.location;
-            let extension_match = path.as_ref().ends_with(file_extension);
-            let glob_match = url.contains(path, ignore_subdirectory);
-            futures::future::ready(extension_match && glob_match)
-        })
-        .boxed()
+        list.map_err(Into::into)
+            .try_filter(move |meta| {
+                let path = &meta.location;
+                let extension_match = path.as_ref().ends_with(&file_extension);
+                let glob_match = url.contains(path, ignore_subdirectory);
+                futures::future::ready(extension_match && glob_match)
+            })
+            .boxed()
+    }
 }
 
 #[cfg(test)]
@@ -739,7 +796,7 @@ mod test {
     use tempfile::TempDir;
     use url::Url;
 
-    use super::FileMetadata;
+    use super::{DefaultFileMetadataProvider, FileMetadata};
 
     struct TestContext {
         _dir: TempDir,
@@ -862,7 +919,10 @@
 
         ctx.register_table(
             "file_metadata",
-            Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list()))),
+            Arc::new(FileMetadata::new(
+                Arc::clone(ctx.state().catalog_list()),
+                Arc::new(DefaultFileMetadataProvider),
+            )),
         )
         .context("register file metadata table")?;
 
diff --git a/src/materialized/hive_partition.rs b/src/materialized/hive_partition.rs
index 075750e..43ebfde 100644
--- a/src/materialized/hive_partition.rs
+++ b/src/materialized/hive_partition.rs
@@ -22,8 +22,8 @@ use arrow_schema::DataType;
 
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::{
-    expr::ScalarFunction, ColumnarValue, Expr, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature,
-    Volatility,
+    expr::ScalarFunction, ColumnarValue, Expr, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl,
+    Signature, TypeSignature, Volatility,
 };
 
 pub static HIVE_PARTITION_UDF_NAME: &str = "hive_partition";
@@ -101,7 +101,8 @@
         Ok(DataType::Utf8)
     }
 
-    fn invoke(&self, values: &[ColumnarValue]) -> Result<ColumnarValue> {
+    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
+        let values = args.args;
         let null_if_missing = values
             .get(2)
             .map(|val| match val {
@@ -113,7 +114,7 @@
             .transpose()?
             .unwrap_or(false);
 
-        let arrays = ColumnarValue::values_to_arrays(values)?;
+        let arrays = ColumnarValue::values_to_arrays(&values)?;
 
         let [file_paths, table_partition_columns]: [Option<&StringArray>; 2] =
             [&arrays[0], &arrays[1]].map(|arg| arg.as_any().downcast_ref());
diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs
index 4819bc4..7ac5ced 100644
--- a/src/rewrite/exploitation.rs
+++ b/src/rewrite/exploitation.rs
@@ -60,6 +60,10 @@
                 continue;
             };
 
+            if !mv.config().use_in_query_rewrite {
+                continue;
+            }
+
             // Analyze the plan to normalize things such as wildcard expressions
             let analyzed_plan = session_state.analyzer().execute_and_check(
                 mv.query(),
@@ -296,12 +300,6 @@ impl UserDefinedLogicalNodeCore for OneOf {
         write!(f, "OneOf")
     }
 
-    fn from_template(&self, _exprs: &[datafusion::prelude::Expr], inputs: &[LogicalPlan]) -> Self {
-        Self {
-            branches: inputs.to_vec(),
-        }
-    }
-
     fn with_exprs_and_inputs(
         &self,
         _exprs: Vec<Expr>,
diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs
index 812367e..b84f8c0 100644
--- a/src/rewrite/normal_form.rs
+++ b/src/rewrite/normal_form.rs
@@ -263,7 +263,7 @@ impl SpjNormalForm {
         source: Arc<dyn TableSource>,
     ) -> Result<Option<LogicalPlan>> {
         log::trace!("rewriting from {qualifier}");
-        let mut new_output_exprs = vec![];
+        let mut new_output_exprs = Vec::with_capacity(self.output_exprs.len());
         // check that our output exprs are sub-expressions of the other one's output exprs
         for (i, output_expr) in self.output_exprs.iter().enumerate() {
             let new_output_expr = other
@@ -334,6 +334,7 @@
 /// Stores information on filters from a Select-Project-Join plan.
 #[derive(Debug, Clone)]
 struct Predicate {
+    /// Full table schema, including all possible columns.
     schema: DFSchema,
     /// List of column equivalence classes.
     eq_classes: Vec<ColumnEquivalenceClass>,
@@ -350,7 +351,15 @@
         let mut schema = DFSchema::empty();
         plan.apply(|plan| {
             if let LogicalPlan::TableScan(scan) = plan {
-                schema = schema.join(&scan.projected_schema)?;
+                let new_schema = DFSchema::try_from_qualified_schema(
+                    scan.table_name.clone(),
+                    scan.source.schema().as_ref(),
+                )?;
+                schema = if schema.fields().is_empty() {
+                    new_schema
+                } else {
+                    schema.join(&new_schema)?
+                }
             }
 
             Ok(TreeNodeRecursion::Continue)
@@ -367,14 +376,20 @@
         // Collect all referenced columns
         plan.apply(|plan| {
             if let LogicalPlan::TableScan(scan) = plan {
-                for (i, column) in scan.projected_schema.columns().iter().enumerate() {
+                for (i, (table_ref, field)) in DFSchema::try_from_qualified_schema(
+                    scan.table_name.clone(),
+                    scan.source.schema().as_ref(),
+                )?
+                .iter()
+                .enumerate()
+                {
+                    let column = Column::new(table_ref.cloned(), field.name());
+                    let data_type = field.data_type();
                     new.eq_classes
                         .push(ColumnEquivalenceClass::new_singleton(column.clone()));
-                    new.eq_class_idx_by_column.insert(column.clone(), i);
+                    new.eq_class_idx_by_column.insert(column, i);
                     new.ranges_by_equivalence_class
-                        .push(Some(Interval::make_unbounded(
-                            scan.projected_schema.data_type(column)?,
-                        )?));
+                        .push(Some(Interval::make_unbounded(data_type)?));
                 }
             }
 
@@ -481,8 +496,19 @@
         let range = self
             .eq_class_idx_by_column
             .get(c)
-            .and_then(|&idx| self.ranges_by_equivalence_class.get_mut(idx))
-            .unwrap();
+            .ok_or_else(|| {
+                DataFusionError::Plan(format!("column {c} not found in equivalence classes"))
+            })
+            .and_then(|&idx| {
+                self.ranges_by_equivalence_class
+                    .get_mut(idx)
+                    .ok_or_else(|| {
+                        DataFusionError::Plan(format!(
+                            "range not found for column {c} with equivalence class {:?}", self.eq_classes.get(idx)
+                        ))
+                    })
+            })?;
+
         let new_range = match op {
             Operator::Eq => Interval::try_new(value.clone(), value.clone()),
             Operator::LtEq => {
@@ -944,17 +970,47 @@ fn get_table_scan_columns(scan: &TableScan) -> Result> {
 #[cfg(test)]
 mod test {
     use arrow::compute::concat_batches;
-    use datafusion::{datasource::provider_as_source, prelude::SessionContext};
+    use datafusion::{
+        datasource::provider_as_source,
+        prelude::{SessionConfig, SessionContext},
+    };
     use datafusion_common::{DataFusionError, Result};
     use datafusion_sql::TableReference;
+    use tempfile::tempdir;
 
     use super::SpjNormalForm;
 
     async fn setup() -> Result<SessionContext> {
-        let ctx = SessionContext::new();
+        let ctx = SessionContext::new_with_config(
+            SessionConfig::new()
+                .set_bool("datafusion.execution.parquet.pushdown_filters", true)
+                .set_bool("datafusion.explain.logical_plan_only", true),
+        );
+
+        let t1_path = tempdir()?;
+
+        // Create external table to exercise parquet filter pushdown.
+        // This will put the filters directly inside the `TableScan` node.
+        // This is important because `TableScan` can have filters on
+        // columns not in its own output.
+        ctx.sql(&format!(
+            "
+            CREATE EXTERNAL TABLE t1 (
+                column1 VARCHAR,
+                column2 BIGINT,
+                column3 CHAR
+            )
+            STORED AS PARQUET
+            LOCATION '{}'",
+            t1_path.path().to_string_lossy()
+        ))
+        .await
+        .map_err(|e| e.context("setup `t1` table"))?
+        .collect()
+        .await?;
 
         ctx.sql(
-            "CREATE TABLE t1 AS VALUES
+            "INSERT INTO t1 VALUES
             ('2021', 3, 'A'),
             ('2022', 4, 'B'),
             ('2023', 5, 'C')",
@@ -976,8 +1032,7 @@
                 o_orderdate DATE,
                 p_name VARCHAR,
                 p_partkey INT
-            )
-            ",
+            )",
         )
         .await
         .map_err(|e| e.context("parse `example` table ddl"))?
@@ -1010,6 +1065,15 @@
         let query_plan = context.sql(case.query).await?.into_optimized_plan()?;
         let query_normal_form = SpjNormalForm::new(&query_plan)?;
 
+        for plan in [&base_plan, &query_plan] {
+            context
+                .execute_logical_plan(plan.clone())
+                .await?
+                .explain(false, false)?
+                .show()
+                .await?;
+        }
+
         let table_ref = TableReference::bare("mv");
         let rewritten = query_normal_form
             .rewrite_from(
@@ -1021,16 +1085,14 @@
                 "expected rewrite to succeed".to_string(),
             ))?;
 
-        assert_eq!(rewritten.schema().as_ref(), query_plan.schema().as_ref());
+        context
+            .execute_logical_plan(rewritten.clone())
+            .await?
+            .explain(false, false)?
+            .show()
+            .await?;
 
-        for plan in [&base_plan, &query_plan, &rewritten] {
-            context
-                .execute_logical_plan(plan.clone())
-                .await?
-                .explain(false, false)?
-                .show()
-                .await?;
-        }
+        assert_eq!(rewritten.schema().as_ref(), query_plan.schema().as_ref());
 
         let expected = concat_batches(
             &query_plan.schema().as_ref().clone().into(),
@@ -1097,31 +1159,31 @@
         TestCase {
             name: "example from paper",
             base: "\
-                SELECT 
-                    l_orderkey, 
-                    o_custkey, 
+                SELECT
+                    l_orderkey,
+                    o_custkey,
                     l_partkey,
                     l_shipdate,
                     o_orderdate,
                     l_quantity*l_extendedprice AS gross_revenue
                 FROM example
                 WHERE
-                    l_orderkey = o_orderkey AND 
-                    l_partkey = p_partkey AND 
-                    p_partkey >= 150 AND 
-                    o_custkey >= 50 AND 
-                    o_custkey <= 500 AND 
+                    l_orderkey = o_orderkey AND
+                    l_partkey = p_partkey AND
+                    p_partkey >= 150 AND
+                    o_custkey >= 50 AND
+                    o_custkey <= 500 AND
                     p_name LIKE '%abc%'
             ",
-            query: "SELECT 
-                    l_orderkey, 
-                    o_custkey, 
+            query: "SELECT
+                    l_orderkey,
+                    o_custkey,
                     l_partkey,
                     l_quantity*l_extendedprice
                 FROM example
-                WHERE 
+                WHERE
                     l_orderkey = o_orderkey AND
                     l_partkey = p_partkey AND
-                    l_partkey >= 150 AND 
+                    l_partkey >= 150 AND
                     l_partkey <= 160 AND
                     o_custkey = 123 AND
                     o_orderdate = l_shipdate AND
@@ -1129,6 +1191,11 @@
                     l_quantity*l_extendedprice > 100
             ",
         },
+        TestCase {
+            name: "naked table scan with pushed down filters",
+            base: "SELECT column1 FROM t1 WHERE column2 <= 3",
+            query: "SELECT column1 FROM t1 WHERE column2 <= 3",
+        },
     ];
 
     for case in cases {
diff --git a/tests/materialized_listing_table.rs b/tests/materialized_listing_table.rs
index 6ab2bf1..5ad9d25 100644
--- a/tests/materialized_listing_table.rs
+++ b/tests/materialized_listing_table.rs
@@ -39,7 +39,7 @@ use datafusion_expr::{
 };
 use datafusion_materialized_views::materialized::{
     dependencies::{mv_dependencies, stale_files},
-    file_metadata::FileMetadata,
+    file_metadata::{DefaultFileMetadataProvider, FileMetadata},
     register_materialized,
     row_metadata::RowMetadataRegistry,
     ListingTableLike, Materialized,
@@ -146,7 +146,10 @@
 
     // Now register the `mv_dependencies` and `stale_files` UDTFs
     // They have `FileMetadata` and `RowMetadataRegistry` as dependencies.
-    let file_metadata = Arc::new(FileMetadata::new(Arc::clone(ctx.state().catalog_list())));
+    let file_metadata = Arc::new(FileMetadata::new(
+        Arc::clone(ctx.state().catalog_list()),
+        Arc::new(DefaultFileMetadataProvider),
+    ));
     let row_metadata_registry = Arc::new(RowMetadataRegistry::new(Arc::clone(&file_metadata)));
 
     ctx.register_udtf(
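For reference, a minimal sketch of plugging a custom `FileMetadataProvider` into `FileMetadata`: a hypothetical logging wrapper that delegates to `DefaultFileMetadataProvider`. The import paths mirror the test imports above; the exact `Result`/`ObjectMeta` types in the signature are assumed from the trait definition in this diff:

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::datasource::listing::ListingTableUrl;
use datafusion_common::Result;
use datafusion_materialized_views::materialized::file_metadata::{
    DefaultFileMetadataProvider, FileMetadataProvider,
};
use futures::stream::BoxStream;
use log::debug;
use object_store::{ObjectMeta, ObjectStore};

/// Hypothetical wrapper: logs each listing request, then delegates to the default provider.
#[derive(Debug)]
struct LoggingFileMetadataProvider(DefaultFileMetadataProvider);

#[async_trait]
impl FileMetadataProvider for LoggingFileMetadataProvider {
    async fn list_all_files(
        &self,
        store: Arc<dyn ObjectStore>,
        url: ListingTableUrl,
        file_extension: String,
        ignore_subdirectory: bool,
    ) -> BoxStream<'static, Result<ObjectMeta>> {
        debug!("listing '{}' files under {}", file_extension, url.as_str());
        self.0
            .list_all_files(store, url, file_extension, ignore_subdirectory)
            .await
    }
}

Such a provider would be passed as the second argument to `FileMetadata::new`, exactly like `DefaultFileMetadataProvider` in the test setup above.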