diff --git a/Cargo.toml b/Cargo.toml index 07c88d7..30a06ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,19 +28,19 @@ keywords = ["arrow", "arrow-rs", "datafusion"] rust-version = "1.80" [dependencies] -arrow = "55.1.0" -arrow-schema = "55.1.0" +arrow = "55.2.0" +arrow-schema = "55.2.0" async-trait = "0.1" dashmap = "6" -datafusion = "48" -datafusion-common = "48" -datafusion-expr = "48" -datafusion-functions = "48" -datafusion-functions-aggregate = "48" -datafusion-optimizer = "48" -datafusion-physical-expr = "48" -datafusion-physical-plan = "48" -datafusion-sql = "48" +datafusion = "49" +datafusion-common = "49" +datafusion-expr = "49" +datafusion-functions = "49" +datafusion-functions-aggregate = "49" +datafusion-optimizer = "49" +datafusion-physical-expr = "49" +datafusion-physical-plan = "49" +datafusion-sql = "49" futures = "0.3" itertools = "0.14" log = "0.4" diff --git a/deny.toml b/deny.toml index a24420a..b516c5d 100644 --- a/deny.toml +++ b/deny.toml @@ -24,6 +24,8 @@ allow = [ "BSD-3-Clause", "CC0-1.0", "Unicode-3.0", - "Zlib" + "Zlib", + "ISC", + "bzip2-1.0.6" ] version = 2 diff --git a/src/materialized/dependencies.rs b/src/materialized/dependencies.rs index de50181..9150e93 100644 --- a/src/materialized/dependencies.rs +++ b/src/materialized/dependencies.rs @@ -1447,7 +1447,7 @@ mod test { .enumerate() .filter_map(|(i, c)| case.partition_cols.contains(&c.name.as_str()).then_some(i)) .collect(); - println!("indices: {:?}", partition_col_indices); + println!("indices: {partition_col_indices:?}"); let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?; println!( "inexact projection pushdown:\n{}", @@ -1720,19 +1720,19 @@ mod test { ", projection: &["year"], expected_plan: vec![ - "+--------------+--------------------------------------------------+", - "| plan_type | plan |", - "+--------------+--------------------------------------------------+", - "| logical_plan | Union |", - "| | Projection: coalesce(t1.year, t2.year) AS year |", - "| | Full Join: Using t1.year = t2.year |", - "| | SubqueryAlias: t1 |", - "| | Projection: t1.column1 AS year |", - "| | TableScan: t1 projection=[column1] |", - "| | SubqueryAlias: t2 |", - "| | TableScan: t2 projection=[year] |", - "| | TableScan: t3 projection=[year] |", - "+--------------+--------------------------------------------------+", + "+--------------+--------------------------------------------------------------------+", + "| plan_type | plan |", + "+--------------+--------------------------------------------------------------------+", + "| logical_plan | Union |", + "| | Projection: coalesce(CAST(t1.year AS Utf8View), t2.year) AS year |", + "| | Full Join: Using CAST(t1.year AS Utf8View) = t2.year |", + "| | SubqueryAlias: t1 |", + "| | Projection: t1.column1 AS year |", + "| | TableScan: t1 projection=[column1] |", + "| | SubqueryAlias: t2 |", + "| | TableScan: t2 projection=[year] |", + "| | TableScan: t3 projection=[year] |", + "+--------------+--------------------------------------------------------------------+", ], expected_output: vec![ "+------+", diff --git a/src/materialized/file_metadata.rs b/src/materialized/file_metadata.rs index d5a5c8e..2c68405 100644 --- a/src/materialized/file_metadata.rs +++ b/src/materialized/file_metadata.rs @@ -226,7 +226,7 @@ impl ExecutionPlan for FileMetadataExec { .map(|record_batch| { record_batch .project(&projection) - .map_err(|e| DataFusionError::ArrowError(e, None)) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) }) .collect::>(); } @@ -858,7 +858,7 @@ mod test { .await?; ctx.sql( - "INSERT INTO t1 VALUES + "INSERT INTO t1 VALUES (1, '2021'), (2, '2022'), (3, '2023'), @@ -882,7 +882,7 @@ mod test { .await?; ctx.sql( - "INSERT INTO private.t1 VALUES + "INSERT INTO private.t1 VALUES (1, '2021', '01'), (2, '2022', '02'), (3, '2023', '03'), @@ -906,7 +906,7 @@ mod test { .await?; ctx.sql( - "INSERT INTO datafusion_mv.public.t3 VALUES + "INSERT INTO datafusion_mv.public.t3 VALUES (1, '2021-01-01'), (2, '2022-02-02'), (3, '2023-03-03'), @@ -929,8 +929,8 @@ mod test { ctx.sql( // Remove timestamps and trim (randomly generated) file names since they're not stable in tests "CREATE VIEW file_metadata_test_view AS SELECT - * EXCLUDE(file_path, last_modified), - regexp_replace(file_path, '/[^/]*$', '/') AS file_path + * EXCLUDE(file_path, last_modified), + regexp_replace(file_path, '/[^/]*$', '/') AS file_path FROM file_metadata", ) .await diff --git a/src/materialized/row_metadata.rs b/src/materialized/row_metadata.rs index 6476f39..fa12cdf 100644 --- a/src/materialized/row_metadata.rs +++ b/src/materialized/row_metadata.rs @@ -98,7 +98,7 @@ impl RowMetadataRegistry { .get(&table.to_string()) .map(|o| Arc::clone(o.value())) .or_else(|| self.default_source.clone()) - .ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {}", table))) + .ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {table}"))) } } diff --git a/src/rewrite/exploitation.rs b/src/rewrite/exploitation.rs index e8f3003..eef0708 100644 --- a/src/rewrite/exploitation.rs +++ b/src/rewrite/exploitation.rs @@ -23,7 +23,7 @@ use datafusion::catalog::TableProvider; use datafusion::datasource::provider_as_source; use datafusion::execution::context::SessionState; use datafusion::execution::{SendableRecordBatchStream, TaskContext}; -use datafusion::physical_expr::{LexRequirement, PhysicalSortExpr, PhysicalSortRequirement}; +use datafusion::physical_expr::{PhysicalSortExpr, PhysicalSortRequirement}; use datafusion::physical_expr_common::sort_expr::format_physical_sort_requirement_list; use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties}; @@ -32,6 +32,7 @@ use datafusion_common::tree_node::{Transformed, TreeNode, TreeNodeRecursion, Tre use datafusion_common::{DataFusionError, Result, TableReference}; use datafusion_expr::{Extension, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore}; use datafusion_optimizer::OptimizerRule; +use datafusion_physical_expr::OrderingRequirements; use itertools::Itertools; use ordered_float::OrderedFloat; @@ -316,7 +317,7 @@ pub struct OneOfExec { // Optionally declare a required input ordering // This will inform DataFusion to add sorts to children, // which will improve cost estimation of candidates - required_input_ordering: Option, + required_input_ordering: Option, // Index of the candidate with the best cost best: usize, // Cost function to use in optimization @@ -337,7 +338,7 @@ impl OneOfExec { /// Create a new `OneOfExec` pub fn try_new( candidates: Vec>, - required_input_ordering: Option, + required_input_ordering: Option, cost: CostFn, ) -> Result { if candidates.is_empty() { @@ -366,7 +367,7 @@ impl OneOfExec { /// Modify this plan's required input ordering. /// Used for sort pushdown - pub fn with_required_input_ordering(self, requirement: Option) -> Self { + pub fn with_required_input_ordering(self, requirement: Option) -> Self { Self { required_input_ordering: requirement, ..self @@ -387,7 +388,7 @@ impl ExecutionPlan for OneOfExec { self.candidates[self.best].properties() } - fn required_input_ordering(&self) -> Vec> { + fn required_input_ordering(&self) -> Vec> { vec![self.required_input_ordering.clone(); self.children().len()] } @@ -455,12 +456,16 @@ impl DisplayAs for OneOfExec { format_physical_sort_requirement_list( &self .required_input_ordering - .clone() - .unwrap_or_default() - .into_iter() - .map(PhysicalSortExpr::from) - .map(PhysicalSortRequirement::from) - .collect_vec() + .as_ref() + .map(|req| { + req.clone() + .into_single() + .into_iter() + .map(PhysicalSortExpr::from) + .map(PhysicalSortRequirement::from) + .collect_vec() + }) + .unwrap_or_default(), ) ) } diff --git a/src/rewrite/normal_form.rs b/src/rewrite/normal_form.rs index 8d4bc62..7efd53d 100644 --- a/src/rewrite/normal_form.rs +++ b/src/rewrite/normal_form.rs @@ -988,8 +988,7 @@ mod test { let ctx = SessionContext::new_with_config( SessionConfig::new() .set_bool("datafusion.execution.parquet.pushdown_filters", true) - .set_bool("datafusion.explain.logical_plan_only", true) - .set_bool("datafusion.sql_parser.map_varchar_to_utf8view", false), + .set_bool("datafusion.explain.logical_plan_only", true), ); let t1_path = tempdir()?; diff --git a/tests/materialized_listing_table.rs b/tests/materialized_listing_table.rs index 5ad9d25..7798f9a 100644 --- a/tests/materialized_listing_table.rs +++ b/tests/materialized_listing_table.rs @@ -185,7 +185,7 @@ async fn setup() -> Result { .await?; ctx.sql( - "INSERT INTO t1 VALUES + "INSERT INTO t1 VALUES (1, '2023-01-01', 'A'), (2, '2023-01-02', 'B'), (3, '2023-01-03', 'C'), @@ -251,7 +251,7 @@ async fn test_materialized_listing_table_incremental_maintenance() -> Result<()> // Insert another row into the source table ctx.sql( - "INSERT INTO t1 VALUES + "INSERT INTO t1 VALUES (7, '2024-12-07', 'W')", ) .await? @@ -352,12 +352,13 @@ impl MaterializedListingTable { file_sort_order: opts.file_sort_order, }); + let mut listing_table_config = ListingTableConfig::new(config.table_path); + if let Some(options) = options { + listing_table_config = listing_table_config.with_listing_options(options); + } + listing_table_config = listing_table_config.with_schema(Arc::new(file_schema)); Ok(MaterializedListingTable { - inner: ListingTable::try_new(ListingTableConfig { - table_paths: vec![config.table_path], - file_schema: Some(Arc::new(file_schema)), - options, - })?, + inner: ListingTable::try_new(listing_table_config)?, query: normalized_query, schema: normalized_schema, })