Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,7 @@ impl TableProvider for ListingTable {
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols)
.with_expr_adapter(self.expr_adapter_factory.clone())
.with_limit_pruning(limit.is_some())
.build(),
)
.await
Expand Down
16 changes: 10 additions & 6 deletions datafusion/core/tests/parquet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,18 @@ impl TestOutput {
self.metric_value("row_groups_matched_statistics")
}

/*
/// The number of row_groups fully matched by statistics
fn row_groups_fully_matched_statistics(&self) -> Option<usize> {
self.metric_value("row_groups_fully_matched_statistics")
}

/// The number of row groups pruned by limit pruning
fn limit_pruned_row_groups(&self) -> Option<usize> {
self.metric_value("limit_pruned_row_groups")
}
*/

/// The number of row_groups pruned by statistics
fn row_groups_pruned_statistics(&self) -> Option<usize> {
self.metric_value("row_groups_pruned_statistics")
Expand Down Expand Up @@ -183,11 +190,6 @@ impl TestOutput {
self.metric_value("page_index_rows_pruned")
}

/// The number of row groups pruned by limit pruning
fn limit_pruned_row_groups(&self) -> Option<usize> {
self.metric_value("limit_pruned_row_groups")
}

fn description(&self) -> String {
format!(
"Input:\n{}\nQuery:\n{}\nOutput:\n{}\nMetrics:\n{}",
Expand All @@ -204,7 +206,8 @@ impl ContextWithParquet {
Self::with_config(scenario, unit, SessionConfig::new(), None, None).await
}

/// Set custom schema and batches for the test
// Set custom schema and batches for the test
/*
pub async fn with_custom_data(
scenario: Scenario,
unit: Unit,
Expand All @@ -220,6 +223,7 @@ impl ContextWithParquet {
)
.await
}
*/

async fn with_config(
scenario: Scenario,
Expand Down
30 changes: 16 additions & 14 deletions datafusion/core/tests/parquet/row_group_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@
//! This file contains an end to end test of parquet pruning. It writes
//! data into a parquet file and then verifies row groups are pruned as
//! expected.
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use datafusion::prelude::SessionConfig;
use datafusion_common::{DataFusionError, ScalarValue};
use datafusion_common::ScalarValue;
use itertools::Itertools;

use crate::parquet::Unit::RowGroup;
Expand All @@ -34,12 +30,12 @@ struct RowGroupPruningTest {
query: String,
expected_errors: Option<usize>,
expected_row_group_matched_by_statistics: Option<usize>,
expected_row_group_fully_matched_by_statistics: Option<usize>,
// expected_row_group_fully_matched_by_statistics: Option<usize>,
expected_row_group_pruned_by_statistics: Option<usize>,
expected_files_pruned_by_statistics: Option<usize>,
expected_row_group_matched_by_bloom_filter: Option<usize>,
expected_row_group_pruned_by_bloom_filter: Option<usize>,
expected_limit_pruned_row_groups: Option<usize>,
// expected_limit_pruned_row_groups: Option<usize>,
expected_rows: usize,
}
impl RowGroupPruningTest {
Expand All @@ -51,11 +47,11 @@ impl RowGroupPruningTest {
expected_errors: None,
expected_row_group_matched_by_statistics: None,
expected_row_group_pruned_by_statistics: None,
expected_row_group_fully_matched_by_statistics: None,
// expected_row_group_fully_matched_by_statistics: None,
expected_files_pruned_by_statistics: None,
expected_row_group_matched_by_bloom_filter: None,
expected_row_group_pruned_by_bloom_filter: None,
expected_limit_pruned_row_groups: None,
// expected_limit_pruned_row_groups: None,
expected_rows: 0,
}
}
Expand Down Expand Up @@ -85,6 +81,7 @@ impl RowGroupPruningTest {
}

// Set the expected fully matched row groups by statistics
/*
fn with_fully_matched_by_stats(
mut self,
fully_matched_by_stats: Option<usize>,
Expand All @@ -93,6 +90,12 @@ impl RowGroupPruningTest {
self
}

fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option<usize>) -> Self {
self.expected_limit_pruned_row_groups = pruned_by_limit;
self
}
*/

// Set the expected pruned row groups by statistics
fn with_pruned_by_stats(mut self, pruned_by_stats: Option<usize>) -> Self {
self.expected_row_group_pruned_by_statistics = pruned_by_stats;
Expand All @@ -116,11 +119,6 @@ impl RowGroupPruningTest {
self
}

fn with_limit_pruned_row_groups(mut self, pruned_by_limit: Option<usize>) -> Self {
self.expected_limit_pruned_row_groups = pruned_by_limit;
self
}

/// Set the number of expected rows from the output of this test
fn with_expected_rows(mut self, rows: usize) -> Self {
self.expected_rows = rows;
Expand Down Expand Up @@ -177,6 +175,7 @@ impl RowGroupPruningTest {
}

// Execute the test with the current configuration
/*
async fn test_row_group_prune_with_custom_data(
self,
schema: Arc<Schema>,
Expand Down Expand Up @@ -233,6 +232,7 @@ impl RowGroupPruningTest {
output.description(),
);
}
*/
}

#[tokio::test]
Expand Down Expand Up @@ -1721,6 +1721,7 @@ async fn test_bloom_filter_decimal_dict() {
.await;
}

/*
// Helper function to create a batch with a single Int32 column.
fn make_i32_batch(
name: &str,
Expand Down Expand Up @@ -1958,3 +1959,4 @@ async fn test_limit_pruning_exceeds_fully_matched() -> datafusion_common::error:

Ok(())
}
*/
15 changes: 13 additions & 2 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ pub(super) struct ParquetOpener {
pub enable_row_group_stats_pruning: bool,
/// Coerce INT96 timestamps to specific TimeUnit
pub coerce_int96: Option<TimeUnit>,
/// Should limit pruning be applied
pub enable_limit_pruning: bool,
/// Optional parquet FileDecryptionProperties
#[cfg(feature = "parquet_encryption")]
pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
Expand Down Expand Up @@ -144,6 +146,7 @@ impl FileOpener for ParquetOpener {
let enable_bloom_filter = self.enable_bloom_filter;
let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
let limit = self.limit;
let enable_limit_pruning = self.enable_limit_pruning;

let predicate_creation_errors = MetricBuilder::new(&self.metrics)
.global_counter("num_predicate_creation_errors");
Expand Down Expand Up @@ -377,8 +380,10 @@ impl FileOpener for ParquetOpener {
}

// Prune by limit
if let Some(limit) = limit {
row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
if enable_limit_pruning {
if let Some(limit) = limit {
row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
}
}

let mut access_plan = row_groups.build();
Expand Down Expand Up @@ -826,6 +831,7 @@ mod test {
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
enable_limit_pruning: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
Expand Down Expand Up @@ -914,6 +920,7 @@ mod test {
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
enable_limit_pruning: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
Expand Down Expand Up @@ -1018,6 +1025,7 @@ mod test {
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
enable_limit_pruning: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
Expand Down Expand Up @@ -1132,6 +1140,7 @@ mod test {
reorder_filters: true,
enable_page_index: false,
enable_bloom_filter: false,
enable_limit_pruning: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: false, // note that this is false!
coerce_int96: None,
Expand Down Expand Up @@ -1247,6 +1256,7 @@ mod test {
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
enable_limit_pruning: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
Expand Down Expand Up @@ -1429,6 +1439,7 @@ mod test {
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
enable_limit_pruning: false,
schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
enable_row_group_stats_pruning: false,
coerce_int96: None,
Expand Down
1 change: 1 addition & 0 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ impl FileSource for ParquetSource {
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
enable_limit_pruning: base_config.limit_pruning,
schema_adapter_factory,
coerce_int96,
#[cfg(feature = "parquet_encryption")]
Expand Down
14 changes: 14 additions & 0 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ pub struct FileScanConfig {
/// Expression adapter used to adapt filters and projections that are pushed down into the scan
/// from the logical schema to the physical schema of the file.
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
/// If there is a limit pushed down at the logical plan level, we can enable limit_pruning
pub limit_pruning: bool,
}

/// A builder for [`FileScanConfig`]'s.
Expand Down Expand Up @@ -275,6 +277,8 @@ pub struct FileScanConfigBuilder {
new_lines_in_values: Option<bool>,
batch_size: Option<usize>,
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
/// If there is a limit pushed down at the logical plan level, we can enable limit_pruning
limit_pruning: bool,
}

impl FileScanConfigBuilder {
Expand Down Expand Up @@ -304,6 +308,7 @@ impl FileScanConfigBuilder {
constraints: None,
batch_size: None,
expr_adapter_factory: None,
limit_pruning: false,
}
}

Expand Down Expand Up @@ -426,6 +431,12 @@ impl FileScanConfigBuilder {
self
}

/// Enable or disable limit pruning.
pub fn with_limit_pruning(mut self, limit_pruning: bool) -> Self {
self.limit_pruning = limit_pruning;
self
}

/// Build the final [`FileScanConfig`] with all the configured settings.
///
/// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
Expand All @@ -446,6 +457,7 @@ impl FileScanConfigBuilder {
new_lines_in_values,
batch_size,
expr_adapter_factory: expr_adapter,
limit_pruning,
} = self;

let constraints = constraints.unwrap_or_default();
Expand Down Expand Up @@ -473,6 +485,7 @@ impl FileScanConfigBuilder {
new_lines_in_values,
batch_size,
expr_adapter_factory: expr_adapter,
limit_pruning,
}
}
}
Expand All @@ -494,6 +507,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
constraints: Some(config.constraints),
batch_size: config.batch_size,
expr_adapter_factory: config.expr_adapter_factory,
limit_pruning: config.limit_pruning,
}
}
}
Expand Down
Loading