Skip to content

Commit 021188e

Browse files
authored
Push down projection expressions into ParquetOpener (#19111)
Closes #14993.

Once this is merged I think we can say we support projection expression pushdown into scans, and that it is implemented for Parquet.

Remaining TODOs, which I think should be tracked in other issues (I'll find them or create them later):

- Support for pushing down struct field access into the Parquet scan (#11745)
- Refactor other file sources that could push down expressions to use the same approach as Parquet (Avro?)
- Replace remaining uses of `SchemaAdapter` with `PhysicalExprAdapter` and decide if we want to actually deprecate `SchemaAdapter`
1 parent 2a08013 commit 021188e

File tree

4 files changed

+236
-198
lines changed

4 files changed

+236
-198
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 94 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ use crate::{
2323
apply_file_schema_type_coercions, coerce_int96_to_resolution, row_filter,
2424
ParquetAccessPlan, ParquetFileMetrics, ParquetFileReaderFactory,
2525
};
26-
use arrow::array::RecordBatch;
26+
use arrow::array::{RecordBatch, RecordBatchOptions};
2727
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
2828
use datafusion_physical_expr::projection::ProjectionExprs;
2929
use datafusion_physical_expr::utils::reassign_expr_columns;
@@ -66,8 +66,8 @@ use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
6666
pub(super) struct ParquetOpener {
6767
/// Execution partition index
6868
pub partition_index: usize,
69-
/// Column indexes in `table_schema` needed by the query
70-
pub projection: Arc<[usize]>,
69+
/// Projection to apply on top of the table schema (i.e. can reference partition columns).
70+
pub projection: ProjectionExprs,
7171
/// Target number of rows in each output RecordBatch
7272
pub batch_size: usize,
7373
/// Optional limit on the number of rows to read
@@ -104,7 +104,7 @@ pub(super) struct ParquetOpener {
104104
#[cfg(feature = "parquet_encryption")]
105105
pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
106106
/// Rewrite expressions in the context of the file schema
107-
pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
107+
pub(crate) expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
108108
/// Optional factory to create file decryption properties dynamically
109109
#[cfg(feature = "parquet_encryption")]
110110
pub encryption_factory:
@@ -137,7 +137,7 @@ impl FileOpener for ParquetOpener {
137137

138138
let batch_size = self.batch_size;
139139

140-
// Replace any partition column references in the predicate
140+
// Build partition values map for replacing partition column references
141141
// with their literal values from this file's partition values.
142142
//
143143
// For example, given
@@ -157,29 +157,39 @@ impl FileOpener for ParquetOpener {
157157
// there are other cases where partition columns may appear in more
158158
// complex predicates that cannot be simplified until we are about to
159159
// open the file (such as dynamic predicates)
160-
let mut predicate = {
161-
// Only replace partition columns if there are partition values.
162-
// For non-partitioned tables, skip the clone and replacement traversal.
163-
if partitioned_file.partition_values.is_empty() {
164-
self.predicate.clone()
165-
} else {
166-
let partition_values: HashMap<&str, &ScalarValue> = self
167-
.table_schema
168-
.table_partition_cols()
169-
.iter()
170-
.zip(partitioned_file.partition_values.iter())
171-
.map(|(field, value)| (field.name().as_str(), value))
172-
.collect();
173-
174-
self.predicate
175-
.clone()
176-
.map(|p| replace_columns_with_literals(p, &partition_values))
177-
.transpose()?
178-
}
179-
};
160+
let partition_values: HashMap<&str, &ScalarValue> = self
161+
.table_schema
162+
.table_partition_cols()
163+
.iter()
164+
.zip(partitioned_file.partition_values.iter())
165+
.map(|(field, value)| (field.name().as_str(), value))
166+
.collect();
167+
168+
// Calculate the output schema from the original projection (before literal replacement)
169+
// so we get correct field names from column references
170+
let logical_file_schema = Arc::clone(self.table_schema.file_schema());
171+
let output_schema = Arc::new(
172+
self.projection
173+
.project_schema(self.table_schema.table_schema())?,
174+
);
180175

181-
let projection = Arc::clone(&self.projection);
182-
let logical_file_schema = Arc::clone(self.table_schema.table_schema());
176+
// Apply partition column replacement to projection expressions
177+
let mut projection = self.projection.clone();
178+
if !partition_values.is_empty() {
179+
projection = projection.try_map_exprs(|expr| {
180+
replace_columns_with_literals(Arc::clone(&expr), &partition_values)
181+
})?;
182+
}
183+
184+
// Apply partition column replacement to predicate
185+
let mut predicate = if partition_values.is_empty() {
186+
self.predicate.clone()
187+
} else {
188+
self.predicate
189+
.clone()
190+
.map(|p| replace_columns_with_literals(p, &partition_values))
191+
.transpose()?
192+
};
183193
let reorder_predicates = self.reorder_filters;
184194
let pushdown_filters = self.pushdown_filters;
185195
let force_filter_selections = self.force_filter_selections;
@@ -191,8 +201,7 @@ impl FileOpener for ParquetOpener {
191201
let predicate_creation_errors = MetricBuilder::new(&self.metrics)
192202
.global_counter("num_predicate_creation_errors");
193203

194-
let expr_adapter_factory = self.expr_adapter_factory.clone();
195-
let mut predicate_file_schema = Arc::clone(self.table_schema.file_schema());
204+
let expr_adapter_factory = Arc::clone(&self.expr_adapter_factory);
196205

197206
let enable_page_index = self.enable_page_index;
198207
#[cfg(feature = "parquet_encryption")]
@@ -311,30 +320,29 @@ impl FileOpener for ParquetOpener {
311320
}
312321
}
313322

314-
// Adapt the predicate to the physical file schema.
323+
// Adapt the projection & filter predicate to the physical file schema.
315324
// This evaluates missing columns and inserts any necessary casts.
316-
if let Some(expr_adapter_factory) = expr_adapter_factory.as_ref() {
317-
predicate = predicate
318-
.map(|p| {
319-
let expr = expr_adapter_factory
320-
.create(
321-
Arc::clone(&logical_file_schema),
322-
Arc::clone(&physical_file_schema),
323-
)
324-
.rewrite(p)?;
325-
// After rewriting to the file schema, further simplifications may be possible.
326-
// For example, if `'a' = col_that_is_missing` becomes `'a' = NULL` that can then be simplified to `FALSE`
327-
// and we can avoid doing any more work on the file (bloom filters, loading the page index, etc.).
328-
PhysicalExprSimplifier::new(&physical_file_schema).simplify(expr)
329-
})
330-
.transpose()?;
331-
predicate_file_schema = Arc::clone(&physical_file_schema);
332-
}
325+
// After rewriting to the file schema, further simplifications may be possible.
326+
// For example, if `'a' = col_that_is_missing` becomes `'a' = NULL` that can then be simplified to `FALSE`
327+
// and we can avoid doing any more work on the file (bloom filters, loading the page index, etc.).
328+
// Additionally, if any casts were inserted we can move casts from the column to the literal side:
329+
// `CAST(col AS INT) = 5` can become `col = CAST(5 AS <col type>)`, which can be evaluated statically.
330+
let rewriter = expr_adapter_factory.create(
331+
Arc::clone(&logical_file_schema),
332+
Arc::clone(&physical_file_schema),
333+
);
334+
let simplifier = PhysicalExprSimplifier::new(&physical_file_schema);
335+
predicate = predicate
336+
.map(|p| simplifier.simplify(rewriter.rewrite(p)?))
337+
.transpose()?;
338+
// Adapt projections to the physical file schema as well
339+
projection = projection
340+
.try_map_exprs(|p| simplifier.simplify(rewriter.rewrite(p)?))?;
333341

334342
// Build predicates for this specific file
335343
let (pruning_predicate, page_pruning_predicate) = build_pruning_predicates(
336344
predicate.as_ref(),
337-
&predicate_file_schema,
345+
&physical_file_schema,
338346
&predicate_creation_errors,
339347
);
340348

@@ -358,15 +366,6 @@ impl FileOpener for ParquetOpener {
358366
reader_metadata,
359367
);
360368

361-
let mut projection =
362-
ProjectionExprs::from_indices(&projection, &logical_file_schema);
363-
if let Some(expr_adapter_factory) = expr_adapter_factory {
364-
let adapter = expr_adapter_factory.create(
365-
Arc::clone(&logical_file_schema),
366-
Arc::clone(&physical_file_schema),
367-
);
368-
projection = projection.try_map_exprs(|expr| adapter.rewrite(expr))?;
369-
}
370369
let indices = projection.column_indices();
371370

372371
let mask = ProjectionMask::roots(builder.parquet_schema(), indices);
@@ -376,7 +375,6 @@ impl FileOpener for ParquetOpener {
376375
let row_filter = row_filter::build_row_filter(
377376
&predicate,
378377
&physical_file_schema,
379-
&predicate_file_schema,
380378
builder.metadata(),
381379
reorder_predicates,
382380
&file_metrics,
@@ -507,6 +505,9 @@ impl FileOpener for ParquetOpener {
507505
let predicate_cache_records = file_metrics.predicate_cache_records.clone();
508506

509507
let stream_schema = Arc::clone(stream.schema());
508+
// Check if we need to replace the schema to handle things like differing nullability or metadata.
509+
// See note below about file vs. output schema.
510+
let replace_schema = !stream_schema.eq(&output_schema);
510511

511512
// Rebase column indices to match the narrowed stream schema.
512513
// The projection expressions have indices based on physical_file_schema,
@@ -517,13 +518,34 @@ impl FileOpener for ParquetOpener {
517518
let projector = projection.make_projector(&stream_schema)?;
518519

519520
let stream = stream.map_err(DataFusionError::from).map(move |b| {
520-
b.and_then(|b| {
521+
b.and_then(|mut b| {
521522
copy_arrow_reader_metrics(
522523
&arrow_reader_metrics,
523524
&predicate_cache_inner_records,
524525
&predicate_cache_records,
525526
);
526-
projector.project_batch(&b)
527+
b = projector.project_batch(&b)?;
528+
if replace_schema {
529+
// Ensure the output batch has the expected schema.
530+
// This handles things like schema level and field level metadata, which may not be present
531+
// in the physical file schema.
532+
// It is also possible for nullability to differ; some writers create files with
533+
// OPTIONAL fields even when there are no nulls in the data.
534+
// In these cases it may make sense for the logical schema to be `NOT NULL`.
535+
// RecordBatch::try_new_with_options checks that if the schema is NOT NULL
536+
// the array cannot contain nulls, amongst other checks.
537+
let (_stream_schema, arrays, num_rows) = b.into_parts();
538+
let options =
539+
RecordBatchOptions::new().with_row_count(Some(num_rows));
540+
RecordBatch::try_new_with_options(
541+
Arc::clone(&output_schema),
542+
arrays,
543+
&options,
544+
)
545+
.map_err(Into::into)
546+
} else {
547+
Ok(b)
548+
}
527549
})
528550
});
529551

@@ -826,7 +848,8 @@ mod test {
826848
use datafusion_datasource::{file_stream::FileOpener, PartitionedFile, TableSchema};
827849
use datafusion_expr::{col, lit};
828850
use datafusion_physical_expr::{
829-
expressions::DynamicFilterPhysicalExpr, planner::logical2physical, PhysicalExpr,
851+
expressions::DynamicFilterPhysicalExpr, planner::logical2physical,
852+
projection::ProjectionExprs, PhysicalExpr,
830853
};
831854
use datafusion_physical_expr_adapter::DefaultPhysicalExprAdapterFactory;
832855
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -910,7 +933,7 @@ mod test {
910933
let make_opener = |predicate| {
911934
ParquetOpener {
912935
partition_index: 0,
913-
projection: Arc::new([0, 1]),
936+
projection: ProjectionExprs::from_indices(&[0, 1], &schema),
914937
batch_size: 1024,
915938
limit: None,
916939
predicate: Some(predicate),
@@ -929,7 +952,7 @@ mod test {
929952
coerce_int96: None,
930953
#[cfg(feature = "parquet_encryption")]
931954
file_decryption_properties: None,
932-
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
955+
expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),
933956
#[cfg(feature = "parquet_encryption")]
934957
encryption_factory: None,
935958
max_predicate_cache_size: None,
@@ -978,7 +1001,7 @@ mod test {
9781001
let make_opener = |predicate| {
9791002
ParquetOpener {
9801003
partition_index: 0,
981-
projection: Arc::new([0]),
1004+
projection: ProjectionExprs::from_indices(&[0], &file_schema),
9821005
batch_size: 1024,
9831006
limit: None,
9841007
predicate: Some(predicate),
@@ -1000,7 +1023,7 @@ mod test {
10001023
coerce_int96: None,
10011024
#[cfg(feature = "parquet_encryption")]
10021025
file_decryption_properties: None,
1003-
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
1026+
expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),
10041027
#[cfg(feature = "parquet_encryption")]
10051028
encryption_factory: None,
10061029
max_predicate_cache_size: None,
@@ -1065,7 +1088,7 @@ mod test {
10651088
let make_opener = |predicate| {
10661089
ParquetOpener {
10671090
partition_index: 0,
1068-
projection: Arc::new([0]),
1091+
projection: ProjectionExprs::from_indices(&[0], &file_schema),
10691092
batch_size: 1024,
10701093
limit: None,
10711094
predicate: Some(predicate),
@@ -1087,7 +1110,7 @@ mod test {
10871110
coerce_int96: None,
10881111
#[cfg(feature = "parquet_encryption")]
10891112
file_decryption_properties: None,
1090-
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
1113+
expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),
10911114
#[cfg(feature = "parquet_encryption")]
10921115
encryption_factory: None,
10931116
max_predicate_cache_size: None,
@@ -1155,7 +1178,7 @@ mod test {
11551178
let make_opener = |predicate| {
11561179
ParquetOpener {
11571180
partition_index: 0,
1158-
projection: Arc::new([0]),
1181+
projection: ProjectionExprs::from_indices(&[0], &file_schema),
11591182
batch_size: 1024,
11601183
limit: None,
11611184
predicate: Some(predicate),
@@ -1177,7 +1200,7 @@ mod test {
11771200
coerce_int96: None,
11781201
#[cfg(feature = "parquet_encryption")]
11791202
file_decryption_properties: None,
1180-
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
1203+
expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),
11811204
#[cfg(feature = "parquet_encryption")]
11821205
encryption_factory: None,
11831206
max_predicate_cache_size: None,
@@ -1253,7 +1276,7 @@ mod test {
12531276
let make_opener = |predicate| {
12541277
ParquetOpener {
12551278
partition_index: 0,
1256-
projection: Arc::new([0]),
1279+
projection: ProjectionExprs::from_indices(&[0], &file_schema),
12571280
batch_size: 1024,
12581281
limit: None,
12591282
predicate: Some(predicate),
@@ -1275,7 +1298,7 @@ mod test {
12751298
coerce_int96: None,
12761299
#[cfg(feature = "parquet_encryption")]
12771300
file_decryption_properties: None,
1278-
expr_adapter_factory: Some(Arc::new(DefaultPhysicalExprAdapterFactory)),
1301+
expr_adapter_factory: Arc::new(DefaultPhysicalExprAdapterFactory),
12791302
#[cfg(feature = "parquet_encryption")]
12801303
encryption_factory: None,
12811304
max_predicate_cache_size: None,

0 commit comments

Comments
 (0)