Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1253,6 +1253,7 @@ impl TableProvider for ListingTable {
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols)
.with_expr_adapter(self.expr_adapter_factory.clone())
.with_limit_pruning(limit.is_some())
.build(),
)
.await
Expand Down
15 changes: 13 additions & 2 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ pub(super) struct ParquetOpener {
pub enable_row_group_stats_pruning: bool,
/// Coerce INT96 timestamps to specific TimeUnit
pub coerce_int96: Option<TimeUnit>,
/// Should limit pruning be applied
pub enable_limit_pruning: bool,
/// Optional parquet FileDecryptionProperties
#[cfg(feature = "parquet_encryption")]
pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
Expand Down Expand Up @@ -144,6 +146,7 @@ impl FileOpener for ParquetOpener {
let enable_bloom_filter = self.enable_bloom_filter;
let enable_row_group_stats_pruning = self.enable_row_group_stats_pruning;
let limit = self.limit;
let enable_limit_pruning = self.enable_limit_pruning;

let predicate_creation_errors = MetricBuilder::new(&self.metrics)
.global_counter("num_predicate_creation_errors");
Expand Down Expand Up @@ -377,8 +380,10 @@ impl FileOpener for ParquetOpener {
}

// Prune by limit
if let Some(limit) = limit {
row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
if enable_limit_pruning {
if let Some(limit) = limit {
row_groups.prune_by_limit(limit, rg_metadata, &file_metrics);
}
}

let mut access_plan = row_groups.build();
Expand Down Expand Up @@ -826,6 +831,7 @@ mod test {
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
enable_limit_pruning: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
Expand Down Expand Up @@ -914,6 +920,7 @@ mod test {
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
enable_limit_pruning: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
Expand Down Expand Up @@ -1018,6 +1025,7 @@ mod test {
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
enable_limit_pruning: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
Expand Down Expand Up @@ -1132,6 +1140,7 @@ mod test {
reorder_filters: true,
enable_page_index: false,
enable_bloom_filter: false,
enable_limit_pruning: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: false, // note that this is false!
coerce_int96: None,
Expand Down Expand Up @@ -1247,6 +1256,7 @@ mod test {
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
enable_limit_pruning: false,
schema_adapter_factory: Arc::new(DefaultSchemaAdapterFactory),
enable_row_group_stats_pruning: true,
coerce_int96: None,
Expand Down Expand Up @@ -1429,6 +1439,7 @@ mod test {
reorder_filters: false,
enable_page_index: false,
enable_bloom_filter: false,
enable_limit_pruning: false,
schema_adapter_factory: Arc::new(CustomSchemaAdapterFactory),
enable_row_group_stats_pruning: false,
coerce_int96: None,
Expand Down
1 change: 1 addition & 0 deletions datafusion/datasource-parquet/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ impl FileSource for ParquetSource {
enable_page_index: self.enable_page_index(),
enable_bloom_filter: self.bloom_filter_on_read(),
enable_row_group_stats_pruning: self.table_parquet_options.global.pruning,
enable_limit_pruning: base_config.limit_pruning,
schema_adapter_factory,
coerce_int96,
#[cfg(feature = "parquet_encryption")]
Expand Down
14 changes: 14 additions & 0 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ pub struct FileScanConfig {
/// Expression adapter used to adapt filters and projections that are pushed down into the scan
/// from the logical schema to the physical schema of the file.
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
/// If there is a limit pushed down at the logical plan level, we can enable limit_pruning
pub limit_pruning: bool,
}

/// A builder for [`FileScanConfig`]'s.
Expand Down Expand Up @@ -275,6 +277,8 @@ pub struct FileScanConfigBuilder {
new_lines_in_values: Option<bool>,
batch_size: Option<usize>,
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
/// If there is a limit pushed down at the logical plan level, we can enable limit_pruning
limit_pruning: bool,
}

impl FileScanConfigBuilder {
Expand Down Expand Up @@ -304,6 +308,7 @@ impl FileScanConfigBuilder {
constraints: None,
batch_size: None,
expr_adapter_factory: None,
limit_pruning: false,
}
}

Expand Down Expand Up @@ -426,6 +431,12 @@ impl FileScanConfigBuilder {
self
}

/// Enable or disable pruning of row groups based on a pushed-down limit.
///
/// Set this to `true` when a limit was pushed down at the logical plan
/// level, so the scan may skip row groups beyond what is needed to
/// satisfy the limit. Defaults to `false` (see `new`).
pub fn with_limit_pruning(mut self, limit_pruning: bool) -> Self {
self.limit_pruning = limit_pruning;
self
}

/// Build the final [`FileScanConfig`] with all the configured settings.
///
/// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
Expand All @@ -446,6 +457,7 @@ impl FileScanConfigBuilder {
new_lines_in_values,
batch_size,
expr_adapter_factory: expr_adapter,
limit_pruning,
} = self;

let constraints = constraints.unwrap_or_default();
Expand Down Expand Up @@ -473,6 +485,7 @@ impl FileScanConfigBuilder {
new_lines_in_values,
batch_size,
expr_adapter_factory: expr_adapter,
limit_pruning,
}
}
}
Expand All @@ -494,6 +507,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
constraints: Some(config.constraints),
batch_size: config.batch_size,
expr_adapter_factory: config.expr_adapter_factory,
limit_pruning: config.limit_pruning,
}
}
}
Expand Down
Loading