diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index c44d14abd381a..dad8472dedc42 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -150,6 +150,11 @@ impl TestOutput {
         self.metric_value("row_groups_matched_statistics")
     }
 
+    /// The number of row_groups fully matched by statistics
+    fn row_groups_fully_matched_statistics(&self) -> Option<usize> {
+        self.metric_value("row_groups_fully_matched_statistics")
+    }
+
     /// The number of row_groups pruned by statistics
     fn row_groups_pruned_statistics(&self) -> Option<usize> {
         self.metric_value("row_groups_pruned_statistics")
@@ -178,6 +183,11 @@ impl TestOutput {
         self.metric_value("page_index_rows_pruned")
     }
 
+    /// The number of row groups pruned by limit pruning
+    fn limit_pruned_row_groups(&self) -> Option<usize> {
+        self.metric_value("limit_pruned_row_groups")
+    }
+
     fn description(&self) -> String {
         format!(
             "Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
@@ -191,20 +201,41 @@ impl TestOutput {
 /// and the appropriate scenario
 impl ContextWithParquet {
     async fn new(scenario: Scenario, unit: Unit) -> Self {
-        Self::with_config(scenario, unit, SessionConfig::new()).await
+        Self::with_config(scenario, unit, SessionConfig::new(), None, None).await
+    }
+
+    /// Set custom schema and batches for the test
+    pub async fn with_custom_data(
+        scenario: Scenario,
+        unit: Unit,
+        schema: Arc<Schema>,
+        batches: Vec<RecordBatch>,
+    ) -> Self {
+        Self::with_config(
+            scenario,
+            unit,
+            SessionConfig::new(),
+            Some(schema),
+            Some(batches),
+        )
+        .await
     }
 
     async fn with_config(
         scenario: Scenario,
        unit: Unit,
         mut config: SessionConfig,
+        custom_schema: Option<Arc<Schema>>,
+        custom_batches: Option<Vec<RecordBatch>>,
     ) -> Self {
         // Use a single partition for deterministic results no matter how many CPUs the host has
         config = config.with_target_partitions(1);
         let file = match unit {
             Unit::RowGroup(row_per_group) => {
                 config = config.with_parquet_bloom_filter_pruning(true);
-                make_test_file_rg(scenario, row_per_group).await
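+                // Also enable filter pushdown here so the query's LIMIT can
+                // be pushed all the way into the parquet scan even when a
+                // predicate is present; limit pruning only runs when the
+                // scan actually receives a limit.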
+                config.options_mut().execution.parquet.pushdown_filters = true;
+                make_test_file_rg(scenario, row_per_group, custom_schema, custom_batches)
+                    .await
             }
             Unit::Page(row_per_page) => {
                 config = config.with_parquet_page_index_pruning(true);
@@ -1030,7 +1061,12 @@ fn create_data_batch(scenario: Scenario) -> Vec<RecordBatch> {
 }
 
 /// Create a test parquet file with various data types
-async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTempFile {
+async fn make_test_file_rg(
+    scenario: Scenario,
+    row_per_group: usize,
+    custom_schema: Option<Arc<Schema>>,
+    custom_batches: Option<Vec<RecordBatch>>,
+) -> NamedTempFile {
     let mut output_file = tempfile::Builder::new()
         .prefix("parquet_pruning")
         .suffix(".parquet")
@@ -1043,8 +1079,14 @@ async fn make_test_file_rg(scenario: Scenario, row_per_group: usize) -> NamedTem
         .set_statistics_enabled(EnabledStatistics::Page)
         .build();
 
-    let batches = create_data_batch(scenario);
-    let schema = batches[0].schema();
+    let (batches, schema) =
+        if let (Some(schema), Some(batches)) = (custom_schema, custom_batches) {
+            (batches, schema)
+        } else {
+            let batches = create_data_batch(scenario);
+            let schema = batches[0].schema();
+            (batches, schema)
+        };
 
     let mut writer = ArrowWriter::try_new(&mut output_file, schema, Some(props)).unwrap();
diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs
index 44409166d3ce3..42c4488d8444a 100644
--- a/datafusion/core/tests/parquet/row_group_pruning.rs
+++ b/datafusion/core/tests/parquet/row_group_pruning.rs
@@ -18,8 +18,12 @@
 //! This file contains an end to end test of parquet pruning. It writes
 //! data into a parquet file and then verifies row groups are pruned as
 //! expected.
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int32Array, RecordBatch};
+use arrow_schema::{DataType, Field, Schema};
 use datafusion::prelude::SessionConfig;
-use datafusion_common::ScalarValue;
+use datafusion_common::{DataFusionError, ScalarValue};
 use itertools::Itertools;
 
 use crate::parquet::Unit::RowGroup;
@@ -30,10 +34,12 @@ struct RowGroupPruningTest {
     query: String,
     expected_errors: Option<usize>,
     expected_row_group_matched_by_statistics: Option<usize>,
+    expected_row_group_fully_matched_by_statistics: Option<usize>,
     expected_row_group_pruned_by_statistics: Option<usize>,
     expected_files_pruned_by_statistics: Option<usize>,
     expected_row_group_matched_by_bloom_filter: Option<usize>,
     expected_row_group_pruned_by_bloom_filter: Option<usize>,
+    expected_limit_pruned_row_groups: Option<usize>,
     expected_rows: usize,
 }
 impl RowGroupPruningTest {
@@ -45,9 +51,11 @@ impl RowGroupPruningTest {
             expected_errors: None,
             expected_row_group_matched_by_statistics: None,
             expected_row_group_pruned_by_statistics: None,
+            expected_row_group_fully_matched_by_statistics: None,
             expected_files_pruned_by_statistics: None,
             expected_row_group_matched_by_bloom_filter: None,
             expected_row_group_pruned_by_bloom_filter: None,
+            expected_limit_pruned_row_groups: None,
             expected_rows: 0,
         }
     }
@@ -76,6 +84,15 @@ impl RowGroupPruningTest {
         self
     }
 
+    // Set the expected fully matched row groups by statistics
+    fn with_fully_matched_by_stats(
+        mut self,
+        fully_matched_by_stats: Option<usize>,
+    ) -> Self {
+        self.expected_row_group_fully_matched_by_statistics = fully_matched_by_stats;
+        self
+    }
+
     // Set the expected pruned row groups by statistics
     fn with_pruned_by_stats(mut self, pruned_by_stats: Option<usize>) -> Self {
         self.expected_row_group_pruned_by_statistics = pruned_by_stats;
@@ -99,6 +116,11 @@ impl RowGroupPruningTest {
         self
     }
 
+    fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option<usize>) -> Self {
+        self.expected_limit_pruned_row_groups = pruned_by_limit;
+        self
+    }
+
     /// Set the number of expected rows from the output of this test
     fn with_expected_rows(mut self, rows: usize) -> Self {
         self.expected_rows = rows;
@@ -143,6 +165,65 @@ impl RowGroupPruningTest {
             self.expected_row_group_pruned_by_bloom_filter,
             "mismatched row_groups_pruned_bloom_filter",
         );
+
+        assert_eq!(
+            output.result_rows,
+            self.expected_rows,
+            "Expected {} rows, got {}: {}",
+            self.expected_rows,
+            output.result_rows,
+            output.description(),
+        );
+    }
+
+    // Execute the test with the current configuration
+    async fn test_row_group_prune_with_custom_data(
+        self,
+        schema: Arc<Schema>,
+        batches: Vec<RecordBatch>,
+        max_row_per_group: usize,
+    ) {
+        let output = ContextWithParquet::with_custom_data(
+            self.scenario,
+            RowGroup(max_row_per_group),
+            schema,
+            batches,
+        )
+        .await
+        .query(&self.query)
+        .await;
+
+        println!("{}", output.description());
+        assert_eq!(
+            output.predicate_evaluation_errors(),
+            self.expected_errors,
+            "mismatched predicate_evaluation error"
+        );
+        assert_eq!(
+            output.row_groups_matched_statistics(),
+            self.expected_row_group_matched_by_statistics,
+            "mismatched row_groups_matched_statistics",
+        );
+        assert_eq!(
+            output.row_groups_fully_matched_statistics(),
+            self.expected_row_group_fully_matched_by_statistics,
+            "mismatched row_groups_fully_matched_statistics",
+        );
+        assert_eq!(
+            output.row_groups_pruned_statistics(),
+            self.expected_row_group_pruned_by_statistics,
+            "mismatched row_groups_pruned_statistics",
+        );
+        assert_eq!(
+            output.files_ranges_pruned_statistics(),
+            self.expected_files_pruned_by_statistics,
+            "mismatched files_ranges_pruned_statistics",
+        );
+        assert_eq!(
+            output.limit_pruned_row_groups(),
+            self.expected_limit_pruned_row_groups,
+            "mismatched limit_pruned_row_groups",
+        );
         assert_eq!(
             output.result_rows,
             self.expected_rows,
@@ -287,11 +368,16 @@ async fn prune_disabled() {
     let expected_rows = 10;
     let config = SessionConfig::new().with_parquet_pruning(false);
 
-    let output =
-        ContextWithParquet::with_config(Scenario::Timestamps, RowGroup(5), config)
-            .await
-            .query(query)
-            .await;
+    let output = ContextWithParquet::with_config(
+        Scenario::Timestamps,
+        RowGroup(5),
+        config,
+        None,
+        None,
+    )
+    .await
+    .query(query)
+    .await;
 
     println!("{}", output.description());
     // This should not prune any
@@ -1634,3 +1720,241 @@ async fn test_bloom_filter_decimal_dict() {
         .test_row_group_prune()
         .await;
 }
+
+// Helper function to create a batch with a single Int32 column.
+fn make_i32_batch(
+    name: &str,
+    values: Vec<i32>,
+) -> datafusion_common::error::Result<RecordBatch> {
+    let schema = Arc::new(Schema::new(vec![Field::new(name, DataType::Int32, false)]));
+    let array: ArrayRef = Arc::new(Int32Array::from(values));
+    RecordBatch::try_new(schema, vec![array]).map_err(DataFusionError::from)
+}
+
+// Helper function to create a batch with two Int32 columns
+fn make_two_col_i32_batch(
+    name_a: &str,
+    name_b: &str,
+    values_a: Vec<i32>,
+    values_b: Vec<i32>,
+) -> datafusion_common::error::Result<RecordBatch> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new(name_a, DataType::Int32, false),
+        Field::new(name_b, DataType::Int32, false),
+    ]));
+    let array_a: ArrayRef = Arc::new(Int32Array::from(values_a));
+    let array_b: ArrayRef = Arc::new(Int32Array::from(values_b));
+    RecordBatch::try_new(schema, vec![array_a, array_b]).map_err(DataFusionError::from)
+}
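+
+// The tests below exercise limit-based row group pruning: when statistics
+// prove that *every* row in a row group satisfies the predicate ("fully
+// matched"), and the fully matched groups alone contain enough rows to
+// satisfy the LIMIT, the remaining row groups can be skipped entirely.
+// Each batch below is written as its own row group (max_row_per_group is
+// set to the batch size), so per-row-group expectations map 1:1 to batches.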
+
+#[tokio::test]
+async fn test_limit_pruning_basic() -> datafusion_common::error::Result<()> {
+    // Scenario: simple integer column, five two-row row groups
+    // Query: SELECT c1 FROM t WHERE c1 >= 0 LIMIT 2
+    // We expect 2 rows in total.
+
+    // Row Group 0: c1 = [0, -2]  -> Partially matched, 1 matching row
+    // Row Group 1: c1 = [0, 0]   -> Fully matched, 2 rows
+    // Row Group 2: c1 = [0, 0]   -> Fully matched, 2 rows
+    // Row Group 3: c1 = [0, 0]   -> Fully matched, 2 rows
+    // Row Group 4: c1 = [-1, -2] -> Not matched
+
+    // With LIMIT 2, RG1 is fully matched and already holds 2 rows, so only
+    // RG1 needs to be scanned. RG4 is pruned by statistics; RG0, RG2, and
+    // RG3 are pruned by limit, i.e. 3 row groups are pruned by limit pruning.
+
+    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
+    let query = "SELECT c1 FROM t WHERE c1 >= 0 LIMIT 2";
+
+    let batches = vec![
+        make_i32_batch("c1", vec![0, -2])?,
+        make_i32_batch("c1", vec![0, 0])?,
+        make_i32_batch("c1", vec![0, 0])?,
+        make_i32_batch("c1", vec![0, 0])?,
+        make_i32_batch("c1", vec![-1, -2])?,
+    ];
+
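+    // Note: when custom data is supplied, the scenario's built-in batches
+    // are not used; the batches above are what is written to the file.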
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::Int)
+        .with_query(query)
+        .with_expected_errors(Some(0))
+        .with_expected_rows(2)
+        .with_pruned_files(Some(0))
+        .with_matched_by_stats(Some(4))
+        .with_fully_matched_by_stats(Some(3))
+        .with_pruned_by_stats(Some(1))
+        .with_limit_pruned_row_groups(Some(3))
+        .test_row_group_prune_with_custom_data(schema, batches, 2)
+        .await;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_limit_pruning_complex_filter() -> datafusion_common::error::Result<()> {
+    // Test Case 1: Complex filter with two columns (a = 1 AND b > 1 AND b < 4)
+    // Row Group 0: a=[1,1,1], b=[0,2,3] -> Partially matched, 2 rows match (b=2,3)
+    // Row Group 1: a=[1,1,1], b=[2,2,2] -> Fully matched, 3 rows
+    // Row Group 2: a=[1,1,1], b=[2,3,3] -> Fully matched, 3 rows
+    // Row Group 3: a=[1,1,1], b=[2,2,3] -> Fully matched, 3 rows
+    // Row Group 4: a=[2,2,2], b=[2,2,2] -> Not matched (a != 1)
+    // Row Group 5: a=[1,1,1], b=[5,6,7] -> Not matched (b >= 4)
+
+    // With LIMIT 5, the fully matched RG1 (3 rows) and RG2 (3 rows) already
+    // cover the limit, so only they are scanned.
+    // RG4 and RG5 are pruned by statistics.
+    // RG0 (only partially matched) and RG3 are pruned by limit.
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Int32, false),
+        Field::new("b", DataType::Int32, false),
+    ]));
+    let query = "SELECT a, b FROM t WHERE a = 1 AND b > 1 AND b < 4 LIMIT 5";
+
+    let batches = vec![
+        make_two_col_i32_batch("a", "b", vec![1, 1, 1], vec![0, 2, 3])?,
+        make_two_col_i32_batch("a", "b", vec![1, 1, 1], vec![2, 2, 2])?,
+        make_two_col_i32_batch("a", "b", vec![1, 1, 1], vec![2, 3, 3])?,
+        make_two_col_i32_batch("a", "b", vec![1, 1, 1], vec![2, 2, 3])?,
+        make_two_col_i32_batch("a", "b", vec![2, 2, 2], vec![2, 2, 2])?,
+        make_two_col_i32_batch("a", "b", vec![1, 1, 1], vec![5, 6, 7])?,
+    ];
+
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::Int)
+        .with_query(query)
+        .with_expected_errors(Some(0))
+        .with_expected_rows(5)
+        .with_pruned_files(Some(0))
+        .with_matched_by_stats(Some(4)) // RG0,1,2,3 are matched
+        .with_fully_matched_by_stats(Some(3))
+        .with_pruned_by_stats(Some(2)) // RG4,5 are pruned
+        .with_limit_pruned_row_groups(Some(2)) // RG0 and RG3 are pruned by limit
+        .test_row_group_prune_with_custom_data(schema, batches, 3)
+        .await;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_limit_pruning_multiple_fully_matched(
+) -> datafusion_common::error::Result<()> {
+    // Test Case 2: Limit requires multiple fully matched row groups
+    // Row Group 0: a=[5,5,5,5] -> Fully matched, 4 rows
+    // Row Group 1: a=[5,5,5,5] -> Fully matched, 4 rows
+    // Row Group 2: a=[5,5,5,5] -> Fully matched, 4 rows
+    // Row Group 3: a=[5,5,5,5] -> Fully matched, 4 rows
+    // Row Group 4: a=[1,2,3,4] -> Not matched
+
+    // With LIMIT 8, RG0 (4 rows) + RG1 (4 rows) = 8 rows
+    // RG2 and RG3 should be pruned by limit
+    // RG4 should be pruned by statistics
+
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+    let query = "SELECT a FROM t WHERE a = 5 LIMIT 8";
+
+    let batches = vec![
+        make_i32_batch("a", vec![5, 5, 5, 5])?,
+        make_i32_batch("a", vec![5, 5, 5, 5])?,
+        make_i32_batch("a", vec![5, 5, 5, 5])?,
+        make_i32_batch("a", vec![5, 5, 5, 5])?,
+        make_i32_batch("a", vec![1, 2, 3, 4])?,
+    ];
+
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::Int)
+        .with_query(query)
+        .with_expected_errors(Some(0))
+        .with_expected_rows(8)
+        .with_pruned_files(Some(0))
+        .with_matched_by_stats(Some(4)) // RG0,1,2,3 matched
+        .with_fully_matched_by_stats(Some(4))
+        .with_pruned_by_stats(Some(1)) // RG4 pruned
+        .with_limit_pruned_row_groups(Some(2)) // RG2,3 pruned by limit
+        .test_row_group_prune_with_custom_data(schema, batches, 4)
+        .await;
+
+    Ok(())
+}
+
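+// Limit pruning only kicks in when the fully matched row groups alone can
+// satisfy the limit; if they cannot, the access plan is left untouched, as
+// the next two tests demonstrate.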
+#[tokio::test]
+async fn test_limit_pruning_no_fully_matched() -> datafusion_common::error::Result<()> {
+    // Test Case 3: No fully matched row groups - all are partially matched
+    // Row Group 0: a=[1,2,3]   -> Partially matched, 1 row (a=2)
+    // Row Group 1: a=[2,3,4]   -> Partially matched, 1 row (a=2)
+    // Row Group 2: a=[2,5,6]   -> Partially matched, 1 row (a=2)
+    // Row Group 3: a=[2,7,8]   -> Partially matched, 1 row (a=2)
+    // Row Group 4: a=[9,10,11] -> Not matched
+
+    // With LIMIT 3, there are no fully matched row groups to cover the
+    // limit, so limit pruning leaves the plan unchanged and the scan stops
+    // after 3 matching rows. RG4 is still pruned by statistics.
+
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+    let query = "SELECT a FROM t WHERE a = 2 LIMIT 3";
+
+    let batches = vec![
+        make_i32_batch("a", vec![1, 2, 3])?,
+        make_i32_batch("a", vec![2, 3, 4])?,
+        make_i32_batch("a", vec![2, 5, 6])?,
+        make_i32_batch("a", vec![2, 7, 8])?,
+        make_i32_batch("a", vec![9, 10, 11])?,
+    ];
+
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::Int)
+        .with_query(query)
+        .with_expected_errors(Some(0))
+        .with_expected_rows(3)
+        .with_pruned_files(Some(0))
+        .with_matched_by_stats(Some(4)) // RG0,1,2,3 matched
+        .with_fully_matched_by_stats(Some(0))
+        .with_pruned_by_stats(Some(1)) // RG4 pruned
+        .with_limit_pruned_row_groups(Some(0)) // no fully matched groups, so no limit pruning
+        .test_row_group_prune_with_custom_data(schema, batches, 3)
+        .await;
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_limit_pruning_exceeds_fully_matched() -> datafusion_common::error::Result<()>
+{
+    // Test Case 4: Limit exceeds all fully matched rows, need partially matched
+    // Row Group 0: a=[10,11,12,12] -> Partially matched, 1 row (a=10)
+    // Row Group 1: a=[10,10,10,10] -> Fully matched, 4 rows
+    // Row Group 2: a=[10,10,10,10] -> Fully matched, 4 rows
+    // Row Group 3: a=[10,13,14,11] -> Partially matched, 1 row (a=10)
+    // Row Group 4: a=[20,21,22,22] -> Not matched
+
+    // With LIMIT 10, the fully matched RG1 (4 rows) + RG2 (4 rows) only
+    // cover 8 rows, so the partially matched RG0 and RG3 are needed as well.
+    // All matching row groups are scanned; only RG4 is pruned by statistics.
+
+    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
+    let query = "SELECT a FROM t WHERE a = 10 LIMIT 10";
+
+    let batches = vec![
+        make_i32_batch("a", vec![10, 11, 12, 12])?,
+        make_i32_batch("a", vec![10, 10, 10, 10])?,
+        make_i32_batch("a", vec![10, 10, 10, 10])?,
+        make_i32_batch("a", vec![10, 13, 14, 11])?,
+        make_i32_batch("a", vec![20, 21, 22, 22])?,
+    ];
+
+    RowGroupPruningTest::new()
+        .with_scenario(Scenario::Int)
+        .with_query(query)
+        .with_expected_errors(Some(0))
+        .with_expected_rows(10) // Total: 1 + 4 + 4 + 1 = 10 matching rows
+        .with_pruned_files(Some(0))
+        .with_matched_by_stats(Some(4)) // RG0,1,2,3 matched
+        .with_fully_matched_by_stats(Some(2))
+        .with_pruned_by_stats(Some(1)) // RG4 pruned
+        .with_limit_pruned_row_groups(Some(0)) // No limit pruning since we need all RGs
+        .test_row_group_prune_with_custom_data(schema, batches, 4)
+        .await;
+
+    Ok(())
+}
diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs
index 574fe2a040eab..8c1c97873a6a6 100644
--- a/datafusion/datasource-parquet/src/metrics.rs
+++ b/datafusion/datasource-parquet/src/metrics.rs
@@ -48,6 +48,10 @@ pub struct ParquetFileMetrics {
     pub row_groups_matched_bloom_filter: Count,
     /// Number of row groups pruned by bloom filters
     pub row_groups_pruned_bloom_filter: Count,
+    /// Number of row groups pruned due to limit pruning.
+    pub limit_pruned_row_groups: Count,
+    /// Number of row groups whose statistics were checked and fully matched
+    pub row_groups_fully_matched_statistics: Count,
     /// Number of row groups whose statistics were checked and matched (not pruned)
     pub row_groups_matched_statistics: Count,
     /// Number of row groups pruned by statistics
@@ -93,6 +97,14 @@ impl ParquetFileMetrics {
             .with_new_label("filename", filename.to_string())
             .counter("row_groups_pruned_bloom_filter", partition);
 
+        let limit_pruned_row_groups = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("limit_pruned_row_groups", partition);
+
+        let row_groups_fully_matched_statistics = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .counter("row_groups_fully_matched_statistics", partition);
+
         let row_groups_matched_statistics = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .counter("row_groups_matched_statistics", partition);
@@ -145,8 +157,10 @@ impl ParquetFileMetrics {
             predicate_evaluation_errors,
             row_groups_matched_bloom_filter,
             row_groups_pruned_bloom_filter,
+            row_groups_fully_matched_statistics,
             row_groups_matched_statistics,
             row_groups_pruned_statistics,
+            limit_pruned_row_groups,
             bytes_scanned,
             pushdown_rows_pruned,
             pushdown_rows_matched,
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index 664b86e964eba..fc7c5611af714 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -376,8 +376,12 @@ impl FileOpener for ParquetOpener {
             }
         }
 
-        let mut access_plan = row_groups.build();
+        // Prune by limit
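+        // (this runs after the statistics-based pruning above, which is
+        // what populates the fully-matched flags that limit pruning uses)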
+        if let Some(limit) = limit {
+            row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
+        }
 
+        let mut access_plan = row_groups.build();
         // page index pruning: if all data on individual pages can
         // be ruled out using page metadata, rows from other columns
         // with that range can be skipped as well
diff --git a/datafusion/datasource-parquet/src/row_group_filter.rs b/datafusion/datasource-parquet/src/row_group_filter.rs
index 51d50d780f103..a13143e5d1643 100644
--- a/datafusion/datasource-parquet/src/row_group_filter.rs
+++ b/datafusion/datasource-parquet/src/row_group_filter.rs
@@ -24,6 +24,8 @@ use arrow::datatypes::Schema;
 use datafusion_common::pruning::PruningStatistics;
 use datafusion_common::{Column, Result, ScalarValue};
 use datafusion_datasource::FileRange;
+use datafusion_physical_expr::expressions::NotExpr;
+use datafusion_physical_expr::PhysicalExprSimplifier;
 use datafusion_pruning::PruningPredicate;
 use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
 use parquet::arrow::parquet_column;
@@ -46,13 +48,19 @@ use parquet::{
 pub struct RowGroupAccessPlanFilter {
     /// which row groups should be accessed
     access_plan: ParquetAccessPlan,
+    /// which row groups are fully contained within the pruning predicate
+    is_fully_matched: Vec<bool>,
 }
 
 impl RowGroupAccessPlanFilter {
     /// Create a new `RowGroupPlanBuilder` for pruning out the groups to scan
     /// based on metadata and statistics
     pub fn new(access_plan: ParquetAccessPlan) -> Self {
-        Self { access_plan }
+        let num_row_groups = access_plan.len();
+        Self {
+            access_plan,
+            is_fully_matched: vec![false; num_row_groups],
+        }
     }
 
     /// Return true if there are no row groups
@@ -65,6 +73,49 @@ impl RowGroupAccessPlanFilter {
         self.access_plan
     }
 
+    /// Returns the is_fully_matched vector
+    pub fn is_fully_matched(&self) -> &Vec<bool> {
+        &self.is_fully_matched
+    }
+
+    /// Prunes the access plan based on the limit and fully contained row groups.
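+    ///
+    /// Greedily accumulates fully matched row groups, in scan order, until
+    /// their combined row count reaches `limit`; if that succeeds, every
+    /// other row group is dropped from the plan. If the fully matched
+    /// groups cannot cover the limit on their own, the plan is left
+    /// unchanged. Relies on the fully-matched flags computed during
+    /// statistics-based pruning.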
+    pub fn prune_by_limit(
+        &mut self,
+        limit: usize,
+        rg_metadata: &[RowGroupMetaData],
+        metrics: &ParquetFileMetrics,
+    ) {
+        let mut fully_matched_row_group_indexes: Vec<usize> = Vec::new();
+        let mut fully_matched_rows_count: usize = 0;
+
+        // Iterate through the currently accessible row groups
+        for &idx in self.access_plan.row_group_indexes().iter() {
+            if self.is_fully_matched[idx] {
+                let row_group_row_count = rg_metadata[idx].num_rows() as usize;
+                fully_matched_row_group_indexes.push(idx);
+                fully_matched_rows_count += row_group_row_count;
+                if fully_matched_rows_count >= limit {
+                    break;
+                }
+            }
+        }
+
+        if fully_matched_rows_count >= limit {
+            let original_num_accessible_row_groups =
+                self.access_plan.row_group_indexes().len();
+            let new_num_accessible_row_groups = fully_matched_row_group_indexes.len();
+            let pruned_count = original_num_accessible_row_groups
+                .saturating_sub(new_num_accessible_row_groups);
+            metrics.limit_pruned_row_groups.add(pruned_count);
+
+            let mut new_access_plan = ParquetAccessPlan::new_none(rg_metadata.len());
+            for &idx in &fully_matched_row_group_indexes {
+                new_access_plan.scan(idx);
+            }
+            self.access_plan = new_access_plan;
+        }
+    }
+
     /// Prune remaining row groups to only those within the specified range.
     ///
     /// Updates this set to mark row groups that should not be scanned
@@ -130,15 +181,56 @@ impl RowGroupAccessPlanFilter {
         // try to prune the row groups in a single call
         match predicate.prune(&pruning_stats) {
             Ok(values) => {
-                // values[i] is false means the predicate could not be true for row group i
+                let mut fully_contained_candidates_original_idx: Vec<usize> = Vec::new();
                 for (idx, &value) in row_group_indexes.iter().zip(values.iter()) {
                     if !value {
                         self.access_plan.skip(*idx);
                         metrics.row_groups_pruned_statistics.add(1);
                     } else {
+                        fully_contained_candidates_original_idx.push(*idx);
                         metrics.row_groups_matched_statistics.add(1);
                     }
                 }
+
+                // Note: this part of code shouldn't be expensive with a limited number of row groups
+                // If we do find it's expensive, we can consider optimizing it further.
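+                //
+                // Example: for the predicate `c1 >= 0`, a row group with
+                // statistics [min = 1, max = 5] lets the inverted predicate
+                // `c1 < 0` be pruned, proving that every row in the group
+                // satisfies `c1 >= 0`.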
+                if !fully_contained_candidates_original_idx.is_empty() {
+                    // Use NotExpr to create the inverted predicate
+                    let inverted_expr =
+                        Arc::new(NotExpr::new(Arc::clone(predicate.orig_expr())));
+                    // Simplify the NOT expression (e.g., NOT(c1 = 0) -> c1 != 0)
+                    // before building the pruning predicate
+                    let mut simplifier = PhysicalExprSimplifier::new(arrow_schema);
+                    let inverted_expr = simplifier.simplify(inverted_expr).unwrap();
+                    if let Ok(inverted_predicate) = PruningPredicate::try_new(
+                        inverted_expr,
+                        Arc::clone(predicate.schema()),
+                    ) {
+                        let inverted_pruning_stats = RowGroupPruningStatistics {
+                            parquet_schema,
+                            row_group_metadatas: fully_contained_candidates_original_idx
+                                .iter()
+                                .map(|&i| &groups[i])
+                                .collect::<Vec<_>>(),
+                            arrow_schema,
+                        };
+
+                        if let Ok(inverted_values) =
+                            inverted_predicate.prune(&inverted_pruning_stats)
+                        {
+                            for (i, &original_row_group_idx) in
+                                fully_contained_candidates_original_idx.iter().enumerate()
+                            {
+                                // If the inverted predicate *also* prunes this row group (meaning inverted_values[i] is false),
+                                // it implies that *all* rows in this group satisfy the original predicate.
+                                if !inverted_values[i] {
+                                    self.is_fully_matched[original_row_group_idx] = true;
+                                    metrics.row_groups_fully_matched_statistics.add(1);
+                                }
+                            }
+                        }
+                    }
+                }
             }
             // stats filter array could not be built, so we can't prune
             Err(e) => {
diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs
index da456b7071f77..8e5a82ef998a7 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -260,7 +260,7 @@ fn insufficient_capacity_err(
     additional: usize,
     available: usize,
 ) -> DataFusionError {
-    resources_datafusion_err!("Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool", 
+    resources_datafusion_err!("Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool",
     human_readable_size(additional), reservation.registration.consumer.name, human_readable_size(reservation.size), human_readable_size(available))
 }
diff --git a/datafusion/physical-expr/src/simplifier/mod.rs b/datafusion/physical-expr/src/simplifier/mod.rs
index 80d6ee0a7b914..d0c787867dd06 100644
--- a/datafusion/physical-expr/src/simplifier/mod.rs
+++ b/datafusion/physical-expr/src/simplifier/mod.rs
@@ -24,8 +24,9 @@ use datafusion_common::{
 };
 use std::sync::Arc;
 
-use crate::PhysicalExpr;
+use crate::{simplifier::not::simplify_not_expr_recursive, PhysicalExpr};
 
+pub mod not;
 pub mod unwrap_cast;
 
 /// Simplifies physical expressions by applying various optimizations
@@ -56,6 +57,11 @@ impl<'a> TreeNodeRewriter for PhysicalExprSimplifier<'a> {
     type Node = Arc<dyn PhysicalExpr>;
 
     fn f_up(&mut self, node: Self::Node) -> Result<Transformed<Self::Node>> {
+        // Apply NOT expression simplification first
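+        // (before unwrap-cast: a NOT node rewritten into a comparison can
+        // then be further simplified by the cast unwrapping below)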
+        let not_simplified = simplify_not_expr_recursive(node, self.schema)?;
+        let node = not_simplified.data;
+        let transformed = not_simplified.transformed;
+
         // Apply unwrap cast optimization
         #[cfg(test)]
         let original_type = node.data_type(self.schema).unwrap();
@@ -66,7 +72,12 @@ impl<'a> TreeNodeRewriter for PhysicalExprSimplifier<'a> {
             original_type,
             "Simplified expression should have the same data type as the original"
         );
-        Ok(unwrapped)
+        // Combine transformation results
+        let final_transformed = transformed || unwrapped.transformed;
+        Ok(Transformed::new_transformed(
+            unwrapped.data,
+            final_transformed,
+        ))
     }
 }
diff --git a/datafusion/physical-expr/src/simplifier/not.rs b/datafusion/physical-expr/src/simplifier/not.rs
new file mode 100644
index 0000000000000..d3e69bc74904e
--- /dev/null
+++ b/datafusion/physical-expr/src/simplifier/not.rs
@@ -0,0 +1,377 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Simplify NOT expressions in physical expressions
+//!
+//! This module provides optimizations for NOT expressions such as:
+//! - Double negation elimination: NOT(NOT(expr)) -> expr
+//! - NOT with binary comparisons: NOT(a = b) -> a != b
+//! - De Morgan's laws: NOT(A AND B) -> NOT A OR NOT B
+//! - Constant folding: NOT(TRUE) -> FALSE, NOT(FALSE) -> TRUE
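+//!
+//! These rules compose: for example, NOT(a = 1 AND b > 2) is rewritten by
+//! De Morgan's law to NOT(a = 1) OR NOT(b > 2), and then to a != 1 OR b <= 2.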
+
+use std::sync::Arc;
+
+use arrow::datatypes::Schema;
+use datafusion_common::{tree_node::Transformed, Result, ScalarValue};
+use datafusion_expr::Operator;
+
+use crate::expressions::{lit, BinaryExpr, Literal, NotExpr};
+use crate::PhysicalExpr;
+
+/// Attempts to simplify NOT expressions
+pub(crate) fn simplify_not_expr(
+    expr: Arc<dyn PhysicalExpr>,
+    schema: &Schema,
+) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+    // Check if this is a NOT expression
+    let not_expr = match expr.as_any().downcast_ref::<NotExpr>() {
+        Some(not_expr) => not_expr,
+        None => return Ok(Transformed::no(expr)),
+    };
+
+    let inner_expr = not_expr.arg();
+
+    // Handle NOT(NOT(expr)) -> expr (double negation elimination)
+    if let Some(inner_not) = inner_expr.as_any().downcast_ref::<NotExpr>() {
+        // Recursively simplify the inner expression
+        let simplified =
+            simplify_not_expr_recursive(Arc::clone(inner_not.arg()), schema)?;
+        // We eliminated double negation, so always return transformed=true
+        return Ok(Transformed::yes(simplified.data));
+    }
+
+    // Handle NOT(literal) -> !literal
+    if let Some(literal) = inner_expr.as_any().downcast_ref::<Literal>() {
+        if let ScalarValue::Boolean(Some(val)) = literal.value() {
+            return Ok(Transformed::yes(lit(ScalarValue::Boolean(Some(!val)))));
+        }
+        if let ScalarValue::Boolean(None) = literal.value() {
+            return Ok(Transformed::yes(lit(ScalarValue::Boolean(None))));
+        }
+    }
+
+    // Handle NOT(binary_expr) where we can flip the operator
+    if let Some(binary_expr) = inner_expr.as_any().downcast_ref::<BinaryExpr>() {
+        if let Some(negated_op) = negate_operator(binary_expr.op()) {
+            // Recursively simplify the left and right expressions first
+            let left_simplified =
+                simplify_not_expr_recursive(Arc::clone(binary_expr.left()), schema)?;
+            let right_simplified =
+                simplify_not_expr_recursive(Arc::clone(binary_expr.right()), schema)?;
+
+            let new_binary = Arc::new(BinaryExpr::new(
+                left_simplified.data,
+                negated_op,
+                right_simplified.data,
+            ));
+            // We flipped the operator, so always return transformed=true
+            return Ok(Transformed::yes(new_binary));
+        }
+
+        // Handle De Morgan's laws for AND/OR
+        match binary_expr.op() {
+            Operator::And => {
+                // NOT(A AND B) -> NOT A OR NOT B
+                let not_left = Arc::new(NotExpr::new(Arc::clone(binary_expr.left())));
+                let not_right = Arc::new(NotExpr::new(Arc::clone(binary_expr.right())));
+
+                // Recursively simplify the NOT expressions
+                let simplified_left = simplify_not_expr_recursive(not_left, schema)?;
+                let simplified_right = simplify_not_expr_recursive(not_right, schema)?;
+
+                let new_binary = Arc::new(BinaryExpr::new(
+                    simplified_left.data,
+                    Operator::Or,
+                    simplified_right.data,
+                ));
+                return Ok(Transformed::yes(new_binary));
+            }
+            Operator::Or => {
+                // NOT(A OR B) -> NOT A AND NOT B
+                let not_left = Arc::new(NotExpr::new(Arc::clone(binary_expr.left())));
+                let not_right = Arc::new(NotExpr::new(Arc::clone(binary_expr.right())));
+
+                // Recursively simplify the NOT expressions
+                let simplified_left = simplify_not_expr_recursive(not_left, schema)?;
+                let simplified_right = simplify_not_expr_recursive(not_right, schema)?;
+
+                let new_binary = Arc::new(BinaryExpr::new(
+                    simplified_left.data,
+                    Operator::And,
+                    simplified_right.data,
+                ));
+                return Ok(Transformed::yes(new_binary));
+            }
+            _ => {}
+        }
+    }
+
+    // If no simplification possible, return the original expression
+    Ok(Transformed::no(expr))
+}
+
+/// Helper function that recursively simplifies expressions, including NOT expressions
+pub fn simplify_not_expr_recursive(
+    expr: Arc<dyn PhysicalExpr>,
+    schema: &Schema,
+) -> Result<Transformed<Arc<dyn PhysicalExpr>>> {
+    // First, try to simplify any NOT expressions in this expression
+    let not_simplified = simplify_not_expr(Arc::clone(&expr), schema)?;
+
+    // If the expression was transformed, we might have created new opportunities for simplification
+    if not_simplified.transformed {
+        // Recursively simplify the result
+        let further_simplified =
+            simplify_not_expr_recursive(Arc::clone(&not_simplified.data), schema)?;
+        if further_simplified.transformed {
+            return Ok(Transformed::yes(further_simplified.data));
+        } else {
+            return Ok(not_simplified);
+        }
+    }
+
+    // If this expression wasn't a NOT expression, try to simplify its children
+    // This handles cases where NOT expressions might be nested deeper in the tree
+    if let Some(binary_expr) = expr.as_any().downcast_ref::<BinaryExpr>() {
+        let left_simplified =
+            simplify_not_expr_recursive(Arc::clone(binary_expr.left()), schema)?;
+        let right_simplified =
+            simplify_not_expr_recursive(Arc::clone(binary_expr.right()), schema)?;
+
+        if left_simplified.transformed || right_simplified.transformed {
+            let new_binary = Arc::new(BinaryExpr::new(
+                left_simplified.data,
+                *binary_expr.op(),
+                right_simplified.data,
+            ));
+            return Ok(Transformed::yes(new_binary));
+        }
+    }
+
+    Ok(not_simplified)
+}
+
+/// Returns the negated version of a comparison operator, if possible
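+/// Flipping the operator preserves SQL three-valued logic: when either
+/// operand is NULL, both NOT(a OP b) and the flipped comparison evaluate
+/// to NULL.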
+fn negate_operator(op: &Operator) -> Option<Operator> {
+    match op {
+        Operator::Eq => Some(Operator::NotEq),
+        Operator::NotEq => Some(Operator::Eq),
+        Operator::Lt => Some(Operator::GtEq),
+        Operator::LtEq => Some(Operator::Gt),
+        Operator::Gt => Some(Operator::LtEq),
+        Operator::GtEq => Some(Operator::Lt),
+        Operator::IsDistinctFrom => Some(Operator::IsNotDistinctFrom),
+        Operator::IsNotDistinctFrom => Some(Operator::IsDistinctFrom),
+        // For other operators, we can't directly negate them
+        _ => None,
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::expressions::{col, lit, BinaryExpr, NotExpr};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::ScalarValue;
+    use datafusion_expr::Operator;
+
+    fn test_schema() -> Schema {
+        Schema::new(vec![
+            Field::new("a", DataType::Boolean, false),
+            Field::new("b", DataType::Int32, false),
+        ])
+    }
+
+    #[test]
+    fn test_double_negation_elimination() -> Result<()> {
+        let schema = test_schema();
+
+        // Create NOT(NOT(b > 5))
+        let inner_expr: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+            col("b", &schema)?,
+            Operator::Gt,
+            lit(ScalarValue::Int32(Some(5))),
+        ));
+        let inner_not = Arc::new(NotExpr::new(Arc::clone(&inner_expr)));
+        let double_not = Arc::new(NotExpr::new(inner_not));
+
+        let result = simplify_not_expr_recursive(double_not, &schema)?;
+
+        assert!(result.transformed);
+        // Should be simplified back to the original b > 5
+        assert_eq!(result.data.to_string(), inner_expr.to_string());
+        Ok(())
+    }
+
+    #[test]
+    fn test_not_literal() -> Result<()> {
+        let schema = test_schema();
+
+        // NOT(TRUE) -> FALSE
+        let not_true = Arc::new(NotExpr::new(lit(ScalarValue::Boolean(Some(true)))));
+        let result = simplify_not_expr(not_true, &schema)?;
+        assert!(result.transformed);
+
+        if let Some(literal) = result.data.as_any().downcast_ref::<Literal>() {
+            assert_eq!(literal.value(), &ScalarValue::Boolean(Some(false)));
+        } else {
+            panic!("Expected literal result");
+        }
+
+        // NOT(FALSE) -> TRUE
+        let not_false = Arc::new(NotExpr::new(lit(ScalarValue::Boolean(Some(false)))));
+        let result = simplify_not_expr_recursive(not_false, &schema)?;
+        assert!(result.transformed);
+
+        if let Some(literal) = result.data.as_any().downcast_ref::<Literal>() {
+            assert_eq!(literal.value(), &ScalarValue::Boolean(Some(true)));
+        } else {
+            panic!("Expected literal result");
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_negate_comparison() -> Result<()> {
+        let schema = test_schema();
+
+        // NOT(b = 5) -> b != 5
+        let eq_expr = Arc::new(BinaryExpr::new(
+            col("b", &schema)?,
+            Operator::Eq,
+            lit(ScalarValue::Int32(Some(5))),
+        ));
+        let not_eq = Arc::new(NotExpr::new(eq_expr));
+
+        let result = simplify_not_expr_recursive(not_eq, &schema)?;
+        assert!(result.transformed);
+
+        if let Some(binary) = result.data.as_any().downcast_ref::<BinaryExpr>() {
+            assert_eq!(binary.op(), &Operator::NotEq);
+        } else {
+            panic!("Expected binary expression result");
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_demorgans_law_and() -> Result<()> {
+        let schema = test_schema();
+
+        // NOT(a AND b) -> NOT a OR NOT b
+        let and_expr = Arc::new(BinaryExpr::new(
+            col("a", &schema)?,
+            Operator::And,
+            col("b", &schema)?,
+        ));
+        let not_and = Arc::new(NotExpr::new(and_expr));
+
+        let result = simplify_not_expr_recursive(not_and, &schema)?;
+        assert!(result.transformed);
+
+        if let Some(binary) = result.data.as_any().downcast_ref::<BinaryExpr>() {
+            assert_eq!(binary.op(), &Operator::Or);
+            // Left and right should both be NOT expressions
+            assert!(binary.left().as_any().downcast_ref::<NotExpr>().is_some());
+            assert!(binary.right().as_any().downcast_ref::<NotExpr>().is_some());
+        } else {
+            panic!("Expected binary expression result");
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_demorgans_law_or() -> Result<()> {
+        let schema = test_schema();
+
+        // NOT(a OR b) -> NOT a AND NOT b
+        let or_expr = Arc::new(BinaryExpr::new(
+            col("a", &schema)?,
+            Operator::Or,
+            col("b", &schema)?,
+        ));
+        let not_or = Arc::new(NotExpr::new(or_expr));
+
+        let result = simplify_not_expr_recursive(not_or, &schema)?;
+        assert!(result.transformed);
+
+        if let Some(binary) = result.data.as_any().downcast_ref::<BinaryExpr>() {
+            assert_eq!(binary.op(), &Operator::And);
+            // Left and right should both be NOT expressions
+            assert!(binary.left().as_any().downcast_ref::<NotExpr>().is_some());
+            assert!(binary.right().as_any().downcast_ref::<NotExpr>().is_some());
+        } else {
+            panic!("Expected binary expression result");
+        }
+
+        Ok(())
+    }
binary expression result"); + } + + Ok(()) + } + + #[test] + fn test_demorgans_with_comparison_simplification() -> Result<()> { + let schema = test_schema(); + + // NOT(b = 1 AND b = 2) -> b != 1 OR b != 2 + // This tests the combination of De Morgan's law and operator negation + let eq1 = Arc::new(BinaryExpr::new( + col("b", &schema)?, + Operator::Eq, + lit(ScalarValue::Int32(Some(1))), + )); + let eq2 = Arc::new(BinaryExpr::new( + col("b", &schema)?, + Operator::Eq, + lit(ScalarValue::Int32(Some(2))), + )); + let and_expr = Arc::new(BinaryExpr::new(eq1, Operator::And, eq2)); + let not_and = Arc::new(NotExpr::new(and_expr)); + + let result = simplify_not_expr_recursive(not_and, &schema)?; + assert!(result.transformed, "Expression should be transformed"); + + // Verify the result is an OR expression + if let Some(or_binary) = result.data.as_any().downcast_ref::() { + assert_eq!(or_binary.op(), &Operator::Or, "Top level should be OR"); + + // Verify left side is b != 1 + if let Some(left_binary) = + or_binary.left().as_any().downcast_ref::() + { + assert_eq!(left_binary.op(), &Operator::NotEq, "Left should be NotEq"); + } else { + panic!("Expected left to be a binary expression with !="); + } + + // Verify right side is b != 2 + if let Some(right_binary) = + or_binary.right().as_any().downcast_ref::() + { + assert_eq!(right_binary.op(), &Operator::NotEq, "Right should be NotEq"); + } else { + panic!("Expected right to be a binary expression with !="); + } + } else { + panic!("Expected binary OR expression result"); + } + + Ok(()) + } +} diff --git a/datafusion/pruning/src/pruning_predicate.rs b/datafusion/pruning/src/pruning_predicate.rs index 5e92dbe227fdd..63e1c571a4ae1 100644 --- a/datafusion/pruning/src/pruning_predicate.rs +++ b/datafusion/pruning/src/pruning_predicate.rs @@ -473,7 +473,6 @@ impl PruningPredicate { // Simplify the newly created predicate to get rid of redundant casts, comparisons, etc. let predicate_expr = PhysicalExprSimplifier::new(&predicate_schema).simplify(predicate_expr)?; - let literal_guarantees = LiteralGuarantee::analyze(&expr); Ok(Self {