diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 14b5bfa54eda2..82ab966dbd8b8 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -1253,6 +1253,7 @@ impl TableProvider for ListingTable {
                 .with_output_ordering(output_ordering)
                 .with_table_partition_cols(table_partition_cols)
                 .with_expr_adapter(self.expr_adapter_factory.clone())
+                .with_limit_pruning(limit.is_some())
                 .build(),
         )
         .await
diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs
index dad8472dedc42..6ce962f1b786c 100644
--- a/datafusion/core/tests/parquet/mod.rs
+++ b/datafusion/core/tests/parquet/mod.rs
@@ -150,11 +150,18 @@ impl TestOutput {
         self.metric_value("row_groups_matched_statistics")
     }
 
+    /*
     /// The number of row_groups fully matched by statistics
     fn row_groups_fully_matched_statistics(&self) -> Option<usize> {
         self.metric_value("row_groups_fully_matched_statistics")
     }
 
+    /// The number of row groups pruned by limit pruning
+    fn limit_pruned_row_groups(&self) -> Option<usize> {
+        self.metric_value("limit_pruned_row_groups")
+    }
+    */
+
     /// The number of row_groups pruned by statistics
     fn row_groups_pruned_statistics(&self) -> Option<usize> {
         self.metric_value("row_groups_pruned_statistics")
@@ -183,11 +190,6 @@
         self.metric_value("page_index_rows_pruned")
     }
 
-    /// The number of row groups pruned by limit pruning
-    fn limit_pruned_row_groups(&self) -> Option<usize> {
-        self.metric_value("limit_pruned_row_groups")
-    }
-
     fn description(&self) -> String {
         format!(
             "Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
@@ -204,7 +206,8 @@ impl ContextWithParquet {
         Self::with_config(scenario, unit, SessionConfig::new(), None, None).await
     }
 
-    /// Set custom schema and batches for the test
+    // Set custom schema and batches for the test
+    /*
     pub async fn with_custom_data(
         scenario: Scenario,
         unit: Unit,
@@ -220,6 +223,7 @@ impl ContextWithParquet {
         )
         .await
     }
+    */
 
     async fn with_config(
         scenario: Scenario,
diff --git a/datafusion/core/tests/parquet/row_group_pruning.rs b/datafusion/core/tests/parquet/row_group_pruning.rs
index 42c4488d8444a..81cbdfb27121e 100644
--- a/datafusion/core/tests/parquet/row_group_pruning.rs
+++ b/datafusion/core/tests/parquet/row_group_pruning.rs
@@ -18,12 +18,8 @@
 //! This file contains an end to end test of parquet pruning. It writes
 //! data into a parquet file and then verifies row groups are pruned as
 //! expected.
-use std::sync::Arc;
-
-use arrow::array::{ArrayRef, Int32Array, RecordBatch};
-use arrow_schema::{DataType, Field, Schema};
 use datafusion::prelude::SessionConfig;
-use datafusion_common::{DataFusionError, ScalarValue};
+use datafusion_common::ScalarValue;
 use itertools::Itertools;
 
 use crate::parquet::Unit::RowGroup;
@@ -34,12 +30,12 @@ struct RowGroupPruningTest {
     query: String,
     expected_errors: Option<usize>,
     expected_row_group_matched_by_statistics: Option<usize>,
-    expected_row_group_fully_matched_by_statistics: Option<usize>,
+    // expected_row_group_fully_matched_by_statistics: Option<usize>,
     expected_row_group_pruned_by_statistics: Option<usize>,
     expected_files_pruned_by_statistics: Option<usize>,
     expected_row_group_matched_by_bloom_filter: Option<usize>,
     expected_row_group_pruned_by_bloom_filter: Option<usize>,
-    expected_limit_pruned_row_groups: Option<usize>,
+    // expected_limit_pruned_row_groups: Option<usize>,
     expected_rows: usize,
 }
 impl RowGroupPruningTest {
@@ -51,11 +47,11 @@
             expected_errors: None,
             expected_row_group_matched_by_statistics: None,
             expected_row_group_pruned_by_statistics: None,
-            expected_row_group_fully_matched_by_statistics: None,
+            // expected_row_group_fully_matched_by_statistics: None,
             expected_files_pruned_by_statistics: None,
             expected_row_group_matched_by_bloom_filter: None,
             expected_row_group_pruned_by_bloom_filter: None,
-            expected_limit_pruned_row_groups: None,
+            // expected_limit_pruned_row_groups: None,
             expected_rows: 0,
         }
     }
@@ -85,6 +81,7 @@
     }
 
     // Set the expected fully matched row groups by statistics
+    /*
     fn with_fully_matched_by_stats(
         mut self,
         fully_matched_by_stats: Option<usize>,
     ) -> Self {
@@ -93,6 +90,12 @@
         self
     }
 
+    fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option<usize>) -> Self {
+        self.expected_limit_pruned_row_groups = pruned_by_limit;
+        self
+    }
+    */
+
     // Set the expected pruned row groups by statistics
     fn with_pruned_by_stats(mut self, pruned_by_stats: Option<usize>) -> Self {
         self.expected_row_group_pruned_by_statistics = pruned_by_stats;
         self
     }
@@ -116,11 +119,6 @@
         self
     }
 
-    fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option<usize>) -> Self {
-        self.expected_limit_pruned_row_groups = pruned_by_limit;
-        self
-    }
-
     /// Set the number of expected rows from the output of this test
     fn with_expected_rows(mut self, rows: usize) -> Self {
         self.expected_rows = rows;
@@ -177,6 +175,7 @@
     }
 
     // Execute the test with the current configuration
+    /*
     async fn test_row_group_prune_with_custom_data(
         self,
         schema: Arc<Schema>,
@@ -233,6 +232,7 @@
             output.description(),
         );
     }
+    */
 }
 
 #[tokio::test]
@@ -1721,6 +1721,7 @@ async fn test_bloom_filter_decimal_dict() {
         .await;
 }
 
+/*
 // Helper function to create a batch with a single Int32 column.
 fn make_i32_batch(
     name: &str,
@@ -1958,3 +1959,4 @@ async fn test_limit_pruning_exceeds_fully_matched() -> datafusion_common::error::Result<()> {
 
     Ok(())
 }
+*/
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index fc7c5611af714..3b1f1ae35917b 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -97,6 +97,8 @@ pub(super) struct ParquetOpener {
     pub enable_row_group_stats_pruning: bool,
     /// Coerce INT96 timestamps to specific TimeUnit
     pub coerce_int96: Option<TimeUnit>,
+    /// Should limit pruning be applied
+    pub enable_limit_pruning: bool,
     /// Optional parquet FileDecryptionProperties
     #[cfg(feature = "parquet_encryption")]
     pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
@@ -144,6 +146,7 @@ impl FileOpener for ParquetOpener {
         let enable_bloom_filter = self.enable_bloom_filter;
         let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
         let limit = self.limit;
+        let enable_limit_pruning = self.enable_limit_pruning;
 
         let predicate_creation_errors = MetricBuilder::new(&self.metrics)
             .global_counter("num_predicate_creation_errors");
@@ -377,8 +380,10 @@
             }
 
             // Prune by limit
-            if let Some(limit) = limit {
-                row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
+            if enable_limit_pruning {
+                if let Some(limit) = limit {
+                    row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
+                }
             }
 
             let mut access_plan = row_groups.build();
@@ -826,6 +831,7 @@ mod test {
             reorder_filters: false,
             enable_page_index: false,
             enable_bloom_filter: false,
+            enable_limit_pruning: false,
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
             enable_row_group_stats_pruning: true,
             coerce_int96: None,
@@ -914,6 +920,7 @@ mod test {
             reorder_filters: false,
             enable_page_index: false,
             enable_bloom_filter: false,
+            enable_limit_pruning: false,
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
             enable_row_group_stats_pruning: true,
             coerce_int96: None,
@@ -1018,6 +1025,7 @@ mod test {
             reorder_filters: false,
             enable_page_index: false,
             enable_bloom_filter: false,
+            enable_limit_pruning: false,
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
             enable_row_group_stats_pruning: true,
             coerce_int96: None,
@@ -1132,6 +1140,7 @@ mod test {
             reorder_filters: true,
             enable_page_index: false,
             enable_bloom_filter: false,
+            enable_limit_pruning: false,
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
             enable_row_group_stats_pruning: false, // note that this is false!
             coerce_int96: None,
@@ -1247,6 +1256,7 @@ mod test {
             reorder_filters: false,
             enable_page_index: false,
             enable_bloom_filter: false,
+            enable_limit_pruning: false,
             schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
             enable_row_group_stats_pruning: true,
             coerce_int96: None,
@@ -1429,6 +1439,7 @@ mod test {
             reorder_filters: false,
             enable_page_index: false,
             enable_bloom_filter: false,
+            enable_limit_pruning: false,
             schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
             enable_row_group_stats_pruning: false,
             coerce_int96: None,
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index 644cea85ca0a9..3bfc1463ae960 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -576,6 +576,7 @@ impl FileSource for ParquetSource {
             enable_page_index: self.enable_page_index(),
             enable_bloom_filter: self.bloom_filter_on_read(),
             enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
+            enable_limit_pruning: base_config.limit_pruning,
             schema_adapter_factory,
             coerce_int96,
             #[cfg(feature = "parquet_encryption")]
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 4e2235eae8fec..e5bbbdb028e7d 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -196,6 +196,8 @@ pub struct FileScanConfig {
     /// Expression adapter used to adapt filters and projections that are pushed down into the scan
     /// from the logical schema to the physical schema of the file.
     pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
+    /// If there is a limit pushed down at the logical plan level, we can enable limit_pruning
+    pub limit_pruning: bool,
 }
 
 /// A builder for [`FileScanConfig`]'s.
@@ -275,6 +277,8 @@ pub struct FileScanConfigBuilder {
     new_lines_in_values: Option<bool>,
     batch_size: Option<usize>,
     expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
+    /// If there is a limit pushed down at the logical plan level, we can enable limit_pruning
+    limit_pruning: bool,
 }
 
 impl FileScanConfigBuilder {
@@ -304,6 +308,7 @@ impl FileScanConfigBuilder {
             constraints: None,
             batch_size: None,
             expr_adapter_factory: None,
+            limit_pruning: false,
         }
     }
@@ -426,6 +431,12 @@ impl FileScanConfigBuilder {
         self
     }
 
+    /// Enable or disable limit pruning.
+    pub fn with_limit_pruning(mut self, limit_pruning: bool) -> Self {
+        self.limit_pruning = limit_pruning;
+        self
+    }
+
     /// Build the final [`FileScanConfig`] with all the configured settings.
     ///
     /// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
@@ -446,6 +457,7 @@
             new_lines_in_values,
             batch_size,
             expr_adapter_factory: expr_adapter,
+            limit_pruning,
         } = self;
 
         let constraints = constraints.unwrap_or_default();
@@ -473,6 +485,7 @@
             new_lines_in_values,
             batch_size,
             expr_adapter_factory: expr_adapter,
+            limit_pruning,
         }
     }
 }
@@ -494,6 +507,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
             constraints: Some(config.constraints),
             batch_size: config.batch_size,
             expr_adapter_factory: config.expr_adapter_factory,
+            limit_pruning: config.limit_pruning,
         }
     }
}
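
Reviewer note: the patch only threads a boolean through the scan path. `ListingTable::scan` sets `with_limit_pruning(limit.is_some())` on the `FileScanConfigBuilder`, `FileScanConfig.limit_pruning` is copied into `ParquetOpener.enable_limit_pruning` by `ParquetSource`, and the opener uses it to gate the pre-existing `prune_by_limit` call. For intuition, here is a minimal, self-contained sketch of the pruning rule that call is expected to apply; the free function and its `&[usize]` row-count input are illustrative assumptions, not the actual DataFusion API:

```rust
// Sketch only: keep row groups in file order until the cumulative row count
// first reaches the limit, then prune every later group. The real
// `prune_by_limit` operates on row-group metadata and records the
// `limit_pruned_row_groups` metric.
fn prune_by_limit(row_counts: &[usize], limit: usize) -> Vec<bool> {
    let mut rows_so_far = 0;
    row_counts
        .iter()
        .map(|&rows| {
            let keep = rows_so_far < limit; // this group still contributes needed rows
            rows_so_far += rows;
            keep
        })
        .collect()
}

fn main() {
    // Three row groups of 100 rows each under `LIMIT 150`: the first two
    // groups already supply 200 rows, so the third is pruned.
    assert_eq!(prune_by_limit(&[100, 100, 100], 150), [true, true, false]);
}
```

Note that this rule is only safe when every scanned row counts toward the limit; once a predicate filters rows, the retained groups may yield fewer than `limit` matches, which is consistent with the patch keeping the flag `false` by default and enabling it only from the pushed-down limit at the `ListingTable` level.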